diff --git a/history.txt b/history.txt index 5c2ecea36..e2884636b 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,9 @@ +2009-04-17 sponge + * Catch NPE in NTCP. + This possibly augments fix 2009-04-11 welterde below. + * Various LINT on NTCP sources, and removal of space-wasting + spaces at end of lines in sources touched. + 2009-04-13 Mathiasdm * Bugfix on tray icon updating * Some more work on the general configuration menu diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 3b16d6ad5..1fe43ff30 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -17,7 +17,7 @@ import net.i2p.CoreVersion; public class RouterVersion { public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 18; + public final static long BUILD = 19; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 1dffde799..48864d3ef 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -1,9 +1,9 @@ package net.i2p.router.transport; /* * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat + * Written by jrandom in 2003 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat * your children, but it might. Use at your own risk. * */ @@ -14,7 +14,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -46,10 +45,10 @@ public abstract class TransportImpl implements Transport { private Log _log; private TransportEventListener _listener; private RouterAddress _currentAddress; - private List _sendPool; + private final List _sendPool; protected RouterContext _context; /** map from routerIdentHash to timestamp (Long) that the peer was last unreachable */ - private Map _unreachableEntries; + private final Map _unreachableEntries; private Set _wasUnreachableEntries; /** global router ident -> IP */ private static Map _IPMap = new ConcurrentHashMap(128); @@ -61,7 +60,7 @@ public abstract class TransportImpl implements Transport { public TransportImpl(RouterContext context) { _context = context; _log = _context.logManager().getLog(TransportImpl.class); - + _context.statManager().createRateStat("transport.sendMessageFailureLifetime", "How long the lifetime of messages that fail are?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("transport.sendMessageSize", "How large are the messages sent?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("transport.receiveMessageSize", "How large are the messages received?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); @@ -74,7 +73,7 @@ public abstract class TransportImpl implements Transport { _wasUnreachableEntries = new ConcurrentHashSet(16); _currentAddress = null; } - + /** * How many peers can we talk to right now? * @@ -111,18 +110,18 @@ public abstract class TransportImpl implements Transport { * Can we initiate or accept a connection to another peer, saving some margin */ public boolean haveCapacity() { return true; } - + /** * Return our peer clock skews on a transport. * Vector composed of Long, each element representing a peer skew in seconds. * Dummy version. Transports override it. */ public Vector getClockSkews() { return new Vector(); } - + public List getMostRecentErrorMessages() { return Collections.EMPTY_LIST; } /** * Nonblocking call to pull the next outbound message - * off the queue. + * off the queue. * * @return the next message or null if none are available */ @@ -135,7 +134,7 @@ public abstract class TransportImpl implements Transport { msg.beginSend(); return msg; } - + /** * The transport is done sending this message * @@ -167,7 +166,7 @@ public abstract class TransportImpl implements Transport { } /** * The transport is done sending this message. This is the method that actually - * does all of the cleanup - firing off jobs, requeueing, updating stats, etc. + * does all of the cleanup - firing off jobs, requeueing, updating stats, etc. * * @param msg message in question * @param sendSuccessful true if the peer received it @@ -180,64 +179,64 @@ public abstract class TransportImpl implements Transport { msg.timestamp("afterSend(successful)"); else msg.timestamp("afterSend(failed)"); - + if (!sendSuccessful) msg.transportFailed(getStyle()); if (msToSend > 1000) { if (_log.shouldLog(Log.WARN)) - _log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte " - + msg.getMessageType() + " " + msg.getMessageId() + " to " - + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend + _log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte " + + msg.getMessageType() + " " + msg.getMessageId() + " to " + + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend + "/" + msg.getTransmissionTime()); } - //if (true) + //if (true) // _log.error("(not error) I2NP message sent? " + sendSuccessful + " " + msg.getMessageId() + " after " + msToSend + "/" + msg.getTransmissionTime()); - + long lifetime = msg.getLifetime(); if (lifetime > 3000) { int level = Log.WARN; if (!sendSuccessful) level = Log.INFO; if (_log.shouldLog(level)) - _log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "/" + msg.getTransmissionTime() + "): [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " - + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + _log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "/" + msg.getTransmissionTime() + "): [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " + + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + ": " + msg.toString()); } else { if (_log.shouldLog(Log.INFO)) - _log.info("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " - + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + _log.info("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " + + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + "\n" + msg.toString()); } if (sendSuccessful) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Send message " + msg.getMessageType() + " to " - + msg.getTarget().getIdentity().getHash().toBase64() + " with transport " + _log.debug("Send message " + msg.getMessageType() + " to " + + msg.getTarget().getIdentity().getHash().toBase64() + " with transport " + getStyle() + " successfully"); Job j = msg.getOnSendJob(); - if (j != null) + if (j != null) _context.jobQueue().addJob(j); log = true; msg.discardData(); } else { if (_log.shouldLog(Log.INFO)) - _log.info("Failed to send message " + msg.getMessageType() - + " to " + msg.getTarget().getIdentity().getHash().toBase64() + _log.info("Failed to send message " + msg.getMessageType() + + " to " + msg.getTarget().getIdentity().getHash().toBase64() + " with transport " + getStyle() + " (details: " + msg + ")"); if (msg.getExpiration() < _context.clock().now()) _context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime); - + if (allowRequeue) { - if ( ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) + if ( ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) && (msg.getMessage() != null) ) { // this may not be the last transport available - keep going _context.outNetMessagePool().add(msg); // don't discard the data yet! } else { if (_log.shouldLog(Log.INFO)) - _log.info("No more time left (" + new Date(msg.getExpiration()) - + ", expiring without sending successfully the " + _log.info("No more time left (" + new Date(msg.getExpiration()) + + ", expiring without sending successfully the " + msg.getMessageType()); if (msg.getOnFailedSendJob() != null) _context.jobQueue().addJob(msg.getOnFailedSendJob()); @@ -251,8 +250,8 @@ public abstract class TransportImpl implements Transport { } else { MessageSelector selector = msg.getReplySelector(); if (_log.shouldLog(Log.INFO)) - _log.info("Failed and no requeue allowed for a " - + msg.getMessageSize() + " byte " + _log.info("Failed and no requeue allowed for a " + + msg.getMessageSize() + " byte " + msg.getMessageType() + " message with selector " + selector, new Exception("fail cause")); if (msg.getOnFailedSendJob() != null) _context.jobQueue().addJob(msg.getOnFailedSendJob()); @@ -269,9 +268,9 @@ public abstract class TransportImpl implements Transport { String type = msg.getMessageType(); // the udp transport logs some further details /* - _context.messageHistory().sendMessage(type, msg.getMessageId(), + _context.messageHistory().sendMessage(type, msg.getMessageId(), msg.getExpiration(), - msg.getTarget().getIdentity().getHash(), + msg.getTarget().getIdentity().getHash(), sendSuccessful); */ } @@ -281,23 +280,23 @@ public abstract class TransportImpl implements Transport { long allTime = now - msg.getCreated(); if (allTime > 5*1000) { if (_log.shouldLog(Log.INFO)) - _log.info("Took too long from preperation to afterSend(ok? " + sendSuccessful - + "): " + allTime + "ms/" + sendTime + "ms after failing on: " + _log.info("Took too long from preperation to afterSend(ok? " + sendSuccessful + + "): " + allTime + "ms/" + sendTime + "ms after failing on: " + msg.getFailedTransports() + " and succeeding on " + getStyle()); if ( (allTime > 60*1000) && (sendSuccessful) ) { // WTF!!@# if (_log.shouldLog(Log.WARN)) - _log.warn("WTF, more than a minute slow? " + msg.getMessageType() - + " of id " + msg.getMessageId() + " (send begin on " - + new Date(msg.getSendBegin()) + " / created on " + _log.warn("WTF, more than a minute slow? " + msg.getMessageType() + + " of id " + msg.getMessageId() + " (send begin on " + + new Date(msg.getSendBegin()) + " / created on " + new Date(msg.getCreated()) + "): " + msg, msg.getCreatedBy()); - _context.messageHistory().messageProcessingError(msg.getMessageId(), - msg.getMessageType(), + _context.messageHistory().messageProcessingError(msg.getMessageId(), + msg.getMessageType(), "Took too long to send [" + allTime + "ms]"); } } - + if (sendSuccessful) { _context.statManager().addRateData("transport.sendProcessingTime", lifetime, lifetime); _context.profileManager().messageSent(msg.getTarget().getIdentity().getHash(), getStyle(), sendTime, msg.getMessageSize()); @@ -307,7 +306,7 @@ public abstract class TransportImpl implements Transport { _context.statManager().addRateData("transport.sendMessageFailureLifetime", lifetime, lifetime); } } - + /** * Asynchronously send the message as requested in the message and, if the * send is successful, queue up any msg.getOnSendJob job, and register it @@ -323,14 +322,14 @@ public abstract class TransportImpl implements Transport { } boolean duplicate = false; synchronized (_sendPool) { - if (_sendPool.contains(msg)) + if (_sendPool.contains(msg)) duplicate = true; else _sendPool.add(msg); } if (duplicate) { if (_log.shouldLog(Log.ERROR)) - _log.error("Message already is in the queue? wtf. msg = " + msg, + _log.error("Message already is in the queue? wtf. msg = " + msg, new Exception("wtf, requeued?")); } @@ -346,15 +345,15 @@ public abstract class TransportImpl implements Transport { * and it should not block */ protected abstract void outboundMessageReady(); - + /** * Message received from the I2NPMessageReader - send it to the listener * */ public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) { - //if (true) + //if (true) // _log.error("(not error) I2NP message received: " + inMsg.getUniqueId() + " after " + msToReceive); - + int level = Log.INFO; if (msToReceive > 5000) level = Log.WARN; @@ -385,7 +384,7 @@ public abstract class TransportImpl implements Transport { _context.profileManager().messageReceived(remoteIdentHash, getStyle(), msToReceive, bytesReceived); _context.statManager().addRateData("transport.receiveMessageSize", bytesReceived, msToReceive); } - + _context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive); if (msToReceive > 1000) { _context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive); @@ -394,7 +393,7 @@ public abstract class TransportImpl implements Transport { //// this functionality is built into the InNetMessagePool //String type = inMsg.getClass().getName(); //MessageHistory.getInstance().receiveMessage(type, inMsg.getUniqueId(), inMsg.getMessageExpiration(), remoteIdentHash, true); - + if (_listener != null) { _listener.messageReceived(inMsg, remoteIdent, remoteIdentHash); } else { @@ -402,9 +401,9 @@ public abstract class TransportImpl implements Transport { _log.error("WTF! Null listener! this = " + toString(), new Exception("Null listener")); } } - + /** What addresses are we currently listening to? */ - public RouterAddress getCurrentAddress() { + public RouterAddress getCurrentAddress() { return _currentAddress; } /** @@ -417,19 +416,19 @@ public abstract class TransportImpl implements Transport { if ("SSU".equals(getStyle())) _context.commSystem().notifyReplaceAddress(address); } - + /** Who to notify on message availability */ public void setListener(TransportEventListener listener) { _listener = listener; } /** Make this stuff pretty (only used in the old console) */ public void renderStatusHTML(Writer out) throws IOException {} public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException { renderStatusHTML(out); } - + public RouterContext getContext() { return _context; } public short getReachabilityStatus() { return CommSystemFacade.STATUS_UNKNOWN; } public void recheckReachability() {} public boolean isBacklogged(Hash dest) { return false; } public boolean isEstablished(Hash dest) { return false; } - + private static final long UNREACHABLE_PERIOD = 5*60*1000; public boolean isUnreachable(Hash peer) { long now = _context.clock().now(); @@ -506,7 +505,7 @@ public abstract class TransportImpl implements Transport { _log.warn(this.getStyle() + " setting wasUnreachable to " + yes + " for " + peer); } - public static void setIP(Hash peer, byte[] ip) { + public /* static */ void setIP(Hash peer, byte[] ip) { _IPMap.put(peer, ip); } @@ -517,7 +516,7 @@ public abstract class TransportImpl implements Transport { public static boolean isPubliclyRoutable(byte addr[]) { if (addr.length == 4) { if ((addr[0]&0xFF) == 127) return false; - if ((addr[0]&0xFF) == 10) return false; + if ((addr[0]&0xFF) == 10) return false; if ( ((addr[0]&0xFF) == 172) && ((addr[1]&0xFF) >= 16) && ((addr[1]&0xFF) <= 31) ) return false; if ( ((addr[0]&0xFF) == 192) && ((addr[1]&0xFF) == 168) ) return false; if ((addr[0]&0xFF) >= 224) return false; // no multicast diff --git a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java index cae5a99d2..e462764d5 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java @@ -45,7 +45,7 @@ import net.i2p.util.Log; public class EstablishState { private RouterContext _context; private Log _log; - + // bob receives (and alice sends) private byte _X[]; private byte _hX_xor_bobIdentHash[]; @@ -60,28 +60,28 @@ public class EstablishState { private transient long _tsB; private transient long _tsA; private transient byte _e_bobSig[]; - + /** previously received encrypted block (or the IV) */ private byte _prevEncrypted[]; /** current encrypted block we are reading */ private byte _curEncrypted[]; /** - * next index in _curEncrypted to write to (equals _curEncrypted length if the block is + * next index in _curEncrypted to write to (equals _curEncrypted length if the block is * ready to decrypt) */ private int _curEncryptedOffset; /** decryption buffer */ private byte _curDecrypted[]; - + /** bytes received so far */ private int _received; /** bytes sent so far */ private int _sent; - + private byte _extra[]; - + private DHSessionKeyBuilder _dh; - + private NTCPTransport _transport; private NTCPConnection _con; private boolean _corrupt; @@ -92,7 +92,7 @@ public class EstablishState { private boolean _verified; private boolean _confirmWritten; private boolean _failedBySkew; - + public EstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) { _context = ctx; _log = ctx.logManager().getLog(getClass()); @@ -113,15 +113,15 @@ public class EstablishState { byte hx[] = ctx.sha().calculateHash(_X).getData(); DataHelper.xor(hx, 0, con.getRemotePeer().calculateHash().getData(), 0, _hX_xor_bobIdentHash, 0, hx.length); } - + _prevEncrypted = new byte[16]; _curEncrypted = new byte[16]; _curEncryptedOffset = 0; _curDecrypted = new byte[16]; - + _received = 0; } - + /** * parse the contents of the buffer as part of the handshake. if the * handshake is completed and there is more data remaining, the buffer is @@ -133,7 +133,7 @@ public class EstablishState { throw new IllegalStateException(prefix() + "received after completion [corrupt?" + _corrupt + " verified? " + _verified + "] on " + _con); if (!src.hasRemaining()) return; // nothing to receive - + if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"receive " + src); if (_con.isInbound()) @@ -141,15 +141,15 @@ public class EstablishState { else receiveOutbound(src); } - + /** * we have written all of the data required to confirm the connection * establishment */ public boolean confirmWritten() { return _confirmWritten; } - + public boolean getFailedBySkew() { return _failedBySkew; } - + /** we are Bob, so receive these bytes as part of an inbound connection */ private void receiveInbound(ByteBuffer src) { if (_log.shouldLog(Log.DEBUG)) @@ -178,7 +178,7 @@ public class EstablishState { if (_dh.getSessionKey() == null) { if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"Enough data for a DH received"); - + // first verify that Alice knows who she is trying to talk with and that the X // isn't corrupt Hash hX = _context.sha().calculateHash(_X); @@ -201,7 +201,7 @@ public class EstablishState { System.arraycopy(realXor, 16, _prevEncrypted, 0, _prevEncrypted.length); if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")"); - + // now prepare our response: Y+E(H(X+Y)+tsB+padding, sk, Y[239:255]) _Y = _dh.getMyPublicValueBytes(); byte xy[] = new byte[_X.length+_Y.length]; @@ -233,7 +233,7 @@ public class EstablishState { byte write[] = new byte[_Y.length + _e_hXY_tsB.length]; System.arraycopy(_Y, 0, write, 0, _Y.length); System.arraycopy(_e_hXY_tsB, 0, write, _Y.length, _e_hXY_tsB.length); - + // ok, now that is prepared, we want to actually send it, so make sure we are up for writing _transport.getPumper().wantsWrite(_con, write); if (!src.hasRemaining()) return; @@ -243,7 +243,7 @@ public class EstablishState { return; } } - + // ok, we are onto the encrypted area while (src.hasRemaining() && !_corrupt) { if (_log.shouldLog(Log.DEBUG)) @@ -256,12 +256,12 @@ public class EstablishState { _context.aes().decrypt(_curEncrypted, 0, _curDecrypted, 0, _dh.getSessionKey(), _prevEncrypted, 0, _curEncrypted.length); //if (_log.shouldLog(Log.DEBUG)) // _log.debug(prefix()+"full block read and decrypted: " + Base64.encode(_curDecrypted)); - + byte swap[] = new byte[16]; _prevEncrypted = _curEncrypted; _curEncrypted = swap; _curEncryptedOffset = 0; - + if (_aliceIdentSize <= 0) { // we are on the first decrypted block _aliceIdentSize = (int)DataHelper.fromLong(_curDecrypted, 0, 2); _sz_aliceIdent_tsA_padding_aliceSigSize = 2 + _aliceIdentSize + 4 + Signature.SIGNATURE_BYTES; @@ -292,8 +292,8 @@ public class EstablishState { if (!_corrupt && _verified && src.hasRemaining()) prepareExtra(src); if (_log.shouldLog(Log.DEBUG)) - _log.debug(prefix()+"verifying size (sz=" + _sz_aliceIdent_tsA_padding_aliceSig.size() - + " expected=" + _sz_aliceIdent_tsA_padding_aliceSigSize + _log.debug(prefix()+"verifying size (sz=" + _sz_aliceIdent_tsA_padding_aliceSig.size() + + " expected=" + _sz_aliceIdent_tsA_padding_aliceSigSize + " corrupt=" + _corrupt + " verified=" + _verified + " extra=" + (_extra != null ? _extra.length : 0) + ")"); return; @@ -310,11 +310,11 @@ public class EstablishState { _log.debug(prefix()+"done with the data, not yet complete or corrupt"); } } - + /** we are Alice, so receive these bytes as part of an outbound connection */ private void receiveOutbound(ByteBuffer src) { if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"Receive outbound " + src + " received=" + _received); - + // recv Y+E(H(X+Y)+tsB, sk, Y[239:255]) while (_received < _Y.length && src.hasRemaining()) { byte c = src.get(); @@ -361,7 +361,7 @@ public class EstablishState { _tsA = _context.clock().now()/1000; // our (Alice's) timestamp in seconds if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"h(X+Y) is correct, tsA-tsB=" + (_tsA-_tsB)); - + // the skew is not authenticated yet, but it is certainly fatal to // the establishment, so fail hard if appropriate long diff = 1000*Math.abs(_tsA-_tsB); @@ -374,7 +374,7 @@ public class EstablishState { } else if (_log.shouldLog(Log.DEBUG)) { _log.debug(prefix()+"Clock skew: " + diff + " ms"); } - + // now prepare and send our response // send E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB), sk, hX_xor_Bob.identHash[16:31]) int sigSize = _X.length+_Y.length+Hash.HASH_LENGTH+4+4;//+12; @@ -390,11 +390,11 @@ public class EstablishState { //_context.random().nextBytes(sigPad); //System.arraycopy(sigPad, 0, preSign, _X.length+_Y.length+Hash.HASH_LENGTH+4+4, padSig); Signature sig = _context.dsa().sign(preSign, _context.keyManager().getSigningPrivateKey()); - + //if (_log.shouldLog(Log.DEBUG)) { // _log.debug(prefix()+"signing " + Base64.encode(preSign)); //} - + byte ident[] = _context.router().getRouterInfo().getIdentity().toByteArray(); int min = 2+ident.length+4+Signature.SIGNATURE_BYTES; int rem = min % 16; @@ -409,10 +409,10 @@ public class EstablishState { _context.random().nextBytes(pad); System.arraycopy(pad, 0, preEncrypt, 2+ident.length+4, padding); System.arraycopy(sig.getData(), 0, preEncrypt, 2+ident.length+4+padding, Signature.SIGNATURE_BYTES); - + _prevEncrypted = new byte[preEncrypt.length]; _context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(), _hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-16, preEncrypt.length); - + if (_log.shouldLog(Log.DEBUG)) { //_log.debug(prefix() + "unencrypted response to Bob: " + Base64.encode(preEncrypt)); //_log.debug(prefix() + "encrypted response to Bob: " + Base64.encode(_prevEncrypted)); @@ -423,7 +423,7 @@ public class EstablishState { } if (_received >= _Y.length + _e_hXY_tsB.length && src.hasRemaining()) { // we are receiving their confirmation - + // recv E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev) int off = 0; if (_e_bobSig == null) { @@ -439,7 +439,7 @@ public class EstablishState { if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"recv bobSig received=" + _received); _e_bobSig[off++] = src.get(); _received++; - + if (off >= _e_bobSig.length) { //if (_log.shouldLog(Log.DEBUG)) // _log.debug(prefix() + "received E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev): " + Base64.encode(_e_bobSig)); @@ -449,7 +449,7 @@ public class EstablishState { byte bobSigData[] = new byte[Signature.SIGNATURE_BYTES]; System.arraycopy(bobSig, 0, bobSigData, 0, Signature.SIGNATURE_BYTES); Signature sig = new Signature(bobSigData); - + byte toVerify[] = new byte[_X.length+_Y.length+Hash.HASH_LENGTH+4+4]; int voff = 0; System.arraycopy(_X, 0, toVerify, voff, _X.length); voff += _X.length; @@ -457,7 +457,7 @@ public class EstablishState { System.arraycopy(_context.routerHash().getData(), 0, toVerify, voff, Hash.HASH_LENGTH); voff += Hash.HASH_LENGTH; DataHelper.toLong(toVerify, voff, 4, _tsA); voff += 4; DataHelper.toLong(toVerify, voff, 4, _tsB); voff += 4; - + _verified = _context.dsa().verifySignature(sig, toVerify, _con.getRemotePeer().getSigningPublicKey()); if (!_verified) { _context.statManager().addRateData("ntcp.invalidSignature", 1, 0); @@ -481,12 +481,12 @@ public class EstablishState { } } } - + /** did the handshake fail for some reason? */ public boolean isCorrupt() { return _err != null; } /** @return is the handshake complete and valid? */ public boolean isComplete() { return _verified; } - + /** * we are establishing an outbound connection, so prepare ourselves by * queueing up the write of the first part of the handshake @@ -504,10 +504,10 @@ public class EstablishState { _log.debug(prefix()+"prepare outbound with received=" + _received); } } - + /** * make sure the signatures are correct, and if they are, update the - * NIOConnection with the session key / peer ident / clock skew / iv. + * NIOConnection with the session key / peer ident / clock skew / iv. * The NIOConnection itself is responsible for registering with the * transport */ @@ -516,10 +516,10 @@ public class EstablishState { byte b[] = _sz_aliceIdent_tsA_padding_aliceSig.toByteArray(); //if (_log.shouldLog(Log.DEBUG)) // _log.debug(prefix()+"decrypted sz(etc) data: " + Base64.encode(b)); - + try { RouterIdentity alice = new RouterIdentity(); - int sz = (int)DataHelper.fromLong(b, 0, 2); + int sz = (int)DataHelper.fromLong(b, 0, 2); // TO-DO: Hey zzz... Throws an NPE for me... see below, for my "quick fix", need to find out the real reason if ( (sz <= 0) || (sz > b.length-2-4-Signature.SIGNATURE_BYTES) ) { _context.statManager().addRateData("ntcp.invalidInboundSize", sz, 0); fail("size is invalid", new Exception("size is " + sz)); @@ -529,7 +529,7 @@ public class EstablishState { System.arraycopy(b, 2, aliceData, 0, sz); alice.fromByteArray(aliceData); long tsA = DataHelper.fromLong(b, 2+sz, 4); - + ByteArrayOutputStream baos = new ByteArrayOutputStream(768); baos.write(_X); baos.write(_Y); @@ -537,7 +537,7 @@ public class EstablishState { baos.write(DataHelper.toLong(4, tsA)); baos.write(DataHelper.toLong(4, _tsB)); //baos.write(b, 2+sz+4, b.length-2-sz-4-Signature.SIGNATURE_BYTES); - + byte toVerify[] = baos.toByteArray(); if (_log.shouldLog(Log.DEBUG)) { _log.debug(prefix()+"checking " + Base64.encode(toVerify, 0, 16)); @@ -566,7 +566,7 @@ public class EstablishState { _transport.setIP(alice.calculateHash(), ip); if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "verification successful for " + _con); - + long diff = 1000*Math.abs(tsA-_tsB); if (diff >= Router.CLOCK_FUDGE_FACTOR) { _context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0); @@ -597,9 +597,11 @@ public class EstablishState { } catch (DataFormatException dfe) { _context.statManager().addRateData("ntcp.invalidInboundDFE", 1, 0); fail("Error verifying peer", dfe); + } catch(NullPointerException npe) { + fail("Error verifying peer", npe); // TO-DO: zzz This is that quick-fix. -- Sponge } } - + private void sendInboundConfirm(RouterIdentity alice, long tsA) { // send Alice E(S(X+Y+Alice.identHash+tsA+tsB), sk, prev) byte toSign[] = new byte[256+256+32+4+4]; @@ -610,7 +612,7 @@ public class EstablishState { System.arraycopy(h.getData(), 0, toSign, off, 32); off += 32; DataHelper.toLong(toSign, off, 4, tsA); off += 4; DataHelper.toLong(toSign, off, 4, _tsB); off += 4; - + Signature sig = _context.dsa().sign(toSign, _context.keyManager().getSigningPrivateKey()); byte preSig[] = new byte[Signature.SIGNATURE_BYTES+8]; byte pad[] = new byte[8]; @@ -619,12 +621,12 @@ public class EstablishState { System.arraycopy(pad, 0, preSig, Signature.SIGNATURE_BYTES, pad.length); _e_bobSig = new byte[preSig.length]; _context.aes().encrypt(preSig, 0, _e_bobSig, 0, _dh.getSessionKey(), _e_hXY_tsB, _e_hXY_tsB.length-16, _e_bobSig.length); - + if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "Sending encrypted inbound confirmation"); _transport.getPumper().wantsWrite(_con, _e_bobSig); } - + /** anything left over in the byte buffer after verification is extra */ private void prepareExtra(ByteBuffer buf) { int remaining = buf.remaining(); @@ -636,13 +638,13 @@ public class EstablishState { if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "prepare extra " + remaining + " (total received: " + _received + ")"); } - + /** * if complete, this will contain any bytes received as part of the * handshake that were after the actual handshake. This may return null. */ public byte[] getExtraBytes() { return _extra; } - + private void fail(String reason) { fail(reason, null); } private void fail(String reason, Exception e) { fail(reason, e, false); } private void fail(String reason, Exception e, boolean bySkew) { @@ -653,11 +655,12 @@ public class EstablishState { if (_log.shouldLog(Log.WARN)) _log.warn(prefix()+"Failed to establish: " + _err, e); } - + public String getError() { return _err; } public Exception getException() { return _e; } - + private String prefix() { return toString(); } + @Override public String toString() { StringBuffer buf = new StringBuffer(64); buf.append("est").append(System.identityHashCode(this)); @@ -669,7 +672,7 @@ public class EstablishState { buf.append(": "); return buf.toString(); } - + /** * a check info connection will receive 256 bytes containing: * - 32 bytes of uninterpreted, ignored data @@ -703,7 +706,7 @@ public class EstablishState { off += 4; long skewSeconds = (ctx.clock().now()/1000)-now; if (log.shouldLog(Log.INFO)) - log.info("Check info received: our IP: " + ourIP + " our port: " + port + log.info("Check info received: our IP: " + ourIP + " our port: " + port + " skew: " + skewSeconds + " s"); } catch (UnknownHostException uhe) { // ipSize is invalid @@ -717,7 +720,7 @@ public class EstablishState { return false; } } - + public static void checkHost(String args[]) { if (args.length != 3) { System.err.println("Usage: EstablishState ipOrHostname portNum peerHashBase64"); @@ -746,7 +749,7 @@ public class EstablishState { Hash h = ctx.sha().calculateHash(toSend, 32, toSend.length-32-32); DataHelper.xor(peer, 0, h.getData(), 0, toSend, toSend.length-32, peer.length); System.out.println("check hash: " + h.toBase64()); - + out.write(toSend); out.flush(); try { Thread.sleep(1000); } catch (InterruptedException ie) {} @@ -755,7 +758,7 @@ public class EstablishState { e.printStackTrace(); } } - + public static void main(String args[]) { if (args.length == 3) { checkHost(args); @@ -780,7 +783,7 @@ public class EstablishState { out.write(hx_xor_bih); out.flush(); // DONE SENDING X+(H(X) xor Bob.identHash)-----------------------------> - + // NOW READ Y+E(H(X+Y)+tsB+padding, sk, Y[239:255]) InputStream in = s.getInputStream(); byte toRead[] = new byte[256+(32+4+12)]; @@ -808,9 +811,9 @@ public class EstablishState { System.out.println("encrypted H(X+Y)+tsB+padding: " + Base64.encode(toRead, Y.length, toRead.length-Y.length)); System.out.println("unencrypted H(X+Y)+tsB+padding: " + Base64.encode(decrypted)); long tsB = DataHelper.fromLong(decrypted, 32, 4); - + //try { Thread.sleep(40*1000); } catch (InterruptedException ie) {} - + RouterIdentity alice = new RouterIdentity(); Object k[] = ctx.keyGenerator().generatePKIKeypair(); PublicKey pub = (PublicKey)k[0]; @@ -821,16 +824,16 @@ public class EstablishState { alice.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null)); alice.setPublicKey(pub); alice.setSigningPublicKey(spub); - + // SEND E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB+padding), sk, hX_xor_Bob.identHash[16:31])---> - + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); byte aliceb[] = alice.toByteArray(); long tsA = ctx.clock().now()/1000l; baos.write(DataHelper.toLong(2, aliceb.length)); baos.write(aliceb); baos.write(DataHelper.toLong(4, tsA)); - + int base = baos.size() + Signature.SIGNATURE_BYTES; int rem = base % 16; int padding = 0; @@ -840,7 +843,7 @@ public class EstablishState { ctx.random().nextBytes(pad); baos.write(pad); base += padding; - + ByteArrayOutputStream sbaos = new ByteArrayOutputStream(512); sbaos.write(X); sbaos.write(Y); @@ -850,21 +853,21 @@ public class EstablishState { //sbaos.write(pad); Signature sig = ctx.dsa().sign(sbaos.toByteArray(), spriv); baos.write(sig.toByteArray()); - + byte unencrypted[] = baos.toByteArray(); byte toWrite[] = new byte[unencrypted.length]; System.out.println("unencrypted.length = " + unencrypted.length + " alice.size = " + aliceb.length + " padding = " + padding + " base = " + base); ctx.aes().encrypt(unencrypted, 0, toWrite, 0, dh.getSessionKey(), hx_xor_bih, 16, unencrypted.length); - + out.write(toWrite); out.flush(); - + System.out.println("unencrypted: " + Base64.encode(unencrypted)); System.out.println("encrypted: " + Base64.encode(toWrite)); System.out.println("Local peer: " + alice.calculateHash().toBase64()); // now check bob's signature - + SigningPublicKey bobPubKey = null; try { RouterInfo info = new RouterInfo(); @@ -874,9 +877,9 @@ public class EstablishState { e.printStackTrace(); return; } - + System.out.println("Reading in bob's sig"); - + byte bobRead[] = new byte[48]; read = 0; while (read < bobRead.length) { @@ -892,7 +895,7 @@ public class EstablishState { byte bobSigData[] = new byte[Signature.SIGNATURE_BYTES]; System.arraycopy(preSig, 0, bobSigData, 0, Signature.SIGNATURE_BYTES); // ignore the padding System.out.println("Bob's sig: " + Base64.encode(bobSigData)); - + byte signed[] = new byte[256+256+32+4+4]; int off = 0; System.arraycopy(X, 0, signed, off, 256); off += 256; @@ -904,18 +907,18 @@ public class EstablishState { Signature bobSig = new Signature(bobSigData); boolean ok = ctx.dsa().verifySignature(bobSig, signed, bobPubKey); - + System.out.println("bob's sig matches? " + ok); - + try { Thread.sleep(5*1000); } catch (InterruptedException ie) {} byte fakeI2NPbuf[] = new byte[128]; ctx.random().nextBytes(fakeI2NPbuf); out.write(fakeI2NPbuf); out.flush(); - + try { Thread.sleep(30*1000); } catch (InterruptedException ie) {} s.close(); - } catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); } } diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index a9208e9a6..8295ae3d7 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -36,7 +36,7 @@ public class NTCPTransport extends TransportImpl { private SharedBid _fastBid; private SharedBid _slowBid; private SharedBid _transientFail; - private Object _conLock; + private final Object _conLock; private Map _conByIdent; private NTCPAddress _myAddress; private EventPumper _pumper; @@ -46,14 +46,14 @@ public class NTCPTransport extends TransportImpl { * list of NTCPConnection of connections not yet established that we * want to remove on establishment or close on timeout */ - private List _establishing; + private final List _establishing; private List _sent; private NTCPSendFinisher _finisher; - + public NTCPTransport(RouterContext ctx) { super(ctx); - + _log = ctx.logManager().getLog(getClass()); _context.statManager().createRateStat("ntcp.sendTime", "Total message lifetime when sent completely", "ntcp", new long[] { 60*1000, 10*60*1000 }); @@ -122,19 +122,19 @@ public class NTCPTransport extends TransportImpl { _establishing = new ArrayList(4); _conLock = new Object(); _conByIdent = new HashMap(64); - + _sent = new ArrayList(4); _finisher = new NTCPSendFinisher(ctx, this); - + _pumper = new EventPumper(ctx, this); _reader = new Reader(ctx); _writer = new net.i2p.router.transport.ntcp.Writer(ctx); - + _fastBid = new SharedBid(25); // best _slowBid = new SharedBid(70); // better than ssu unestablished, but not better than ssu established _transientFail = new SharedBid(TransportBid.TRANSIENT_FAIL); } - + void inboundEstablished(NTCPConnection con) { _context.statManager().addRateData("ntcp.inboundEstablished", 1, 0); markReachable(con.getRemotePeer().calculateHash(), true); @@ -150,7 +150,7 @@ public class NTCPTransport extends TransportImpl { old.close(); } } - + protected void outboundMessageReady() { OutNetMessage msg = getNextMessage(); if (msg != null) { @@ -219,7 +219,7 @@ public class NTCPTransport extends TransportImpl { } con.enqueueInfoMessage(); // enqueues a netDb store of our own info con.send(msg); // doesn't do anything yet, just enqueues it - + try { SocketChannel channel = SocketChannel.open(); con.setChannel(channel); @@ -237,7 +237,8 @@ public class NTCPTransport extends TransportImpl { */ } } - + + @Override public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) { super.afterSend(msg, sendSuccessful, allowRequeue, msToSend); } @@ -259,7 +260,7 @@ public class NTCPTransport extends TransportImpl { _context.statManager().addRateData("ntcp.attemptUnreachablePeer", 1, 0); return null; } - + boolean established = isEstablished(toAddress.getIdentity()); if (established) { // should we check the queue size? nah, if its valid, use it if (_log.shouldLog(Log.DEBUG)) @@ -267,7 +268,7 @@ public class NTCPTransport extends TransportImpl { return _fastBid; } RouterAddress addr = toAddress.getTargetAddress(STYLE); - + if (addr == null) { markUnreachable(peer); _context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1, 0); @@ -294,25 +295,26 @@ public class NTCPTransport extends TransportImpl { return null; } } - + if (!allowConnection()) { if (_log.shouldLog(Log.WARN)) _log.warn("no bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + ", max connection limit reached"); return _transientFail; } - //if ( (_myAddress != null) && (_myAddress.equals(addr)) ) + //if ( (_myAddress != null) && (_myAddress.equals(addr)) ) // return null; // dont talk to yourself - + if (_log.shouldLog(Log.DEBUG)) _log.debug("slow bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64()); return _slowBid; } - + public boolean allowConnection() { return countActivePeers() < getMaxConnections(); } + @Override public boolean haveCapacity() { return countActivePeers() < getMaxConnections() * 4 / 5; } @@ -323,21 +325,23 @@ public class NTCPTransport extends TransportImpl { private boolean isEstablished(RouterIdentity peer) { return isEstablished(peer.calculateHash()); } - + + @Override public boolean isEstablished(Hash dest) { synchronized (_conLock) { NTCPConnection con = (NTCPConnection)_conByIdent.get(dest); return (con != null) && con.isEstablished() && !con.isClosed(); } } - + + @Override public boolean isBacklogged(Hash dest) { synchronized (_conLock) { NTCPConnection con = (NTCPConnection)_conByIdent.get(dest); return (con != null) && con.isEstablished() && con.tooBacklogged(); } } - + void removeCon(NTCPConnection con) { NTCPConnection removed = null; synchronized (_conLock) { @@ -352,15 +356,17 @@ public class NTCPTransport extends TransportImpl { removed.close(); } } - + /** * How many peers can we talk to right now? * */ + @Override public int countActivePeers() { synchronized (_conLock) { return _conByIdent.size(); } } /** * How many peers are we actively sending messages to (this minute) */ + @Override public int countActiveSendPeers() { int active = 0; synchronized (_conLock) { @@ -372,11 +378,12 @@ public class NTCPTransport extends TransportImpl { } return active; } - + /** * Return our peer clock skews on this transport. * Vector composed of Long, each element representing a peer skew in seconds. */ + @Override public Vector getClockSkews() { Vector peers = new Vector(); @@ -394,18 +401,18 @@ public class NTCPTransport extends TransportImpl { _log.debug("NTCP transport returning " + skews.size() + " peer clock skews."); return skews; } - + private static final int NUM_CONCURRENT_READERS = 3; private static final int NUM_CONCURRENT_WRITERS = 3; - + public RouterAddress startListening() { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting ntcp transport listening"); _finisher.start(); _pumper.startPumping(); - + _reader.startReading(NUM_CONCURRENT_READERS); _writer.startWriting(NUM_CONCURRENT_WRITERS); - + configureLocalAddress(); return bindAddress(); } @@ -414,10 +421,10 @@ public class NTCPTransport extends TransportImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("Restarting ntcp transport listening"); _finisher.start(); _pumper.startPumping(); - + _reader.startReading(NUM_CONCURRENT_READERS); _writer.startWriting(NUM_CONCURRENT_WRITERS); - + _myAddress = new NTCPAddress(addr); return bindAddress(); } @@ -448,7 +455,7 @@ public class NTCPTransport extends TransportImpl { if (_log.shouldLog(Log.INFO)) _log.info("Outbound NTCP connections only - no listener configured"); } - + if (_myAddress != null) { RouterAddress rv = _myAddress.toRouterAddress(); if (rv != null) @@ -458,12 +465,12 @@ public class NTCPTransport extends TransportImpl { return null; } } - + Reader getReader() { return _reader; } net.i2p.router.transport.ntcp.Writer getWriter() { return _writer; } public String getStyle() { return STYLE; } EventPumper getPumper() { return _pumper; } - + /** * how long from initial connection attempt (accept() or connect()) until * the con must be established to avoid premature close()ing @@ -504,9 +511,9 @@ public class NTCPTransport extends TransportImpl { if ( (expired != null) && (expired.size() > 0) ) _context.statManager().addRateData("ntcp.outboundEstablishFailed", expired.size(), 0); } - + //private boolean bindAllInterfaces() { return true; } - + private void configureLocalAddress() { RouterContext ctx = getContext(); if (ctx == null) { @@ -531,7 +538,7 @@ public class NTCPTransport extends TransportImpl { } } } - + /** * This doesn't (completely) block, caller should check isAlive() * before calling startListening() or restartListening() @@ -553,8 +560,9 @@ public class NTCPTransport extends TransportImpl { } } public static final String STYLE = "NTCP"; - + public void renderStatusHTML(java.io.Writer out, int sortFlags) throws IOException {} + @Override public void renderStatusHTML(java.io.Writer out, String urlBase, int sortFlags) throws IOException { TreeSet peers = new TreeSet(getComparator(sortFlags)); synchronized (_conLock) { @@ -575,7 +583,7 @@ public class NTCPTransport extends TransportImpl { long totalUptime = 0; long totalSend = 0; long totalRecv = 0; - + StringBuffer buf = new StringBuffer(512); buf.append("NTCP connections: ").append(peers.size()); buf.append(" limit: ").append(getMaxConnections()); @@ -666,19 +674,19 @@ public class NTCPTransport extends TransportImpl { buf.append("   "); buf.append("\n"); } - + buf.append("\n"); buf.append("Peers currently reading I2NP messages: ").append(readingPeers).append("
\n"); buf.append("Peers currently writing I2NP messages: ").append(writingPeers).append("
\n"); out.write(buf.toString()); buf.setLength(0); } - - private static NumberFormat _rateFmt = new DecimalFormat("#,#0.00"); + + private static final NumberFormat _rateFmt = new DecimalFormat("#,#0.00"); private static String formatRate(float rate) { synchronized (_rateFmt) { return _rateFmt.format(rate); } } - + private Comparator getComparator(int sortFlags) { Comparator rv = null; switch (Math.abs(sortFlags)) { @@ -702,7 +710,7 @@ public class NTCPTransport extends TransportImpl { } private static class PeerComparator implements Comparator { public int compare(Object lhs, Object rhs) { - if ( (lhs == null) || (rhs == null) || !(lhs instanceof NTCPConnection) || !(rhs instanceof NTCPConnection)) + if ( (lhs == null) || (rhs == null) || !(lhs instanceof NTCPConnection) || !(rhs instanceof NTCPConnection)) throw new IllegalArgumentException("rhs = " + rhs + " lhs = " + lhs); return compare((NTCPConnection)lhs, (NTCPConnection)rhs); } @@ -711,13 +719,15 @@ public class NTCPTransport extends TransportImpl { return l.getRemotePeer().calculateHash().toBase64().compareTo(r.getRemotePeer().calculateHash().toBase64()); } } - + /** * Cache the bid to reduce object churn */ private class SharedBid extends TransportBid { public SharedBid(int ms) { super(); setLatencyMs(ms); } + @Override public Transport getTransport() { return NTCPTransport.this; } + @Override public String toString() { return "NTCP bid @ " + getLatencyMs(); } } } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index c65bb458e..1eb3d6118 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -12,35 +12,35 @@ import net.i2p.util.Log; /** * Coordinate the outbound fragments and select the next one to be built. - * This pool contains messages we are actively trying to send, essentially + * This pool contains messages we are actively trying to send, essentially * doing a round robin across each message to send one fragment, as implemented - * in {@link #getNextVolley()}. This also honors per-peer throttling, taking + * in {@link #getNextVolley()}. This also honors per-peer throttling, taking * note of each peer's allocations. If a message has each of its fragments - * sent more than a certain number of times, it is failed out. In addition, - * this instance also receives notification of message ACKs from the - * {@link InboundMessageFragments}, signaling that we can stop sending a + * sent more than a certain number of times, it is failed out. In addition, + * this instance also receives notification of message ACKs from the + * {@link InboundMessageFragments}, signaling that we can stop sending a * message. - * + * */ public class OutboundMessageFragments { private RouterContext _context; private Log _log; private UDPTransport _transport; - private ActiveThrottle _throttle; + private ActiveThrottle _throttle; // LINT not used ?? /** peers we are actively sending messages to */ - private List _activePeers; + private final List _activePeers; private boolean _alive; /** which peer should we build the next packet out of? */ private int _nextPeer; private PacketBuilder _builder; /** if we can handle more messages explicitly, set this to true */ - private boolean _allowExcess; - private volatile long _packetsRetransmitted; - - private static final int MAX_ACTIVE = 64; + private boolean _allowExcess; // LINT not used?? + private volatile long _packetsRetransmitted; // LINT not used?? + + // private static final int MAX_ACTIVE = 64; // not used. // don't send a packet more than 10 times static final int MAX_VOLLEYS = 10; - + public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) { _context = ctx; _log = ctx.logManager().getLog(OutboundMessageFragments.class); @@ -70,7 +70,7 @@ public class OutboundMessageFragments { _context.statManager().createRateStat("udp.sendCycleTime", "How long it takes to cycle through all of the active messages?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendCycleTimeSlow", "How long it takes to cycle through all of the active messages, when its going slowly?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); } - + public void startup() { _alive = true; } public void shutdown() { _alive = false; @@ -87,7 +87,7 @@ public class OutboundMessageFragments { _activePeers.notifyAll(); } } - + /** * Block until we allow more messages to be admitted to the active * pool. This is called by the {@link OutboundRefiller} @@ -95,11 +95,11 @@ public class OutboundMessageFragments { * @return true if more messages are allowed */ public boolean waitForMoreAllowed() { - // test without choking. + // test without choking. // perhaps this should check the lifetime of the first activeMessage? if (true) return true; /* - + long start = _context.clock().now(); int numActive = 0; int maxActive = Math.max(_transport.countActivePeers(), MAX_ACTIVE); @@ -123,7 +123,7 @@ public class OutboundMessageFragments { */ return false; } - + /** * Add a new message to the active pool * @@ -133,7 +133,7 @@ public class OutboundMessageFragments { RouterInfo target = msg.getTarget(); if ( (msgBody == null) || (target == null) ) return; - + // todo: make sure the outNetMessage is initialzed once and only once OutboundMessageState state = new OutboundMessageState(_context); boolean ok = state.initialize(msg, msgBody); @@ -164,9 +164,9 @@ public class OutboundMessageFragments { } //finishMessages(); } - - /** - * short circuit the OutNetMessage, letting us send the establish + + /** + * short circuit the OutNetMessage, letting us send the establish * complete message reliably */ public void add(OutboundMessageState state) { @@ -228,11 +228,11 @@ public class OutboundMessageFragments { rv += remaining; } } - + private long _lastCycleTime = System.currentTimeMillis(); - + /** - * Fetch all the packets for a message volley, blocking until there is a + * Fetch all the packets for a message volley, blocking until there is a * message which can be fully transmitted (or the transport is shut down). * The returned array may be sparse, with null packets taking the place of * already ACKed fragments. @@ -270,7 +270,7 @@ public class OutboundMessageFragments { } } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Done looping, next peer we are sending for: " + + _log.debug("Done looping, next peer we are sending for: " + (peer != null ? peer.getRemotePeer().toBase64() : "none")); if (state == null) { if (_log.shouldLog(Log.DEBUG)) @@ -291,10 +291,10 @@ public class OutboundMessageFragments { _log.debug("Woken up while waiting"); } } - + if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending " + state); - + UDPPacket packets[] = preparePackets(state, peer); if ( (state != null) && (state.getMessage() != null) ) { int valid = 0; @@ -303,21 +303,21 @@ public class OutboundMessageFragments { valid++; /* state.getMessage().timestamp("sending a volley of " + valid - + " lastReceived: " + + " lastReceived: " + (_context.clock().now() - peer.getLastReceiveTime()) - + " lastSentFully: " + + " lastSentFully: " + (_context.clock().now() - peer.getLastSendFullyTime())); */ } return packets; } - + private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) { if ( (state != null) && (peer != null) ) { int fragments = state.getFragmentCount(); if (fragments < 0) return null; - + // ok, simplest possible thing is to always tack on the bitfields if List msgIds = peer.getCurrentFullACKs(); if (msgIds == null) msgIds = new ArrayList(); @@ -353,7 +353,7 @@ public class OutboundMessageFragments { } if (sparseCount > 0) remaining.clear(); - + int piggybackedAck = 0; if (msgIds.size() != remaining.size()) { for (int i = 0; i < msgIds.size(); i++) { @@ -364,7 +364,7 @@ public class OutboundMessageFragments { } } } - + if (sparseCount > 0) _context.statManager().addRateData("udp.sendSparse", sparseCount, state.getLifetime()); if (piggybackedAck > 0) @@ -390,10 +390,10 @@ public class OutboundMessageFragments { return null; } } - + /** * We received an ACK of the given messageId from the given peer, so if it - * is still unacked, mark it as complete. + * is still unacked, mark it as complete. * * @return fragments acked */ @@ -409,7 +409,7 @@ public class OutboundMessageFragments { return 0; } } - + public void acked(ACKBitfield bitfield, Hash ackedBy) { PeerState peer = _transport.getPeerState(ackedBy); if (peer != null) { @@ -421,7 +421,7 @@ public class OutboundMessageFragments { _log.debug("partial acked [" + bitfield + "] by an unknown remote peer? " + ackedBy.toBase64()); } } - + public interface ActiveThrottle { public void choke(Hash peer); public void unchoke(Hash peer);