2009-04-17 sponge

* Catch NPE in NTCP.
      This possibly augments fix 2009-04-11 welterde below.
    * Various LINT on NTCP sources, and removal of space-wasting
      spaces at end of lines in sources touched.
This commit is contained in:
sponge
2009-04-17 13:11:16 +00:00
parent 834fdfe9b3
commit d0376f82a5
6 changed files with 233 additions and 215 deletions

View File

@ -1,3 +1,9 @@
2009-04-17 sponge
* Catch NPE in NTCP.
This possibly augments fix 2009-04-11 welterde below.
* Various LINT on NTCP sources, and removal of space-wasting
spaces at end of lines in sources touched.
2009-04-13 Mathiasdm
* Bugfix on tray icon updating
* Some more work on the general configuration menu

View File

@ -17,7 +17,7 @@ import net.i2p.CoreVersion;
public class RouterVersion {
public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 18;
public final static long BUILD = 19;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -1,9 +1,9 @@
package net.i2p.router.transport;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
@ -14,7 +14,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -46,10 +45,10 @@ public abstract class TransportImpl implements Transport {
private Log _log;
private TransportEventListener _listener;
private RouterAddress _currentAddress;
private List _sendPool;
private final List _sendPool;
protected RouterContext _context;
/** map from routerIdentHash to timestamp (Long) that the peer was last unreachable */
private Map<Hash, Long> _unreachableEntries;
private final Map<Hash, Long> _unreachableEntries;
private Set<Hash> _wasUnreachableEntries;
/** global router ident -> IP */
private static Map<Hash, byte[]> _IPMap = new ConcurrentHashMap(128);
@ -61,7 +60,7 @@ public abstract class TransportImpl implements Transport {
public TransportImpl(RouterContext context) {
_context = context;
_log = _context.logManager().getLog(TransportImpl.class);
_context.statManager().createRateStat("transport.sendMessageFailureLifetime", "How long the lifetime of messages that fail are?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("transport.sendMessageSize", "How large are the messages sent?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("transport.receiveMessageSize", "How large are the messages received?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
@ -74,7 +73,7 @@ public abstract class TransportImpl implements Transport {
_wasUnreachableEntries = new ConcurrentHashSet(16);
_currentAddress = null;
}
/**
* How many peers can we talk to right now?
*
@ -111,18 +110,18 @@ public abstract class TransportImpl implements Transport {
* Can we initiate or accept a connection to another peer, saving some margin
*/
public boolean haveCapacity() { return true; }
/**
* Return our peer clock skews on a transport.
* Vector composed of Long, each element representing a peer skew in seconds.
* Dummy version. Transports override it.
*/
public Vector getClockSkews() { return new Vector(); }
public List getMostRecentErrorMessages() { return Collections.EMPTY_LIST; }
/**
* Nonblocking call to pull the next outbound message
* off the queue.
* off the queue.
*
* @return the next message or null if none are available
*/
@ -135,7 +134,7 @@ public abstract class TransportImpl implements Transport {
msg.beginSend();
return msg;
}
/**
* The transport is done sending this message
*
@ -167,7 +166,7 @@ public abstract class TransportImpl implements Transport {
}
/**
* The transport is done sending this message. This is the method that actually
* does all of the cleanup - firing off jobs, requeueing, updating stats, etc.
* does all of the cleanup - firing off jobs, requeueing, updating stats, etc.
*
* @param msg message in question
* @param sendSuccessful true if the peer received it
@ -180,64 +179,64 @@ public abstract class TransportImpl implements Transport {
msg.timestamp("afterSend(successful)");
else
msg.timestamp("afterSend(failed)");
if (!sendSuccessful)
msg.transportFailed(getStyle());
if (msToSend > 1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " to "
+ msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend
_log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " to "
+ msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend
+ "/" + msg.getTransmissionTime());
}
//if (true)
//if (true)
// _log.error("(not error) I2NP message sent? " + sendSuccessful + " " + msg.getMessageId() + " after " + msToSend + "/" + msg.getTransmissionTime());
long lifetime = msg.getLifetime();
if (lifetime > 3000) {
int level = Log.WARN;
if (!sendSuccessful)
level = Log.INFO;
if (_log.shouldLog(level))
_log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "/" + msg.getTransmissionTime() + "): [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6)
_log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "/" + msg.getTransmissionTime() + "): [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6)
+ " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + ": " + msg.toString());
} else {
if (_log.shouldLog(Log.INFO))
_log.info("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6)
_log.info("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6)
+ " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + "\n" + msg.toString());
}
if (sendSuccessful) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send message " + msg.getMessageType() + " to "
+ msg.getTarget().getIdentity().getHash().toBase64() + " with transport "
_log.debug("Send message " + msg.getMessageType() + " to "
+ msg.getTarget().getIdentity().getHash().toBase64() + " with transport "
+ getStyle() + " successfully");
Job j = msg.getOnSendJob();
if (j != null)
if (j != null)
_context.jobQueue().addJob(j);
log = true;
msg.discardData();
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Failed to send message " + msg.getMessageType()
+ " to " + msg.getTarget().getIdentity().getHash().toBase64()
_log.info("Failed to send message " + msg.getMessageType()
+ " to " + msg.getTarget().getIdentity().getHash().toBase64()
+ " with transport " + getStyle() + " (details: " + msg + ")");
if (msg.getExpiration() < _context.clock().now())
_context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime);
if (allowRequeue) {
if ( ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) )
if ( ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) )
&& (msg.getMessage() != null) ) {
// this may not be the last transport available - keep going
_context.outNetMessagePool().add(msg);
// don't discard the data yet!
} else {
if (_log.shouldLog(Log.INFO))
_log.info("No more time left (" + new Date(msg.getExpiration())
+ ", expiring without sending successfully the "
_log.info("No more time left (" + new Date(msg.getExpiration())
+ ", expiring without sending successfully the "
+ msg.getMessageType());
if (msg.getOnFailedSendJob() != null)
_context.jobQueue().addJob(msg.getOnFailedSendJob());
@ -251,8 +250,8 @@ public abstract class TransportImpl implements Transport {
} else {
MessageSelector selector = msg.getReplySelector();
if (_log.shouldLog(Log.INFO))
_log.info("Failed and no requeue allowed for a "
+ msg.getMessageSize() + " byte "
_log.info("Failed and no requeue allowed for a "
+ msg.getMessageSize() + " byte "
+ msg.getMessageType() + " message with selector " + selector, new Exception("fail cause"));
if (msg.getOnFailedSendJob() != null)
_context.jobQueue().addJob(msg.getOnFailedSendJob());
@ -269,9 +268,9 @@ public abstract class TransportImpl implements Transport {
String type = msg.getMessageType();
// the udp transport logs some further details
/*
_context.messageHistory().sendMessage(type, msg.getMessageId(),
_context.messageHistory().sendMessage(type, msg.getMessageId(),
msg.getExpiration(),
msg.getTarget().getIdentity().getHash(),
msg.getTarget().getIdentity().getHash(),
sendSuccessful);
*/
}
@ -281,23 +280,23 @@ public abstract class TransportImpl implements Transport {
long allTime = now - msg.getCreated();
if (allTime > 5*1000) {
if (_log.shouldLog(Log.INFO))
_log.info("Took too long from preperation to afterSend(ok? " + sendSuccessful
+ "): " + allTime + "ms/" + sendTime + "ms after failing on: "
_log.info("Took too long from preperation to afterSend(ok? " + sendSuccessful
+ "): " + allTime + "ms/" + sendTime + "ms after failing on: "
+ msg.getFailedTransports() + " and succeeding on " + getStyle());
if ( (allTime > 60*1000) && (sendSuccessful) ) {
// WTF!!@#
if (_log.shouldLog(Log.WARN))
_log.warn("WTF, more than a minute slow? " + msg.getMessageType()
+ " of id " + msg.getMessageId() + " (send begin on "
+ new Date(msg.getSendBegin()) + " / created on "
_log.warn("WTF, more than a minute slow? " + msg.getMessageType()
+ " of id " + msg.getMessageId() + " (send begin on "
+ new Date(msg.getSendBegin()) + " / created on "
+ new Date(msg.getCreated()) + "): " + msg, msg.getCreatedBy());
_context.messageHistory().messageProcessingError(msg.getMessageId(),
msg.getMessageType(),
_context.messageHistory().messageProcessingError(msg.getMessageId(),
msg.getMessageType(),
"Took too long to send [" + allTime + "ms]");
}
}
if (sendSuccessful) {
_context.statManager().addRateData("transport.sendProcessingTime", lifetime, lifetime);
_context.profileManager().messageSent(msg.getTarget().getIdentity().getHash(), getStyle(), sendTime, msg.getMessageSize());
@ -307,7 +306,7 @@ public abstract class TransportImpl implements Transport {
_context.statManager().addRateData("transport.sendMessageFailureLifetime", lifetime, lifetime);
}
}
/**
* Asynchronously send the message as requested in the message and, if the
* send is successful, queue up any msg.getOnSendJob job, and register it
@ -323,14 +322,14 @@ public abstract class TransportImpl implements Transport {
}
boolean duplicate = false;
synchronized (_sendPool) {
if (_sendPool.contains(msg))
if (_sendPool.contains(msg))
duplicate = true;
else
_sendPool.add(msg);
}
if (duplicate) {
if (_log.shouldLog(Log.ERROR))
_log.error("Message already is in the queue? wtf. msg = " + msg,
_log.error("Message already is in the queue? wtf. msg = " + msg,
new Exception("wtf, requeued?"));
}
@ -346,15 +345,15 @@ public abstract class TransportImpl implements Transport {
* and it should not block
*/
protected abstract void outboundMessageReady();
/**
* Message received from the I2NPMessageReader - send it to the listener
*
*/
public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) {
//if (true)
//if (true)
// _log.error("(not error) I2NP message received: " + inMsg.getUniqueId() + " after " + msToReceive);
int level = Log.INFO;
if (msToReceive > 5000)
level = Log.WARN;
@ -385,7 +384,7 @@ public abstract class TransportImpl implements Transport {
_context.profileManager().messageReceived(remoteIdentHash, getStyle(), msToReceive, bytesReceived);
_context.statManager().addRateData("transport.receiveMessageSize", bytesReceived, msToReceive);
}
_context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive);
if (msToReceive > 1000) {
_context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive);
@ -394,7 +393,7 @@ public abstract class TransportImpl implements Transport {
//// this functionality is built into the InNetMessagePool
//String type = inMsg.getClass().getName();
//MessageHistory.getInstance().receiveMessage(type, inMsg.getUniqueId(), inMsg.getMessageExpiration(), remoteIdentHash, true);
if (_listener != null) {
_listener.messageReceived(inMsg, remoteIdent, remoteIdentHash);
} else {
@ -402,9 +401,9 @@ public abstract class TransportImpl implements Transport {
_log.error("WTF! Null listener! this = " + toString(), new Exception("Null listener"));
}
}
/** What addresses are we currently listening to? */
public RouterAddress getCurrentAddress() {
public RouterAddress getCurrentAddress() {
return _currentAddress;
}
/**
@ -417,19 +416,19 @@ public abstract class TransportImpl implements Transport {
if ("SSU".equals(getStyle()))
_context.commSystem().notifyReplaceAddress(address);
}
/** Who to notify on message availability */
public void setListener(TransportEventListener listener) { _listener = listener; }
/** Make this stuff pretty (only used in the old console) */
public void renderStatusHTML(Writer out) throws IOException {}
public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException { renderStatusHTML(out); }
public RouterContext getContext() { return _context; }
public short getReachabilityStatus() { return CommSystemFacade.STATUS_UNKNOWN; }
public void recheckReachability() {}
public boolean isBacklogged(Hash dest) { return false; }
public boolean isEstablished(Hash dest) { return false; }
private static final long UNREACHABLE_PERIOD = 5*60*1000;
public boolean isUnreachable(Hash peer) {
long now = _context.clock().now();
@ -506,7 +505,7 @@ public abstract class TransportImpl implements Transport {
_log.warn(this.getStyle() + " setting wasUnreachable to " + yes + " for " + peer);
}
public static void setIP(Hash peer, byte[] ip) {
public /* static */ void setIP(Hash peer, byte[] ip) {
_IPMap.put(peer, ip);
}
@ -517,7 +516,7 @@ public abstract class TransportImpl implements Transport {
public static boolean isPubliclyRoutable(byte addr[]) {
if (addr.length == 4) {
if ((addr[0]&0xFF) == 127) return false;
if ((addr[0]&0xFF) == 10) return false;
if ((addr[0]&0xFF) == 10) return false;
if ( ((addr[0]&0xFF) == 172) && ((addr[1]&0xFF) >= 16) && ((addr[1]&0xFF) <= 31) ) return false;
if ( ((addr[0]&0xFF) == 192) && ((addr[1]&0xFF) == 168) ) return false;
if ((addr[0]&0xFF) >= 224) return false; // no multicast

View File

@ -45,7 +45,7 @@ import net.i2p.util.Log;
public class EstablishState {
private RouterContext _context;
private Log _log;
// bob receives (and alice sends)
private byte _X[];
private byte _hX_xor_bobIdentHash[];
@ -60,28 +60,28 @@ public class EstablishState {
private transient long _tsB;
private transient long _tsA;
private transient byte _e_bobSig[];
/** previously received encrypted block (or the IV) */
private byte _prevEncrypted[];
/** current encrypted block we are reading */
private byte _curEncrypted[];
/**
* next index in _curEncrypted to write to (equals _curEncrypted length if the block is
* next index in _curEncrypted to write to (equals _curEncrypted length if the block is
* ready to decrypt)
*/
private int _curEncryptedOffset;
/** decryption buffer */
private byte _curDecrypted[];
/** bytes received so far */
private int _received;
/** bytes sent so far */
private int _sent;
private byte _extra[];
private DHSessionKeyBuilder _dh;
private NTCPTransport _transport;
private NTCPConnection _con;
private boolean _corrupt;
@ -92,7 +92,7 @@ public class EstablishState {
private boolean _verified;
private boolean _confirmWritten;
private boolean _failedBySkew;
public EstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
@ -113,15 +113,15 @@ public class EstablishState {
byte hx[] = ctx.sha().calculateHash(_X).getData();
DataHelper.xor(hx, 0, con.getRemotePeer().calculateHash().getData(), 0, _hX_xor_bobIdentHash, 0, hx.length);
}
_prevEncrypted = new byte[16];
_curEncrypted = new byte[16];
_curEncryptedOffset = 0;
_curDecrypted = new byte[16];
_received = 0;
}
/**
* parse the contents of the buffer as part of the handshake. if the
* handshake is completed and there is more data remaining, the buffer is
@ -133,7 +133,7 @@ public class EstablishState {
throw new IllegalStateException(prefix() + "received after completion [corrupt?" + _corrupt + " verified? " + _verified + "] on " + _con);
if (!src.hasRemaining())
return; // nothing to receive
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"receive " + src);
if (_con.isInbound())
@ -141,15 +141,15 @@ public class EstablishState {
else
receiveOutbound(src);
}
/**
* we have written all of the data required to confirm the connection
* establishment
*/
public boolean confirmWritten() { return _confirmWritten; }
public boolean getFailedBySkew() { return _failedBySkew; }
/** we are Bob, so receive these bytes as part of an inbound connection */
private void receiveInbound(ByteBuffer src) {
if (_log.shouldLog(Log.DEBUG))
@ -178,7 +178,7 @@ public class EstablishState {
if (_dh.getSessionKey() == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"Enough data for a DH received");
// first verify that Alice knows who she is trying to talk with and that the X
// isn't corrupt
Hash hX = _context.sha().calculateHash(_X);
@ -201,7 +201,7 @@ public class EstablishState {
System.arraycopy(realXor, 16, _prevEncrypted, 0, _prevEncrypted.length);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")");
// now prepare our response: Y+E(H(X+Y)+tsB+padding, sk, Y[239:255])
_Y = _dh.getMyPublicValueBytes();
byte xy[] = new byte[_X.length+_Y.length];
@ -233,7 +233,7 @@ public class EstablishState {
byte write[] = new byte[_Y.length + _e_hXY_tsB.length];
System.arraycopy(_Y, 0, write, 0, _Y.length);
System.arraycopy(_e_hXY_tsB, 0, write, _Y.length, _e_hXY_tsB.length);
// ok, now that is prepared, we want to actually send it, so make sure we are up for writing
_transport.getPumper().wantsWrite(_con, write);
if (!src.hasRemaining()) return;
@ -243,7 +243,7 @@ public class EstablishState {
return;
}
}
// ok, we are onto the encrypted area
while (src.hasRemaining() && !_corrupt) {
if (_log.shouldLog(Log.DEBUG))
@ -256,12 +256,12 @@ public class EstablishState {
_context.aes().decrypt(_curEncrypted, 0, _curDecrypted, 0, _dh.getSessionKey(), _prevEncrypted, 0, _curEncrypted.length);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix()+"full block read and decrypted: " + Base64.encode(_curDecrypted));
byte swap[] = new byte[16];
_prevEncrypted = _curEncrypted;
_curEncrypted = swap;
_curEncryptedOffset = 0;
if (_aliceIdentSize <= 0) { // we are on the first decrypted block
_aliceIdentSize = (int)DataHelper.fromLong(_curDecrypted, 0, 2);
_sz_aliceIdent_tsA_padding_aliceSigSize = 2 + _aliceIdentSize + 4 + Signature.SIGNATURE_BYTES;
@ -292,8 +292,8 @@ public class EstablishState {
if (!_corrupt && _verified && src.hasRemaining())
prepareExtra(src);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"verifying size (sz=" + _sz_aliceIdent_tsA_padding_aliceSig.size()
+ " expected=" + _sz_aliceIdent_tsA_padding_aliceSigSize
_log.debug(prefix()+"verifying size (sz=" + _sz_aliceIdent_tsA_padding_aliceSig.size()
+ " expected=" + _sz_aliceIdent_tsA_padding_aliceSigSize
+ " corrupt=" + _corrupt
+ " verified=" + _verified + " extra=" + (_extra != null ? _extra.length : 0) + ")");
return;
@ -310,11 +310,11 @@ public class EstablishState {
_log.debug(prefix()+"done with the data, not yet complete or corrupt");
}
}
/** we are Alice, so receive these bytes as part of an outbound connection */
private void receiveOutbound(ByteBuffer src) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"Receive outbound " + src + " received=" + _received);
// recv Y+E(H(X+Y)+tsB, sk, Y[239:255])
while (_received < _Y.length && src.hasRemaining()) {
byte c = src.get();
@ -361,7 +361,7 @@ public class EstablishState {
_tsA = _context.clock().now()/1000; // our (Alice's) timestamp in seconds
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"h(X+Y) is correct, tsA-tsB=" + (_tsA-_tsB));
// the skew is not authenticated yet, but it is certainly fatal to
// the establishment, so fail hard if appropriate
long diff = 1000*Math.abs(_tsA-_tsB);
@ -374,7 +374,7 @@ public class EstablishState {
} else if (_log.shouldLog(Log.DEBUG)) {
_log.debug(prefix()+"Clock skew: " + diff + " ms");
}
// now prepare and send our response
// send E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB), sk, hX_xor_Bob.identHash[16:31])
int sigSize = _X.length+_Y.length+Hash.HASH_LENGTH+4+4;//+12;
@ -390,11 +390,11 @@ public class EstablishState {
//_context.random().nextBytes(sigPad);
//System.arraycopy(sigPad, 0, preSign, _X.length+_Y.length+Hash.HASH_LENGTH+4+4, padSig);
Signature sig = _context.dsa().sign(preSign, _context.keyManager().getSigningPrivateKey());
//if (_log.shouldLog(Log.DEBUG)) {
// _log.debug(prefix()+"signing " + Base64.encode(preSign));
//}
byte ident[] = _context.router().getRouterInfo().getIdentity().toByteArray();
int min = 2+ident.length+4+Signature.SIGNATURE_BYTES;
int rem = min % 16;
@ -409,10 +409,10 @@ public class EstablishState {
_context.random().nextBytes(pad);
System.arraycopy(pad, 0, preEncrypt, 2+ident.length+4, padding);
System.arraycopy(sig.getData(), 0, preEncrypt, 2+ident.length+4+padding, Signature.SIGNATURE_BYTES);
_prevEncrypted = new byte[preEncrypt.length];
_context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(), _hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-16, preEncrypt.length);
if (_log.shouldLog(Log.DEBUG)) {
//_log.debug(prefix() + "unencrypted response to Bob: " + Base64.encode(preEncrypt));
//_log.debug(prefix() + "encrypted response to Bob: " + Base64.encode(_prevEncrypted));
@ -423,7 +423,7 @@ public class EstablishState {
}
if (_received >= _Y.length + _e_hXY_tsB.length && src.hasRemaining()) {
// we are receiving their confirmation
// recv E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev)
int off = 0;
if (_e_bobSig == null) {
@ -439,7 +439,7 @@ public class EstablishState {
if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"recv bobSig received=" + _received);
_e_bobSig[off++] = src.get();
_received++;
if (off >= _e_bobSig.length) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix() + "received E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev): " + Base64.encode(_e_bobSig));
@ -449,7 +449,7 @@ public class EstablishState {
byte bobSigData[] = new byte[Signature.SIGNATURE_BYTES];
System.arraycopy(bobSig, 0, bobSigData, 0, Signature.SIGNATURE_BYTES);
Signature sig = new Signature(bobSigData);
byte toVerify[] = new byte[_X.length+_Y.length+Hash.HASH_LENGTH+4+4];
int voff = 0;
System.arraycopy(_X, 0, toVerify, voff, _X.length); voff += _X.length;
@ -457,7 +457,7 @@ public class EstablishState {
System.arraycopy(_context.routerHash().getData(), 0, toVerify, voff, Hash.HASH_LENGTH); voff += Hash.HASH_LENGTH;
DataHelper.toLong(toVerify, voff, 4, _tsA); voff += 4;
DataHelper.toLong(toVerify, voff, 4, _tsB); voff += 4;
_verified = _context.dsa().verifySignature(sig, toVerify, _con.getRemotePeer().getSigningPublicKey());
if (!_verified) {
_context.statManager().addRateData("ntcp.invalidSignature", 1, 0);
@ -481,12 +481,12 @@ public class EstablishState {
}
}
}
/** did the handshake fail for some reason? */
public boolean isCorrupt() { return _err != null; }
/** @return is the handshake complete and valid? */
public boolean isComplete() { return _verified; }
/**
* we are establishing an outbound connection, so prepare ourselves by
* queueing up the write of the first part of the handshake
@ -504,10 +504,10 @@ public class EstablishState {
_log.debug(prefix()+"prepare outbound with received=" + _received);
}
}
/**
* make sure the signatures are correct, and if they are, update the
* NIOConnection with the session key / peer ident / clock skew / iv.
* NIOConnection with the session key / peer ident / clock skew / iv.
* The NIOConnection itself is responsible for registering with the
* transport
*/
@ -516,10 +516,10 @@ public class EstablishState {
byte b[] = _sz_aliceIdent_tsA_padding_aliceSig.toByteArray();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix()+"decrypted sz(etc) data: " + Base64.encode(b));
try {
RouterIdentity alice = new RouterIdentity();
int sz = (int)DataHelper.fromLong(b, 0, 2);
int sz = (int)DataHelper.fromLong(b, 0, 2); // TO-DO: Hey zzz... Throws an NPE for me... see below, for my "quick fix", need to find out the real reason
if ( (sz <= 0) || (sz > b.length-2-4-Signature.SIGNATURE_BYTES) ) {
_context.statManager().addRateData("ntcp.invalidInboundSize", sz, 0);
fail("size is invalid", new Exception("size is " + sz));
@ -529,7 +529,7 @@ public class EstablishState {
System.arraycopy(b, 2, aliceData, 0, sz);
alice.fromByteArray(aliceData);
long tsA = DataHelper.fromLong(b, 2+sz, 4);
ByteArrayOutputStream baos = new ByteArrayOutputStream(768);
baos.write(_X);
baos.write(_Y);
@ -537,7 +537,7 @@ public class EstablishState {
baos.write(DataHelper.toLong(4, tsA));
baos.write(DataHelper.toLong(4, _tsB));
//baos.write(b, 2+sz+4, b.length-2-sz-4-Signature.SIGNATURE_BYTES);
byte toVerify[] = baos.toByteArray();
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(prefix()+"checking " + Base64.encode(toVerify, 0, 16));
@ -566,7 +566,7 @@ public class EstablishState {
_transport.setIP(alice.calculateHash(), ip);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "verification successful for " + _con);
long diff = 1000*Math.abs(tsA-_tsB);
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0);
@ -597,9 +597,11 @@ public class EstablishState {
} catch (DataFormatException dfe) {
_context.statManager().addRateData("ntcp.invalidInboundDFE", 1, 0);
fail("Error verifying peer", dfe);
} catch(NullPointerException npe) {
fail("Error verifying peer", npe); // TO-DO: zzz This is that quick-fix. -- Sponge
}
}
private void sendInboundConfirm(RouterIdentity alice, long tsA) {
// send Alice E(S(X+Y+Alice.identHash+tsA+tsB), sk, prev)
byte toSign[] = new byte[256+256+32+4+4];
@ -610,7 +612,7 @@ public class EstablishState {
System.arraycopy(h.getData(), 0, toSign, off, 32); off += 32;
DataHelper.toLong(toSign, off, 4, tsA); off += 4;
DataHelper.toLong(toSign, off, 4, _tsB); off += 4;
Signature sig = _context.dsa().sign(toSign, _context.keyManager().getSigningPrivateKey());
byte preSig[] = new byte[Signature.SIGNATURE_BYTES+8];
byte pad[] = new byte[8];
@ -619,12 +621,12 @@ public class EstablishState {
System.arraycopy(pad, 0, preSig, Signature.SIGNATURE_BYTES, pad.length);
_e_bobSig = new byte[preSig.length];
_context.aes().encrypt(preSig, 0, _e_bobSig, 0, _dh.getSessionKey(), _e_hXY_tsB, _e_hXY_tsB.length-16, _e_bobSig.length);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "Sending encrypted inbound confirmation");
_transport.getPumper().wantsWrite(_con, _e_bobSig);
}
/** anything left over in the byte buffer after verification is extra */
private void prepareExtra(ByteBuffer buf) {
int remaining = buf.remaining();
@ -636,13 +638,13 @@ public class EstablishState {
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "prepare extra " + remaining + " (total received: " + _received + ")");
}
/**
* if complete, this will contain any bytes received as part of the
* handshake that were after the actual handshake. This may return null.
*/
public byte[] getExtraBytes() { return _extra; }
private void fail(String reason) { fail(reason, null); }
private void fail(String reason, Exception e) { fail(reason, e, false); }
private void fail(String reason, Exception e, boolean bySkew) {
@ -653,11 +655,12 @@ public class EstablishState {
if (_log.shouldLog(Log.WARN))
_log.warn(prefix()+"Failed to establish: " + _err, e);
}
public String getError() { return _err; }
public Exception getException() { return _e; }
private String prefix() { return toString(); }
@Override
public String toString() {
StringBuffer buf = new StringBuffer(64);
buf.append("est").append(System.identityHashCode(this));
@ -669,7 +672,7 @@ public class EstablishState {
buf.append(": ");
return buf.toString();
}
/**
* a check info connection will receive 256 bytes containing:
* - 32 bytes of uninterpreted, ignored data
@ -703,7 +706,7 @@ public class EstablishState {
off += 4;
long skewSeconds = (ctx.clock().now()/1000)-now;
if (log.shouldLog(Log.INFO))
log.info("Check info received: our IP: " + ourIP + " our port: " + port
log.info("Check info received: our IP: " + ourIP + " our port: " + port
+ " skew: " + skewSeconds + " s");
} catch (UnknownHostException uhe) {
// ipSize is invalid
@ -717,7 +720,7 @@ public class EstablishState {
return false;
}
}
public static void checkHost(String args[]) {
if (args.length != 3) {
System.err.println("Usage: EstablishState ipOrHostname portNum peerHashBase64");
@ -746,7 +749,7 @@ public class EstablishState {
Hash h = ctx.sha().calculateHash(toSend, 32, toSend.length-32-32);
DataHelper.xor(peer, 0, h.getData(), 0, toSend, toSend.length-32, peer.length);
System.out.println("check hash: " + h.toBase64());
out.write(toSend);
out.flush();
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
@ -755,7 +758,7 @@ public class EstablishState {
e.printStackTrace();
}
}
public static void main(String args[]) {
if (args.length == 3) {
checkHost(args);
@ -780,7 +783,7 @@ public class EstablishState {
out.write(hx_xor_bih);
out.flush();
// DONE SENDING X+(H(X) xor Bob.identHash)----------------------------->
// NOW READ Y+E(H(X+Y)+tsB+padding, sk, Y[239:255])
InputStream in = s.getInputStream();
byte toRead[] = new byte[256+(32+4+12)];
@ -808,9 +811,9 @@ public class EstablishState {
System.out.println("encrypted H(X+Y)+tsB+padding: " + Base64.encode(toRead, Y.length, toRead.length-Y.length));
System.out.println("unencrypted H(X+Y)+tsB+padding: " + Base64.encode(decrypted));
long tsB = DataHelper.fromLong(decrypted, 32, 4);
//try { Thread.sleep(40*1000); } catch (InterruptedException ie) {}
RouterIdentity alice = new RouterIdentity();
Object k[] = ctx.keyGenerator().generatePKIKeypair();
PublicKey pub = (PublicKey)k[0];
@ -821,16 +824,16 @@ public class EstablishState {
alice.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
alice.setPublicKey(pub);
alice.setSigningPublicKey(spub);
// SEND E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB+padding), sk, hX_xor_Bob.identHash[16:31])--->
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
byte aliceb[] = alice.toByteArray();
long tsA = ctx.clock().now()/1000l;
baos.write(DataHelper.toLong(2, aliceb.length));
baos.write(aliceb);
baos.write(DataHelper.toLong(4, tsA));
int base = baos.size() + Signature.SIGNATURE_BYTES;
int rem = base % 16;
int padding = 0;
@ -840,7 +843,7 @@ public class EstablishState {
ctx.random().nextBytes(pad);
baos.write(pad);
base += padding;
ByteArrayOutputStream sbaos = new ByteArrayOutputStream(512);
sbaos.write(X);
sbaos.write(Y);
@ -850,21 +853,21 @@ public class EstablishState {
//sbaos.write(pad);
Signature sig = ctx.dsa().sign(sbaos.toByteArray(), spriv);
baos.write(sig.toByteArray());
byte unencrypted[] = baos.toByteArray();
byte toWrite[] = new byte[unencrypted.length];
System.out.println("unencrypted.length = " + unencrypted.length + " alice.size = " + aliceb.length + " padding = " + padding + " base = " + base);
ctx.aes().encrypt(unencrypted, 0, toWrite, 0, dh.getSessionKey(), hx_xor_bih, 16, unencrypted.length);
out.write(toWrite);
out.flush();
System.out.println("unencrypted: " + Base64.encode(unencrypted));
System.out.println("encrypted: " + Base64.encode(toWrite));
System.out.println("Local peer: " + alice.calculateHash().toBase64());
// now check bob's signature
SigningPublicKey bobPubKey = null;
try {
RouterInfo info = new RouterInfo();
@ -874,9 +877,9 @@ public class EstablishState {
e.printStackTrace();
return;
}
System.out.println("Reading in bob's sig");
byte bobRead[] = new byte[48];
read = 0;
while (read < bobRead.length) {
@ -892,7 +895,7 @@ public class EstablishState {
byte bobSigData[] = new byte[Signature.SIGNATURE_BYTES];
System.arraycopy(preSig, 0, bobSigData, 0, Signature.SIGNATURE_BYTES); // ignore the padding
System.out.println("Bob's sig: " + Base64.encode(bobSigData));
byte signed[] = new byte[256+256+32+4+4];
int off = 0;
System.arraycopy(X, 0, signed, off, 256); off += 256;
@ -904,18 +907,18 @@ public class EstablishState {
Signature bobSig = new Signature(bobSigData);
boolean ok = ctx.dsa().verifySignature(bobSig, signed, bobPubKey);
System.out.println("bob's sig matches? " + ok);
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
byte fakeI2NPbuf[] = new byte[128];
ctx.random().nextBytes(fakeI2NPbuf);
out.write(fakeI2NPbuf);
out.flush();
try { Thread.sleep(30*1000); } catch (InterruptedException ie) {}
s.close();
} catch (Exception e) {
} catch (Exception e) {
e.printStackTrace();
}
}

View File

@ -36,7 +36,7 @@ public class NTCPTransport extends TransportImpl {
private SharedBid _fastBid;
private SharedBid _slowBid;
private SharedBid _transientFail;
private Object _conLock;
private final Object _conLock;
private Map _conByIdent;
private NTCPAddress _myAddress;
private EventPumper _pumper;
@ -46,14 +46,14 @@ public class NTCPTransport extends TransportImpl {
* list of NTCPConnection of connections not yet established that we
* want to remove on establishment or close on timeout
*/
private List _establishing;
private final List _establishing;
private List _sent;
private NTCPSendFinisher _finisher;
public NTCPTransport(RouterContext ctx) {
super(ctx);
_log = ctx.logManager().getLog(getClass());
_context.statManager().createRateStat("ntcp.sendTime", "Total message lifetime when sent completely", "ntcp", new long[] { 60*1000, 10*60*1000 });
@ -122,19 +122,19 @@ public class NTCPTransport extends TransportImpl {
_establishing = new ArrayList(4);
_conLock = new Object();
_conByIdent = new HashMap(64);
_sent = new ArrayList(4);
_finisher = new NTCPSendFinisher(ctx, this);
_pumper = new EventPumper(ctx, this);
_reader = new Reader(ctx);
_writer = new net.i2p.router.transport.ntcp.Writer(ctx);
_fastBid = new SharedBid(25); // best
_slowBid = new SharedBid(70); // better than ssu unestablished, but not better than ssu established
_transientFail = new SharedBid(TransportBid.TRANSIENT_FAIL);
}
void inboundEstablished(NTCPConnection con) {
_context.statManager().addRateData("ntcp.inboundEstablished", 1, 0);
markReachable(con.getRemotePeer().calculateHash(), true);
@ -150,7 +150,7 @@ public class NTCPTransport extends TransportImpl {
old.close();
}
}
protected void outboundMessageReady() {
OutNetMessage msg = getNextMessage();
if (msg != null) {
@ -219,7 +219,7 @@ public class NTCPTransport extends TransportImpl {
}
con.enqueueInfoMessage(); // enqueues a netDb store of our own info
con.send(msg); // doesn't do anything yet, just enqueues it
try {
SocketChannel channel = SocketChannel.open();
con.setChannel(channel);
@ -237,7 +237,8 @@ public class NTCPTransport extends TransportImpl {
*/
}
}
@Override
public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) {
super.afterSend(msg, sendSuccessful, allowRequeue, msToSend);
}
@ -259,7 +260,7 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().addRateData("ntcp.attemptUnreachablePeer", 1, 0);
return null;
}
boolean established = isEstablished(toAddress.getIdentity());
if (established) { // should we check the queue size? nah, if its valid, use it
if (_log.shouldLog(Log.DEBUG))
@ -267,7 +268,7 @@ public class NTCPTransport extends TransportImpl {
return _fastBid;
}
RouterAddress addr = toAddress.getTargetAddress(STYLE);
if (addr == null) {
markUnreachable(peer);
_context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1, 0);
@ -294,25 +295,26 @@ public class NTCPTransport extends TransportImpl {
return null;
}
}
if (!allowConnection()) {
if (_log.shouldLog(Log.WARN))
_log.warn("no bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + ", max connection limit reached");
return _transientFail;
}
//if ( (_myAddress != null) && (_myAddress.equals(addr)) )
//if ( (_myAddress != null) && (_myAddress.equals(addr)) )
// return null; // dont talk to yourself
if (_log.shouldLog(Log.DEBUG))
_log.debug("slow bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64());
return _slowBid;
}
public boolean allowConnection() {
return countActivePeers() < getMaxConnections();
}
@Override
public boolean haveCapacity() {
return countActivePeers() < getMaxConnections() * 4 / 5;
}
@ -323,21 +325,23 @@ public class NTCPTransport extends TransportImpl {
private boolean isEstablished(RouterIdentity peer) {
return isEstablished(peer.calculateHash());
}
@Override
public boolean isEstablished(Hash dest) {
synchronized (_conLock) {
NTCPConnection con = (NTCPConnection)_conByIdent.get(dest);
return (con != null) && con.isEstablished() && !con.isClosed();
}
}
@Override
public boolean isBacklogged(Hash dest) {
synchronized (_conLock) {
NTCPConnection con = (NTCPConnection)_conByIdent.get(dest);
return (con != null) && con.isEstablished() && con.tooBacklogged();
}
}
void removeCon(NTCPConnection con) {
NTCPConnection removed = null;
synchronized (_conLock) {
@ -352,15 +356,17 @@ public class NTCPTransport extends TransportImpl {
removed.close();
}
}
/**
* How many peers can we talk to right now?
*
*/
@Override
public int countActivePeers() { synchronized (_conLock) { return _conByIdent.size(); } }
/**
* How many peers are we actively sending messages to (this minute)
*/
@Override
public int countActiveSendPeers() {
int active = 0;
synchronized (_conLock) {
@ -372,11 +378,12 @@ public class NTCPTransport extends TransportImpl {
}
return active;
}
/**
* Return our peer clock skews on this transport.
* Vector composed of Long, each element representing a peer skew in seconds.
*/
@Override
public Vector getClockSkews() {
Vector peers = new Vector();
@ -394,18 +401,18 @@ public class NTCPTransport extends TransportImpl {
_log.debug("NTCP transport returning " + skews.size() + " peer clock skews.");
return skews;
}
private static final int NUM_CONCURRENT_READERS = 3;
private static final int NUM_CONCURRENT_WRITERS = 3;
public RouterAddress startListening() {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
_reader.startReading(NUM_CONCURRENT_READERS);
_writer.startWriting(NUM_CONCURRENT_WRITERS);
configureLocalAddress();
return bindAddress();
}
@ -414,10 +421,10 @@ public class NTCPTransport extends TransportImpl {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Restarting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
_reader.startReading(NUM_CONCURRENT_READERS);
_writer.startWriting(NUM_CONCURRENT_WRITERS);
_myAddress = new NTCPAddress(addr);
return bindAddress();
}
@ -448,7 +455,7 @@ public class NTCPTransport extends TransportImpl {
if (_log.shouldLog(Log.INFO))
_log.info("Outbound NTCP connections only - no listener configured");
}
if (_myAddress != null) {
RouterAddress rv = _myAddress.toRouterAddress();
if (rv != null)
@ -458,12 +465,12 @@ public class NTCPTransport extends TransportImpl {
return null;
}
}
Reader getReader() { return _reader; }
net.i2p.router.transport.ntcp.Writer getWriter() { return _writer; }
public String getStyle() { return STYLE; }
EventPumper getPumper() { return _pumper; }
/**
* how long from initial connection attempt (accept() or connect()) until
* the con must be established to avoid premature close()ing
@ -504,9 +511,9 @@ public class NTCPTransport extends TransportImpl {
if ( (expired != null) && (expired.size() > 0) )
_context.statManager().addRateData("ntcp.outboundEstablishFailed", expired.size(), 0);
}
//private boolean bindAllInterfaces() { return true; }
private void configureLocalAddress() {
RouterContext ctx = getContext();
if (ctx == null) {
@ -531,7 +538,7 @@ public class NTCPTransport extends TransportImpl {
}
}
}
/**
* This doesn't (completely) block, caller should check isAlive()
* before calling startListening() or restartListening()
@ -553,8 +560,9 @@ public class NTCPTransport extends TransportImpl {
}
}
public static final String STYLE = "NTCP";
public void renderStatusHTML(java.io.Writer out, int sortFlags) throws IOException {}
@Override
public void renderStatusHTML(java.io.Writer out, String urlBase, int sortFlags) throws IOException {
TreeSet peers = new TreeSet(getComparator(sortFlags));
synchronized (_conLock) {
@ -575,7 +583,7 @@ public class NTCPTransport extends TransportImpl {
long totalUptime = 0;
long totalSend = 0;
long totalRecv = 0;
StringBuffer buf = new StringBuffer(512);
buf.append("<b id=\"ntcpcon\">NTCP connections: ").append(peers.size());
buf.append(" limit: ").append(getMaxConnections());
@ -666,19 +674,19 @@ public class NTCPTransport extends TransportImpl {
buf.append("</td><td>&nbsp;</td><td>&nbsp;</td><td>&nbsp;");
buf.append("</td></tr>\n");
}
buf.append("</table>\n");
buf.append("Peers currently reading I2NP messages: ").append(readingPeers).append("<br />\n");
buf.append("Peers currently writing I2NP messages: ").append(writingPeers).append("<br />\n");
out.write(buf.toString());
buf.setLength(0);
}
private static NumberFormat _rateFmt = new DecimalFormat("#,#0.00");
private static final NumberFormat _rateFmt = new DecimalFormat("#,#0.00");
private static String formatRate(float rate) {
synchronized (_rateFmt) { return _rateFmt.format(rate); }
}
private Comparator getComparator(int sortFlags) {
Comparator rv = null;
switch (Math.abs(sortFlags)) {
@ -702,7 +710,7 @@ public class NTCPTransport extends TransportImpl {
}
private static class PeerComparator implements Comparator {
public int compare(Object lhs, Object rhs) {
if ( (lhs == null) || (rhs == null) || !(lhs instanceof NTCPConnection) || !(rhs instanceof NTCPConnection))
if ( (lhs == null) || (rhs == null) || !(lhs instanceof NTCPConnection) || !(rhs instanceof NTCPConnection))
throw new IllegalArgumentException("rhs = " + rhs + " lhs = " + lhs);
return compare((NTCPConnection)lhs, (NTCPConnection)rhs);
}
@ -711,13 +719,15 @@ public class NTCPTransport extends TransportImpl {
return l.getRemotePeer().calculateHash().toBase64().compareTo(r.getRemotePeer().calculateHash().toBase64());
}
}
/**
* Cache the bid to reduce object churn
*/
private class SharedBid extends TransportBid {
public SharedBid(int ms) { super(); setLatencyMs(ms); }
@Override
public Transport getTransport() { return NTCPTransport.this; }
@Override
public String toString() { return "NTCP bid @ " + getLatencyMs(); }
}
}

View File

@ -12,35 +12,35 @@ import net.i2p.util.Log;
/**
* Coordinate the outbound fragments and select the next one to be built.
* This pool contains messages we are actively trying to send, essentially
* This pool contains messages we are actively trying to send, essentially
* doing a round robin across each message to send one fragment, as implemented
* in {@link #getNextVolley()}. This also honors per-peer throttling, taking
* in {@link #getNextVolley()}. This also honors per-peer throttling, taking
* note of each peer's allocations. If a message has each of its fragments
* sent more than a certain number of times, it is failed out. In addition,
* this instance also receives notification of message ACKs from the
* {@link InboundMessageFragments}, signaling that we can stop sending a
* sent more than a certain number of times, it is failed out. In addition,
* this instance also receives notification of message ACKs from the
* {@link InboundMessageFragments}, signaling that we can stop sending a
* message.
*
*
*/
public class OutboundMessageFragments {
private RouterContext _context;
private Log _log;
private UDPTransport _transport;
private ActiveThrottle _throttle;
private ActiveThrottle _throttle; // LINT not used ??
/** peers we are actively sending messages to */
private List _activePeers;
private final List _activePeers;
private boolean _alive;
/** which peer should we build the next packet out of? */
private int _nextPeer;
private PacketBuilder _builder;
/** if we can handle more messages explicitly, set this to true */
private boolean _allowExcess;
private volatile long _packetsRetransmitted;
private static final int MAX_ACTIVE = 64;
private boolean _allowExcess; // LINT not used??
private volatile long _packetsRetransmitted; // LINT not used??
// private static final int MAX_ACTIVE = 64; // not used.
// don't send a packet more than 10 times
static final int MAX_VOLLEYS = 10;
public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
_context = ctx;
_log = ctx.logManager().getLog(OutboundMessageFragments.class);
@ -70,7 +70,7 @@ public class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendCycleTime", "How long it takes to cycle through all of the active messages?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendCycleTimeSlow", "How long it takes to cycle through all of the active messages, when its going slowly?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public void startup() { _alive = true; }
public void shutdown() {
_alive = false;
@ -87,7 +87,7 @@ public class OutboundMessageFragments {
_activePeers.notifyAll();
}
}
/**
* Block until we allow more messages to be admitted to the active
* pool. This is called by the {@link OutboundRefiller}
@ -95,11 +95,11 @@ public class OutboundMessageFragments {
* @return true if more messages are allowed
*/
public boolean waitForMoreAllowed() {
// test without choking.
// test without choking.
// perhaps this should check the lifetime of the first activeMessage?
if (true) return true;
/*
long start = _context.clock().now();
int numActive = 0;
int maxActive = Math.max(_transport.countActivePeers(), MAX_ACTIVE);
@ -123,7 +123,7 @@ public class OutboundMessageFragments {
*/
return false;
}
/**
* Add a new message to the active pool
*
@ -133,7 +133,7 @@ public class OutboundMessageFragments {
RouterInfo target = msg.getTarget();
if ( (msgBody == null) || (target == null) )
return;
// todo: make sure the outNetMessage is initialzed once and only once
OutboundMessageState state = new OutboundMessageState(_context);
boolean ok = state.initialize(msg, msgBody);
@ -164,9 +164,9 @@ public class OutboundMessageFragments {
}
//finishMessages();
}
/**
* short circuit the OutNetMessage, letting us send the establish
/**
* short circuit the OutNetMessage, letting us send the establish
* complete message reliably
*/
public void add(OutboundMessageState state) {
@ -228,11 +228,11 @@ public class OutboundMessageFragments {
rv += remaining;
}
}
private long _lastCycleTime = System.currentTimeMillis();
/**
* Fetch all the packets for a message volley, blocking until there is a
* Fetch all the packets for a message volley, blocking until there is a
* message which can be fully transmitted (or the transport is shut down).
* The returned array may be sparse, with null packets taking the place of
* already ACKed fragments.
@ -270,7 +270,7 @@ public class OutboundMessageFragments {
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Done looping, next peer we are sending for: " +
_log.debug("Done looping, next peer we are sending for: " +
(peer != null ? peer.getRemotePeer().toBase64() : "none"));
if (state == null) {
if (_log.shouldLog(Log.DEBUG))
@ -291,10 +291,10 @@ public class OutboundMessageFragments {
_log.debug("Woken up while waiting");
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + state);
UDPPacket packets[] = preparePackets(state, peer);
if ( (state != null) && (state.getMessage() != null) ) {
int valid = 0;
@ -303,21 +303,21 @@ public class OutboundMessageFragments {
valid++;
/*
state.getMessage().timestamp("sending a volley of " + valid
+ " lastReceived: "
+ " lastReceived: "
+ (_context.clock().now() - peer.getLastReceiveTime())
+ " lastSentFully: "
+ " lastSentFully: "
+ (_context.clock().now() - peer.getLastSendFullyTime()));
*/
}
return packets;
}
private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
if ( (state != null) && (peer != null) ) {
int fragments = state.getFragmentCount();
if (fragments < 0)
return null;
// ok, simplest possible thing is to always tack on the bitfields if
List msgIds = peer.getCurrentFullACKs();
if (msgIds == null) msgIds = new ArrayList();
@ -353,7 +353,7 @@ public class OutboundMessageFragments {
}
if (sparseCount > 0)
remaining.clear();
int piggybackedAck = 0;
if (msgIds.size() != remaining.size()) {
for (int i = 0; i < msgIds.size(); i++) {
@ -364,7 +364,7 @@ public class OutboundMessageFragments {
}
}
}
if (sparseCount > 0)
_context.statManager().addRateData("udp.sendSparse", sparseCount, state.getLifetime());
if (piggybackedAck > 0)
@ -390,10 +390,10 @@ public class OutboundMessageFragments {
return null;
}
}
/**
* We received an ACK of the given messageId from the given peer, so if it
* is still unacked, mark it as complete.
* is still unacked, mark it as complete.
*
* @return fragments acked
*/
@ -409,7 +409,7 @@ public class OutboundMessageFragments {
return 0;
}
}
public void acked(ACKBitfield bitfield, Hash ackedBy) {
PeerState peer = _transport.getPeerState(ackedBy);
if (peer != null) {
@ -421,7 +421,7 @@ public class OutboundMessageFragments {
_log.debug("partial acked [" + bitfield + "] by an unknown remote peer? " + ackedBy.toBase64());
}
}
public interface ActiveThrottle {
public void choke(Hash peer);
public void unchoke(Hash peer);