2006-02-22 jrandom

* Handle a rare race under high bandwidth situations in the SSU transport
* Minor refactoring so we don't confuse sun's 1.6.0-b2 validator
Author: jrandom
Date: 2006-02-22 14:54:22 +00:00
Committed by: zzz
Parent: 5f05631936
Commit: 03f509ca54
10 changed files with 253 additions and 193 deletions

View File: history.txt

@ -1,4 +1,8 @@
$Id: history.txt,v 1.414 2006/02/21 10:20:21 jrandom Exp $
$Id: history.txt,v 1.415 2006/02/22 01:19:19 complication Exp $
2006-02-22 jrandom
* Handle a rare race under high bandwidth situations in the SSU transport
* Minor refactoring so we don't confuse sun's 1.6.0-b2 validator
2006-02-21 Complication
* Reactivate TCP transport by default, in addition to re-allowing

View File: RouterVersion.java

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.355 $ $Date: 2006/02/21 08:31:24 $";
public final static String ID = "$Revision: 1.356 $ $Date: 2006/02/21 10:20:20 $";
public final static String VERSION = "0.6.1.11";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File: RepublishLeaseSetJob.java

@ -51,7 +51,7 @@ public class RepublishLeaseSetJob extends JobImpl {
_log.warn("Not publishing a LOCAL lease that isn't current - " + _dest, new Exception("Publish expired LOCAL lease?"));
} else {
getContext().statManager().addRateData("netDb.republishLeaseSetCount", 1, 0);
_facade.sendStore(_dest, ls, new OnSuccess(getContext()), new OnFailure(getContext()), REPUBLISH_LEASESET_TIMEOUT, null);
_facade.sendStore(_dest, ls, new OnRepublishSuccess(getContext()), new OnRepublishFailure(getContext(), this), REPUBLISH_LEASESET_TIMEOUT, null);
//getContext().jobQueue().addJob(new StoreJob(getContext(), _facade, _dest, ls, new OnSuccess(getContext()), new OnFailure(getContext()), REPUBLISH_LEASESET_TIMEOUT));
}
} else {
@ -76,21 +76,28 @@ public class RepublishLeaseSetJob extends JobImpl {
}
}
private class OnSuccess extends JobImpl {
public OnSuccess(RouterContext ctx) { super(ctx); }
public String getName() { return "Publish leaseSet successful"; }
public void runJob() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("successful publishing of the leaseSet for " + _dest.toBase64());
}
}
private class OnFailure extends JobImpl {
public OnFailure(RouterContext ctx) { super(ctx); }
public String getName() { return "Publish leaseSet failed"; }
public void runJob() {
if (_log.shouldLog(Log.WARN))
_log.warn("FAILED publishing of the leaseSet for " + _dest.toBase64());
RepublishLeaseSetJob.this.requeue(getContext().random().nextInt(60*1000));
}
void requeueRepublish() {
if (_log.shouldLog(Log.WARN))
_log.warn("FAILED publishing of the leaseSet for " + _dest.toBase64());
requeue(getContext().random().nextInt(60*1000));
}
}
class OnRepublishSuccess extends JobImpl {
public OnRepublishSuccess(RouterContext ctx) { super(ctx); }
public String getName() { return "Publish leaseSet successful"; }
public void runJob() {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("successful publishing of the leaseSet for " + _dest.toBase64());
}
}
class OnRepublishFailure extends JobImpl {
private RepublishLeaseSetJob _job;
public OnRepublishFailure(RouterContext ctx, RepublishLeaseSetJob job) {
super(ctx);
_job = job;
}
public String getName() { return "Publish leaseSet failed"; }
public void runJob() { _job.requeueRepublish(); }
}
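
Note: this file shows the refactoring named in the changelog's second bullet. OnSuccess and OnFailure were private inner classes that reached back into the enclosing job via a qualified this (RepublishLeaseSetJob.this.requeue(...)); inner classes like these pull in compiler-generated plumbing (this$0 references, synthetic accessors) that evidently confused sun's 1.6.0-b2 verifier. The replacements are package-private top-level classes that take the parent job explicitly. A minimal sketch of the pattern, with hypothetical names, not taken from the commit:

```java
// Sketch only, hypothetical names.
class EnclosingJob {
    void requeueRepublish() { System.out.println("requeueing republish"); }

    public static void main(String[] args) {
        new FailureCallback(new EnclosingJob()).runJob();
    }
}

// Top-level callback: holds an explicit reference instead of using
// EnclosingJob.this, so no compiler-generated accessors are involved.
class FailureCallback {
    private final EnclosingJob _job;
    FailureCallback(EnclosingJob job) { _job = job; }
    void runJob() { _job.requeueRepublish(); }
}
```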

View File: SearchJob.java

@ -456,7 +456,7 @@ class SearchJob extends JobImpl {
void replyFound(DatabaseSearchReplyMessage message, Hash peer) {
long duration = _state.replyFound(peer);
// this processing can take a while, so split 'er up
getContext().jobQueue().addJob(new SearchReplyJob(getContext(), (DatabaseSearchReplyMessage)message, peer, duration));
getContext().jobQueue().addJob(new SearchReplyJob(getContext(), this, (DatabaseSearchReplyMessage)message, peer, duration));
}
/**
@ -468,132 +468,6 @@ class SearchJob extends JobImpl {
// noop
}
private final class SearchReplyJob extends JobImpl {
private DatabaseSearchReplyMessage _msg;
/**
* Peer who we think sent us the reply. Note: could be spoofed! If the
* attacker knew we were searching for a particular key from a
* particular peer, they could send us some searchReply messages with
* shitty values, trying to get us to consider that peer unreliable.
* Potential fixes include either authenticated 'from' address or use a
* nonce in the search + searchReply (and check for it in the selector).
*
*/
private Hash _peer;
private int _curIndex;
private int _invalidPeers;
private int _seenPeers;
private int _newPeers;
private int _duplicatePeers;
private int _repliesPendingVerification;
private long _duration;
public SearchReplyJob(RouterContext enclosingContext, DatabaseSearchReplyMessage message, Hash peer, long duration) {
super(enclosingContext);
_msg = message;
_peer = peer;
_curIndex = 0;
_invalidPeers = 0;
_seenPeers = 0;
_newPeers = 0;
_duplicatePeers = 0;
_repliesPendingVerification = 0;
}
public String getName() { return "Process Reply for Kademlia Search"; }
public void runJob() {
if (_curIndex >= _msg.getNumReplies()) {
if (_repliesPendingVerification > 0) {
// we received new references from the peer, but still
// haven't verified all of them, so let's give it more time
SearchReplyJob.this.requeue(_timeoutMs);
} else {
// either they didn't tell us anything new or we have verified
// (or failed to verify) all of them. we're done
getContext().profileManager().dbLookupReply(_peer, _newPeers, _seenPeers,
_invalidPeers, _duplicatePeers, _duration);
if (_newPeers > 0)
newPeersFound(_newPeers);
}
} else {
Hash peer = _msg.getReply(_curIndex);
boolean shouldAdd = false;
RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer);
if (info == null) {
// if the peer is giving us lots of bad peer references,
// dont try to fetch them.
boolean sendsBadInfo = getContext().profileOrganizer().peerSendsBadReplies(_peer);
if (!sendsBadInfo) {
// we don't need to search for everything we're given here - only ones that
// are next in our search path...
if (getContext().shitlist().isShitlisted(peer)) {
if (_log.shouldLog(Log.INFO))
_log.info("Not looking for a shitlisted peer...");
getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
} else {
//getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs);
//_repliesPendingVerification++;
shouldAdd = true;
}
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64());
getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
}
}
if (_state.wasAttempted(peer)) {
_duplicatePeers++;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": dbSearchReply received on search referencing router "
+ peer);
if (shouldAdd) {
if (_facade.getKBuckets().add(peer))
_newPeers++;
else
_seenPeers++;
}
_curIndex++;
requeue(0);
}
}
/** the peer gave us a reference to a new router, and we were able to fetch it */
private final class ReplyVerifiedJob extends JobImpl {
private Hash _key;
public ReplyVerifiedJob(RouterContext enclosingContext, Hash key) {
super(enclosingContext);
_key = key;
}
public String getName() { return "Search reply value verified"; }
public void runJob() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64() + " verified: " + _key.toBase64());
_repliesPendingVerification--;
getContext().statManager().addRateData("netDb.searchReplyValidated", 1, 0);
}
}
/** the peer gave us a reference to a new router, and we were NOT able to fetch it */
private final class ReplyNotVerifiedJob extends JobImpl {
private Hash _key;
public ReplyNotVerifiedJob(RouterContext enclosingContext, Hash key) {
super(enclosingContext);
_key = key;
}
public String getName() { return "Search reply value NOT verified"; }
public void runJob() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64() + " failed verification: " + _key.toBase64());
_repliesPendingVerification--;
_invalidPeers++;
getContext().statManager().addRateData("netDb.searchReplyNotValidated", 1, 0);
}
}
}
/**
* Called when a particular peer failed to respond before the timeout was
* reached, or if the peer could not be contacted at all.
@ -833,4 +707,144 @@ class SearchJob extends JobImpl {
return super.toString() + " started "
+ DataHelper.formatDuration((getContext().clock().now() - _startedOn)) + " ago";
}
boolean wasAttempted(Hash peer) { return _state.wasAttempted(peer); }
long timeoutMs() { return _timeoutMs; }
boolean add(Hash peer) { return _facade.getKBuckets().add(peer); }
}
class SearchReplyJob extends JobImpl {
private DatabaseSearchReplyMessage _msg;
private Log _log;
/**
* Peer who we think sent us the reply. Note: could be spoofed! If the
* attacker knew we were searching for a particular key from a
* particular peer, they could send us some searchReply messages with
* shitty values, trying to get us to consider that peer unreliable.
* Potential fixes include either authenticated 'from' address or use a
* nonce in the search + searchReply (and check for it in the selector).
*
*/
private Hash _peer;
private int _curIndex;
private int _invalidPeers;
private int _seenPeers;
private int _newPeers;
private int _duplicatePeers;
private int _repliesPendingVerification;
private long _duration;
private SearchJob _searchJob;
public SearchReplyJob(RouterContext enclosingContext, SearchJob job, DatabaseSearchReplyMessage message, Hash peer, long duration) {
super(enclosingContext);
_log = enclosingContext.logManager().getLog(getClass());
_searchJob = job;
_msg = message;
_peer = peer;
_curIndex = 0;
_invalidPeers = 0;
_seenPeers = 0;
_newPeers = 0;
_duplicatePeers = 0;
_repliesPendingVerification = 0;
}
public String getName() { return "Process Reply for Kademlia Search"; }
public void runJob() {
if (_curIndex >= _msg.getNumReplies()) {
if (_repliesPendingVerification > 0) {
// we received new references from the peer, but still
// haven't verified all of them, so let's give it more time
requeue(_searchJob.timeoutMs());
} else {
// either they didn't tell us anything new or we have verified
// (or failed to verify) all of them. we're done
getContext().profileManager().dbLookupReply(_peer, _newPeers, _seenPeers,
_invalidPeers, _duplicatePeers, _duration);
if (_newPeers > 0)
_searchJob.newPeersFound(_newPeers);
}
} else {
Hash peer = _msg.getReply(_curIndex);
boolean shouldAdd = false;
RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer);
if (info == null) {
// if the peer is giving us lots of bad peer references,
// dont try to fetch them.
boolean sendsBadInfo = getContext().profileOrganizer().peerSendsBadReplies(_peer);
if (!sendsBadInfo) {
// we don't need to search for everything we're given here - only ones that
// are next in our search path...
if (getContext().shitlist().isShitlisted(peer)) {
if (_log.shouldLog(Log.INFO))
_log.info("Not looking for a shitlisted peer...");
getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
} else {
//getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs);
//_repliesPendingVerification++;
shouldAdd = true;
}
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64());
getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
}
}
if (_searchJob.wasAttempted(peer)) {
_duplicatePeers++;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": dbSearchReply received on search referencing router " + peer);
if (shouldAdd) {
if (_searchJob.add(peer))
_newPeers++;
else
_seenPeers++;
}
_curIndex++;
requeue(0);
}
}
void replyVerified() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64());
_repliesPendingVerification--;
getContext().statManager().addRateData("netDb.searchReplyValidated", 1, 0);
}
void replyNotVerified() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64());
_repliesPendingVerification--;
_invalidPeers++;
getContext().statManager().addRateData("netDb.searchReplyNotValidated", 1, 0);
}
}
/** the peer gave us a reference to a new router, and we were able to fetch it */
class ReplyVerifiedJob extends JobImpl {
private Hash _key;
private SearchReplyJob _replyJob;
public ReplyVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) {
super(enclosingContext);
_replyJob = srj;
_key = key;
}
public String getName() { return "Search reply value verified"; }
public void runJob() { _replyJob.replyVerified(); }
}
/** the peer gave us a reference to a new router, and we were NOT able to fetch it */
class ReplyNotVerifiedJob extends JobImpl {
private Hash _key;
private SearchReplyJob _replyJob;
public ReplyNotVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) {
super(enclosingContext);
_key = key;
_replyJob = srj;
}
public String getName() { return "Search reply value NOT verified"; }
public void runJob() { _replyJob.replyNotVerified(); }
}
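
Note: the hoisted SearchReplyJob keeps the original incremental design flagged by the "split 'er up" comment above: runJob() handles a single reply per pass and then calls requeue(0), so a large reply set is interleaved with other jobs instead of monopolizing the queue. A self-contained sketch of that requeue-per-element pattern, with a toy queue and hypothetical names:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch only: a toy stand-in for the router's JobQueue.
class MiniJobQueue {
    private final Queue<Runnable> _jobs = new ArrayDeque<Runnable>();
    void add(Runnable job) { _jobs.add(job); }
    void drain() {
        Runnable job;
        while ((job = _jobs.poll()) != null)
            job.run();
    }
}

class ReplyJob implements Runnable {
    private final MiniJobQueue _queue;
    private final String[] _replies;
    private int _cur = 0;

    ReplyJob(MiniJobQueue queue, String[] replies) {
        _queue = queue;
        _replies = replies;
    }

    public void run() {
        if (_cur >= _replies.length)
            return; // all replies handled, we're done
        System.out.println("processing reply " + _cur + ": " + _replies[_cur]);
        _cur++;
        _queue.add(this); // the requeue(0) step: yield between elements
    }

    public static void main(String[] args) {
        MiniJobQueue queue = new MiniJobQueue();
        queue.add(new ReplyJob(queue, new String[] { "peerA", "peerB", "peerC" }));
        queue.drain();
    }
}
```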

View File: PersistProfilesJob.java

@ -20,35 +20,29 @@ class PersistProfilesJob extends JobImpl {
public String getName() { return "Persist profiles"; }
public void runJob() {
Set peers = _mgr.selectPeers();
Hash hashes[] = new Hash[peers.size()];
int i = 0;
for (Iterator iter = peers.iterator(); iter.hasNext(); )
hashes[i] = (Hash)iter.next();
getContext().jobQueue().addJob(new PersistProfileJob(getContext(), hashes));
}
private class PersistProfileJob extends JobImpl {
private Hash _peers[];
private int _cur;
public PersistProfileJob(RouterContext enclosingContext, Hash peers[]) {
super(enclosingContext);
_peers = peers;
_cur = 0;
}
public void runJob() {
if (_cur < _peers.length) {
_mgr.storeProfile(_peers[_cur]);
_cur++;
}
if (_cur >= _peers.length) {
// no more left, requeue up the main persist-em-all job
PersistProfilesJob.this.getTiming().setStartAfter(getContext().clock().now() + PERSIST_DELAY);
PersistProfilesJob.this.getContext().jobQueue().addJob(PersistProfilesJob.this);
} else {
// we've got peers left to persist, so requeue the persist profile job
PersistProfilesJob.PersistProfileJob.this.requeue(1000);
}
}
public String getName() { return "Persist profile"; }
getContext().jobQueue().addJob(new PersistProfileJob(getContext(), this, peers));
}
void persist(Hash peer) { _mgr.storeProfile(peer); }
void requeue() { requeue(PERSIST_DELAY); }
}
class PersistProfileJob extends JobImpl {
private PersistProfilesJob _job;
private Iterator _peers;
public PersistProfileJob(RouterContext enclosingContext, PersistProfilesJob job, Set peers) {
super(enclosingContext);
_peers = peers.iterator();
_job = job;
}
public void runJob() {
if (_peers.hasNext())
_job.persist((Hash)_peers.next());
if (_peers.hasNext()) {
requeue(1000);
} else {
// no more left, requeue up the main persist-em-all job
_job.requeue();
}
}
public String getName() { return "Persist profile"; }
}
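
Note: besides the hoisting, this rewrite quietly fixes a latent bug visible in the removed lines: the old loop filled hashes[i] without ever incrementing i, so only the first slot was ever written. The new version drops the array entirely and walks the Set's iterator, persisting one profile per pass with a one-second requeue between peers. A sketch of that paced-iteration shape, hypothetical names:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch only: persist one item per pass so disk writes are paced out
// instead of happening in a single burst.
class PacedPersister {
    private final Iterator<String> _peers;

    PacedPersister(List<String> peers) { _peers = peers.iterator(); }

    /** @return true if more work remains, i.e. the caller should requeue */
    boolean runOnce() {
        if (_peers.hasNext())
            System.out.println("persisting profile for " + _peers.next());
        return _peers.hasNext();
    }

    public static void main(String[] args) {
        PacedPersister job = new PacedPersister(Arrays.asList("peerA", "peerB"));
        while (job.runOnce())
            ; // stands in for requeue(1000) between passes
    }
}
```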

View File: PacketBuilder.java

@ -38,6 +38,8 @@ public class PacketBuilder {
_context = ctx;
_transport = transport;
_log = ctx.logManager().getLog(PacketBuilder.class);
_context.statManager().createRateStat("udp.packetAuthTime", "How long it takes to encrypt and MAC a packet for sending", "udp", new long[] { 60*1000 });
_context.statManager().createRateStat("udp.packetAuthTimeSlow", "How long it takes to encrypt and MAC a packet for sending (when its slow)", "udp", new long[] { 60*1000, 10*60*1000 });
}
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer) {
@ -1029,6 +1031,7 @@ public class PacketBuilder {
* @param iv IV to deliver
*/
private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, ByteArray iv) {
long before = System.currentTimeMillis();
int encryptOffset = packet.getPacket().getOffset() + UDPPacket.IV_SIZE + UDPPacket.MAC_SIZE;
int encryptSize = packet.getPacket().getLength() - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset();
byte data[] = packet.getPacket().getData();
@ -1059,5 +1062,9 @@ public class PacketBuilder {
System.arraycopy(ba.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE);
System.arraycopy(iv.getData(), 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE);
_hmacCache.release(ba);
long timeToAuth = System.currentTimeMillis() - before;
_context.statManager().addRateData("udp.packetAuthTime", timeToAuth, timeToAuth);
if (timeToAuth > 100)
_context.statManager().addRateData("udp.packetAuthTimeSlow", timeToAuth, timeToAuth);
}
}
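
Note: the two new stats bracket the encrypt-and-MAC step: every call records udp.packetAuthTime, and anything over 100ms also lands in udp.packetAuthTimeSlow, so rare stalls are not averaged away. A sketch of that instrumentation pattern with a stand-in for statManager (hypothetical names):

```java
// Sketch only: time the hot path, always record it, and additionally
// record to a "slow" series past a threshold so spikes stay visible.
class AuthTimer {
    private static final long SLOW_THRESHOLD_MS = 100;

    void authenticate(Runnable encryptAndMac) {
        long before = System.currentTimeMillis();
        encryptAndMac.run(); // the AES encrypt + HMAC work in the real code
        long timeToAuth = System.currentTimeMillis() - before;
        record("udp.packetAuthTime", timeToAuth);
        if (timeToAuth > SLOW_THRESHOLD_MS)
            record("udp.packetAuthTimeSlow", timeToAuth);
    }

    private void record(String stat, long millis) {
        System.out.println(stat + ": " + millis + "ms");
    }

    public static void main(String[] args) {
        new AuthTimer().authenticate(new Runnable() {
            public void run() { /* pretend to encrypt and MAC a packet */ }
        });
    }
}
```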

View File: PacketHandler.java

@ -134,17 +134,26 @@ public class PacketHandler {
+ packet + ": " + _reader);
}
long timeToDequeue = packet.getTimeSinceEnqueue() - packet.getTimeSinceReceived();
long timeToVerify = 0;
long beforeRecv = packet.getTimeSinceReceiveFragments();
if (beforeRecv > 0)
timeToVerify = beforeRecv - packet.getTimeSinceReceived();
long enqueueTime = packet.getEnqueueTime();
long recvTime = packet.getReceivedTime();
long beforeValidateTime = packet.getBeforeValidate();
long afterValidateTime = packet.getAfterValidate();
long timeToDequeue = recvTime - enqueueTime;
long timeToValidate = 0;
long authTime = 0;
if (afterValidateTime > 0) {
timeToValidate = afterValidateTime - enqueueTime;
authTime = afterValidateTime - beforeValidateTime;
}
if (timeToDequeue > 50)
_context.statManager().addRateData("udp.packetDequeueTime", timeToDequeue, timeToDequeue);
if (timeToVerify > 0) {
_context.statManager().addRateData("udp.packetVerifyTime", timeToVerify, timeToDequeue);
if (timeToVerify > 100)
_context.statManager().addRateData("udp.packetVerifyTimeSlow", timeToVerify, timeToDequeue);
if (authTime > 50)
_context.statManager().addRateData("udp.packetAuthRecvTime", authTime, beforeValidateTime-recvTime);
if (timeToValidate > 0) {
_context.statManager().addRateData("udp.packetVerifyTime", timeToValidate, authTime);
if (timeToValidate > 50)
_context.statManager().addRateData("udp.packetVerifyTimeSlow", timeToValidate, authTime);
}
// back to the cache with thee!
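
Note: the reworked handler replaces the relative "time since" deltas, each of which read the clock separately and so could drift against the others, with four absolute timestamps read off the packet; dequeue, validate, and auth durations are then derived from one consistent set of instants. A compact sketch of the derivation (field names assumed for the sketch):

```java
// Sketch only: per-stage durations from absolute per-packet timestamps.
class PacketTimes {
    long enqueue, received, beforeValidate, afterValidate; // clock millis

    long timeToDequeue() { return received - enqueue; }
    long timeToValidate() { return afterValidate > 0 ? afterValidate - enqueue : 0; }
    long authTime() { return afterValidate > 0 ? afterValidate - beforeValidate : 0; }

    public static void main(String[] args) {
        PacketTimes t = new PacketTimes();
        t.enqueue = 1000; t.received = 1040;
        t.beforeValidate = 1045; t.afterValidate = 1105;
        System.out.println("dequeue=" + t.timeToDequeue()
                + "ms validate=" + t.timeToValidate()
                + "ms auth=" + t.authTime() + "ms");
    }
}
```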

View File: PacketPusher.java

@ -35,12 +35,16 @@ public class PacketPusher implements Runnable {
public void run() {
while (_alive) {
UDPPacket packets[] = _fragments.getNextVolley();
if (packets != null) {
for (int i = 0; i < packets.length; i++) {
if (packets[i] != null) // null for ACKed fragments
_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
try {
UDPPacket packets[] = _fragments.getNextVolley();
if (packets != null) {
for (int i = 0; i < packets.length; i++) {
if (packets[i] != null) // null for ACKed fragments
_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
}
}
} catch (Exception e) {
_log.log(Log.CRIT, "Error pushing", e);
}
}
}
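
Note: this is one half of the race handling: whatever getNextVolley() or the send path throws under load, the pusher thread now logs it at CRIT and keeps looping instead of dying silently. A sketch of that keep-the-worker-alive shape (demo failure injected, hypothetical names):

```java
// Sketch only: catch per-iteration failures so one bad volley logs an
// error instead of unwinding run() and killing the sender thread.
class ResilientPusher implements Runnable {
    private volatile boolean _alive = true;
    private int _passes = 0;

    public void run() {
        while (_alive) {
            try {
                pushOneVolley();
            } catch (Exception e) {
                System.err.println("Error pushing: " + e); // keep going
            }
        }
    }

    private void pushOneVolley() {
        if (++_passes >= 3)
            _alive = false; // end the demo after a few passes
        if (_passes == 2)
            throw new IllegalStateException("transient failure");
        System.out.println("sent volley " + _passes);
    }

    public static void main(String[] args) {
        new ResilientPusher().run();
    }
}
```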

View File: PeerState.java

@ -1202,10 +1202,13 @@ public class PeerState {
}
if ( (_retransmitter != null) && ( (_retransmitter.isExpired() || _retransmitter.isComplete()) ) )
OutboundMessageState retrans = _retransmitter;
if ( (retrans != null) && ( (retrans.isExpired() || retrans.isComplete()) ) ) {
_retransmitter = null;
retrans = null;
}
if ( (_retransmitter != null) && (_retransmitter != state) ) {
if ( (retrans != null) && (retrans != state) ) {
// choke it, since there's already another message retransmitting to this
// peer.
_context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted, _packetsTransmitted);

View File: UDPPacket.java

@ -38,6 +38,8 @@ public class UDPPacket {
private volatile Exception _acquiredBy;
private long _enqueueTime;
private long _receivedTime;
private long _beforeValidate;
private long _afterValidate;
private long _beforeReceiveFragments;
private long _afterHandlingTime;
private boolean _isInbound;
@ -84,6 +86,7 @@ public class UDPPacket {
ctx.statManager().createRateStat("udp.packetsLiveOutbound", "Number of live outbound packets in memory", "udp", new long[] { 60*1000, 5*60*1000 });
ctx.statManager().createRateStat("udp.packetsLivePendingRecvInbound", "Number of live inbound packets not yet handled by the PacketHandler", "udp", new long[] { 60*1000, 5*60*1000 });
ctx.statManager().createRateStat("udp.packetsLivePendingHandleInbound", "Number of live inbound packets not yet handled fully by the PacketHandler", "udp", new long[] { 60*1000, 5*60*1000 });
ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", new long[] { 60*1000 });
// the data buffer is clobbered on init(..), but we need it to bootstrap
_packet = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE);
init(ctx, inbound);
@ -146,10 +149,14 @@ public class UDPPacket {
public RemoteHostId getRemoteHost() {
if (_remoteHost == null) {
long before = System.currentTimeMillis();
InetAddress addr = _packet.getAddress();
byte ip[] = addr.getAddress();
int port = _packet.getPort();
_remoteHost = new RemoteHostId(ip, port);
long timeToFetch = System.currentTimeMillis() - before;
if (timeToFetch > 50)
_context.statManager().addRateData("udp.fetchRemoteSlow", timeToFetch, getLifetime());
}
return _remoteHost;
}
@ -161,6 +168,7 @@ public class UDPPacket {
*/
public boolean validate(SessionKey macKey) {
verifyNotReleased();
_beforeValidate = _context.clock().now();
boolean eq = false;
ByteArray buf = _validateCache.acquire();
@ -202,6 +210,7 @@ public class UDPPacket {
}
_validateCache.release(buf);
_afterValidate = _context.clock().now();
return eq;
}
@ -237,6 +246,15 @@ public class UDPPacket {
/** a packet handler has finished parsing out the good bits */
long getTimeSinceHandling() { return (_afterHandlingTime > 0 ? _context.clock().now() - _afterHandlingTime : 0); }
/** when it was added to the endpoint's receive queue */
long getEnqueueTime() { return _enqueueTime; }
/** when it was pulled off the endpoint receive queue */
long getReceivedTime() { return _receivedTime; }
/** when we began validate() */
long getBeforeValidate() { return _beforeValidate; }
/** when we finished validate() */
long getAfterValidate() { return _afterValidate; }
public String toString() {
verifyNotReleased();
StringBuffer buf = new StringBuffer(64);
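
Note: the last new stat, udp.fetchRemoteSlow, times the lazy InetAddress/port extraction in getRemoteHost() and records only fetches over 50ms: measure the suspected stall before restructuring anything. A sketch of that probe (hypothetical names):

```java
import java.net.DatagramPacket;
import java.net.InetAddress;

// Sketch only: time a call suspected of stalling, record it only when slow.
class RemoteHostProbe {
    static String fetch(DatagramPacket packet) {
        long before = System.currentTimeMillis();
        InetAddress addr = packet.getAddress(); // the suspect call
        int port = packet.getPort();
        long timeToFetch = System.currentTimeMillis() - before;
        if (timeToFetch > 50)
            System.out.println("udp.fetchRemoteSlow: " + timeToFetch + "ms");
        return addr + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(fetch(new DatagramPacket(new byte[8], 8)));
    }
}
```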