2005-02-20 jrandom

    * Allow the streaming lib resend frequency to drop down to 20s as the
      minimum, so that up to 2 retries can get sent on an HTTP request.
    * Add further limits to failsafe tunnels.
    * Keep exploratory and client tunnel testing and building stats separate.
    * Only use the 60s period for throttling tunnel requests due to transient
      network overload.
    * Rebuild tunnels earlier (1-3m before expiration, by default).
    * Cache the next hop's routerInfo for participating tunnels so that the
      tunnel participation doesn't depend on the netDb.
    * Fixed a long standing bug in the streaming lib where we wouldn't always
      unchoke messages when the window size grows.
    * Make sure the window size never reaches 0 (duh)
This commit is contained in:
jrandom
2005-02-21 18:02:14 +00:00
committed by zzz
parent 0db239a3fe
commit 21f13dba43
19 changed files with 108 additions and 34 deletions

View File

@ -216,7 +216,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
this.out = out;
_toI2P = toI2P;
direction = (toI2P ? "toI2P" : "fromI2P");
_cache = ByteCache.getInstance(256, NETWORK_BUFFER_SIZE);
_cache = ByteCache.getInstance(16, NETWORK_BUFFER_SIZE);
setName("StreamForwarder " + _runnerId + "." + (++__forwarderId));
start();
}

View File

@ -49,9 +49,11 @@ public class ConfigTunnelsHelper {
TunnelPoolSettings in = _context.tunnelManager().getInboundSettings(dest.calculateHash());
TunnelPoolSettings out = _context.tunnelManager().getOutboundSettings(dest.calculateHash());
String name = (in != null ? in.getDestinationNickname() : null);
if ( (in == null) || (out == null) ) continue;
String name = in.getDestinationNickname();
if (name == null)
name = (out != null ? out.getDestinationNickname() : null);
name = out.getDestinationNickname();
if (name == null)
name = dest.calculateHash().toBase64().substring(0,6);

View File

@ -76,7 +76,7 @@ public class Connection {
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 30*1000;
public static final long MIN_RESEND_DELAY = 20*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */
public static int DISCONNECT_TIMEOUT = 5*60*1000;
@ -181,6 +181,11 @@ public class Connection {
}
}
}
void windowAdjusted() {
synchronized (_outboundPackets) {
_outboundPackets.notifyAll();
}
}
void ackImmediately() {
_receiver.send(null, 0, 0);
@ -866,6 +871,7 @@ public class Connection {
+ ") for " + Connection.this.toString());
getOptions().setWindowSize(newWindowSize);
windowAdjusted();
}
}

View File

@ -160,6 +160,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public void setWindowSize(int numMsgs) {
if (numMsgs > _maxWindowSize)
numMsgs = _maxWindowSize;
else if (numMsgs <= 0)
numMsgs = 1;
_windowSize = numMsgs;
}

View File

@ -225,6 +225,9 @@ public class ConnectionPacketHandler {
}
}
if (newWindowSize <= 0)
newWindowSize = 1;
if (_log.shouldLog(Log.DEBUG))
_log.debug("New window size " + newWindowSize + "/" + oldWindow + " congestionSeenAt: "
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends
@ -233,6 +236,7 @@ public class ConnectionPacketHandler {
con.setCongestionWindowEnd(newWindowSize + lowest);
}
con.windowAdjusted();
return congested;
}

View File

@ -38,7 +38,7 @@ class SchedulerConnectedBulk extends SchedulerImpl {
public boolean accept(Connection con) {
boolean ok = (con != null) &&
(con.getAckedPackets() > 0) &&
(con.getHighestAckedThrough() >= 0) &&
(con.getOptions().getProfile() == ConnectionOptions.PROFILE_BULK) &&
(!con.getResetReceived()) &&
( (con.getCloseSentOn() <= 0) || (con.getCloseReceivedOn() <= 0) );

View File

@ -39,7 +39,7 @@ class SchedulerConnecting extends SchedulerImpl {
boolean notYetConnected = (con.getIsConnected()) &&
//(con.getSendStreamId() == null) && // not null on recv
(con.getLastSendId() >= 0) &&
(con.getAckedPackets() <= 0) &&
(con.getHighestAckedThrough() < 0) &&
(!con.getResetReceived());
return notYetConnected;
}

View File

@ -24,11 +24,11 @@ class RouterThrottleImpl implements RouterThrottle {
private static int JOB_LAG_LIMIT = 2000;
/**
* Arbitrary hard limit - if we throttle our network connection this many
* times in the previous 10-20 minute period, don't accept requests to
* times in the previous 2 minute period, don't accept requests to
* participate in tunnels.
*
*/
private static int THROTTLE_EVENT_LIMIT = 300;
private static int THROTTLE_EVENT_LIMIT = 30;
private static final String PROP_MAX_TUNNELS = "router.maxParticipatingTunnels";
private static final String PROP_DEFAULT_KBPS_THROTTLE = "router.defaultKBpsThrottle";
@ -81,7 +81,7 @@ class RouterThrottleImpl implements RouterThrottle {
RateStat rs = _context.statManager().getRate("router.throttleNetworkCause");
Rate r = null;
if (rs != null)
r = rs.getRate(10*60*1000);
r = rs.getRate(60*1000);
long throttleEvents = (r != null ? r.getCurrentEventCount() + r.getLastEventCount() : 0);
if (throttleEvents > THROTTLE_EVENT_LIMIT) {
if (_log.shouldLog(Log.DEBUG))

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.143 $ $Date: 2005/02/19 18:20:57 $";
public final static String ID = "$Revision: 1.144 $ $Date: 2005/02/20 04:12:46 $";
public final static String VERSION = "0.5";
public final static long BUILD = 2;
public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -223,7 +223,7 @@ public class ClientManager {
}
}
private static final int REQUEST_LEASESET_TIMEOUT = 20*1000;
private static final int REQUEST_LEASESET_TIMEOUT = 120*1000;
public void requestLeaseSet(Hash dest, LeaseSet ls) {
ClientConnectionRunner runner = getRunner(dest);
if (runner != null) {

View File

@ -41,6 +41,9 @@ class RequestLeaseSetJob extends JobImpl {
_expiration = expiration;
_onCreate = onCreate;
_onFail = onFail;
ctx.statManager().createRateStat("client.requestLeaseSetSuccess", "How frequently the router requests successfully a new leaseSet?", "ClientMessages", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("client.requestLeaseSetTimeout", "How frequently the router requests a new leaseSet but gets no reply?", "ClientMessages", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("client.requestLeaseSetDropped", "How frequently the router requests a new leaseSet but the client drops?", "ClientMessages", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public String getName() { return "Request Lease Set"; }
@ -80,6 +83,7 @@ class RequestLeaseSetJob extends JobImpl {
getContext().jobQueue().addJob(new CheckLeaseRequestStatus(getContext(), state));
return;
} catch (I2CPMessageException ime) {
getContext().statManager().addRateData("client.requestLeaseSetDropped", 1, 0);
_log.error("Error sending I2CP message requesting the lease set", ime);
state.setIsSuccessful(false);
_runner.setLeaseRequest(null);
@ -107,9 +111,13 @@ class RequestLeaseSetJob extends JobImpl {
if (_runner.isDead()) return;
if (_req.getIsSuccessful()) {
// we didn't fail
RequestLeaseSetJob.CheckLeaseRequestStatus.this.getContext().statManager().addRateData("client.requestLeaseSetSuccess", 1, 0);
return;
} else {
_log.error("Failed to receive a leaseSet in the time allotted (" + new Date(_req.getExpiration()) + ")");
RequestLeaseSetJob.CheckLeaseRequestStatus.this.getContext().statManager().addRateData("client.requestLeaseSetTimeout", 1, 0);
if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "Failed to receive a leaseSet in the time allotted (" + new Date(_req.getExpiration()) + ") for "
+ _runner.getConfig().getDestination().calculateHash().toBase64());
_runner.disconnectClient("Took too long to request leaseSet");
if (_req.getOnFailed() != null)
RequestLeaseSetJob.this.getContext().jobQueue().addJob(_req.getOnFailed());

View File

@ -37,14 +37,15 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
public String getName() { return "Update Reply Found for Kademlia Search"; }
public void runJob() {
I2NPMessage message = _message;
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Reply from " + _peer.toBase64()
+ " with message " + _message.getClass().getName());
+ " with message " + message.getClass().getName());
if (_message instanceof DatabaseStoreMessage) {
if (message instanceof DatabaseStoreMessage) {
long timeToReply = _state.dataFound(_peer);
DatabaseStoreMessage msg = (DatabaseStoreMessage)_message;
DatabaseStoreMessage msg = (DatabaseStoreMessage)message;
if (msg.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) {
try {
_facade.store(msg.getKey(), msg.getLeaseSet());
@ -71,11 +72,11 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
if (_log.shouldLog(Log.ERROR))
_log.error(getJobId() + ": Unknown db store type?!@ " + msg.getValueType());
}
} else if (_message instanceof DatabaseSearchReplyMessage) {
_job.replyFound((DatabaseSearchReplyMessage)_message, _peer);
} else if (message instanceof DatabaseSearchReplyMessage) {
_job.replyFound((DatabaseSearchReplyMessage)message, _peer);
} else {
if (_log.shouldLog(Log.ERROR))
_log.error(getJobId() + ": WTF, reply job matched a strange message: " + _message);
_log.error(getJobId() + ": WTF, reply job matched a strange message: " + message);
return;
}

View File

@ -363,8 +363,8 @@ public class FragmentHandler {
}
if (removed && !_msg.getReleased()) {
noteFailure(_msg.getMessageId());
if (_log.shouldLog(Log.ERROR))
_log.error("Dropped failed fragmented message: " + _msg);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropped failed fragmented message: " + _msg);
_context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime());
_msg.failed();
} else {

View File

@ -27,6 +27,7 @@ public class TunnelParticipant {
private InboundEndpointProcessor _inboundEndpointProcessor;
private InboundMessageDistributor _inboundDistributor;
private FragmentHandler _handler;
private RouterInfo _nextHopCache;
public TunnelParticipant(RouterContext ctx, HopConfig config, HopProcessor processor) {
this(ctx, config, processor, null);
@ -44,6 +45,10 @@ public class TunnelParticipant {
_inboundEndpointProcessor = inEndProc;
if (inEndProc != null)
_inboundDistributor = new InboundMessageDistributor(ctx, inEndProc.getDestination());
if ( (_config != null) && (_config.getSendTo() != null) ) {
_nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
}
}
public void dispatch(TunnelDataMessage msg, Hash recvFrom) {
@ -62,7 +67,9 @@ public class TunnelParticipant {
if ( (_config != null) && (_config.getSendTo() != null) ) {
_config.incrementProcessedMessages();
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
RouterInfo ri = _nextHopCache;
if (ri == null)
ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
if (ri != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send off to nextHop directly (" + _config.getSendTo().toBase64().substring(0,4)
@ -115,6 +122,7 @@ public class TunnelParticipant {
public void runJob() {
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
if (ri != null) {
_nextHopCache = ri;
send(_config, _msg, ri);
} else {
if (_log.shouldLog(Log.ERROR))
@ -134,6 +142,7 @@ public class TunnelParticipant {
public void runJob() {
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
if (ri != null) {
_nextHopCache = ri;
if (_log.shouldLog(Log.ERROR))
_log.error("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4)
+ " failed, but we found it!! where do we go for " + _config + "? msg dropped: " + _msg);

View File

@ -20,7 +20,7 @@ class RebuildJob extends JobImpl {
_cfg = cfg;
_buildToken = buildToken;
long rebuildOn = cfg.getExpiration() - pool.getSettings().getRebuildPeriod();
rebuildOn -= ctx.random().nextInt(pool.getSettings().getRebuildPeriod());
rebuildOn -= ctx.random().nextInt(pool.getSettings().getRebuildPeriod()*2);
getTiming().setStartAfter(rebuildOn);
}
public String getName() { return "Rebuild tunnel"; }

View File

@ -43,11 +43,12 @@ public class RequestTunnelJob extends JobImpl {
private TunnelCreatorConfig _config;
private long _lastSendTime;
private boolean _isFake;
private boolean _isExploratory;
static final int HOP_REQUEST_TIMEOUT = 30*1000;
private static final int LOOKUP_TIMEOUT = 10*1000;
public RequestTunnelJob(RouterContext ctx, TunnelCreatorConfig cfg, Job onCreated, Job onFailed, int hop, boolean isFake) {
public RequestTunnelJob(RouterContext ctx, TunnelCreatorConfig cfg, Job onCreated, Job onFailed, int hop, boolean isFake, boolean isExploratory) {
super(ctx);
_log = ctx.logManager().getLog(RequestTunnelJob.class);
_config = cfg;
@ -58,13 +59,16 @@ public class RequestTunnelJob extends JobImpl {
_lookups = 0;
_lastSendTime = 0;
_isFake = isFake;
_isExploratory = isExploratory;
ctx.statManager().createRateStat("tunnel.receiveRejectionProbabalistic", "How often we are rejected probabalistically?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.receiveRejectionTransient", "How often we are rejected due to transient overload?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.receiveRejectionBandwidth", "How often we are rejected due to bandwidth overload?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.receiveRejectionCritical", "How often we are rejected due to critical failure?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildFailure", "How often we fail to build a tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildSuccess", "How often we succeed building a tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildFailure", "How often we fail to build a non-exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildExploratoryFailure", "How often we fail to build an exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildSuccess", "How often we succeed building a non-exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildExploratorySuccess", "How often we succeed building an exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting hop " + hop + " in " + cfg);
@ -108,7 +112,7 @@ public class RequestTunnelJob extends JobImpl {
+ _currentConfig.getReceiveTunnel() + ": " + _config);
// inbound tunnel with more than just ourselves
RequestTunnelJob req = new RequestTunnelJob(getContext(), _config, _onCreated,
_onFailed, _currentHop - 1, _isFake);
_onFailed, _currentHop - 1, _isFake, _isExploratory);
if (_isFake)
req.runJob();
else
@ -257,19 +261,25 @@ public class RequestTunnelJob extends JobImpl {
_log.info("tunnel building failed: " + _config + " at hop " + _currentHop);
if (_onFailed != null)
getContext().jobQueue().addJob(_onFailed);
getContext().statManager().addRateData("tunnel.buildFailure", 1, 0);
if (_isExploratory)
getContext().statManager().addRateData("tunnel.buildExploratoryFailure", 1, 0);
else
getContext().statManager().addRateData("tunnel.buildFailure", 1, 0);
}
private void peerSuccess() {
getContext().profileManager().tunnelJoined(_currentPeer.getIdentity().calculateHash(),
getContext().clock().now() - _lastSendTime);
if (_currentHop > 0) {
RequestTunnelJob j = new RequestTunnelJob(getContext(), _config, _onCreated, _onFailed, _currentHop - 1, _isFake);
RequestTunnelJob j = new RequestTunnelJob(getContext(), _config, _onCreated, _onFailed, _currentHop - 1, _isFake, _isExploratory);
getContext().jobQueue().addJob(j);
} else {
if (_onCreated != null)
getContext().jobQueue().addJob(_onCreated);
getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0);
if (_isExploratory)
getContext().statManager().addRateData("tunnel.buildExploratorySuccess", 1, 0);
else
getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0);
}
}

View File

@ -40,6 +40,8 @@ class TestJob extends JobImpl {
getTiming().setStartAfter(getDelay() + ctx.clock().now());
ctx.statManager().createRateStat("tunnel.testFailedTime", "How long did the failure take (max of 60s for full timeout)?", "Tunnels",
new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.testExploratoryFailedTime", "How long did the failure of an exploratory tunnel take (max of 60s for full timeout)?", "Tunnels",
new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.testSuccessLength", "How long were the tunnels that passed the test?", "Tunnels",
new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.testSuccessTime", "How long did tunnel testing take?", "Tunnels",
@ -132,7 +134,10 @@ class TestJob extends JobImpl {
}
private void testFailed(long timeToFail) {
getContext().statManager().addRateData("tunnel.testFailedTime", timeToFail, timeToFail);
if (_pool.getSettings().isExploratory())
getContext().statManager().addRateData("tunnel.testExploratoryFailedTime", timeToFail, timeToFail);
else
getContext().statManager().addRateData("tunnel.testFailedTime", timeToFail, timeToFail);
_cfg.tunnelFailed();
if (_log.shouldLog(Log.WARN))
_log.warn("Tunnel test failed in " + timeToFail + "ms: " + _cfg);

View File

@ -49,7 +49,7 @@ public class TunnelBuilder {
// queue up a job to request the endpoint to join the tunnel, which then
// requeues up another job for earlier hops, etc, until it reaches the
// gateway. after the gateway is confirmed, onCreated is fired
RequestTunnelJob req = new RequestTunnelJob(ctx, cfg, onCreated, onFailed, cfg.getLength()-1, fake);
RequestTunnelJob req = new RequestTunnelJob(ctx, cfg, onCreated, onFailed, cfg.getLength()-1, fake, pool.getSettings().isExploratory());
if (fake) // lets get it done inline, as we /need/ it asap
req.runJob();
else

View File

@ -137,6 +137,33 @@ public class TunnelPool {
if (_log.shouldLog(Log.INFO))
_log.info(toString() + ": keepBuilding does NOT want building to continue (want "
+ wanted + ", have " + remaining);
} else {
boolean needed = true;
int valid = 0;
synchronized (_tunnels) {
if (_tunnels.size() > wanted) {
for (int i = 0; i < _tunnels.size(); i++) {
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
if (info.getExpiration() > _context.clock().now() + 3*_settings.getRebuildPeriod()) {
valid++;
if (valid >= wanted*2)
break;
}
}
if (valid >= wanted*2)
needed = false;
}
}
if (!needed) {
if (_log.shouldLog(Log.WARN))
_log.warn(toString() + ": keepBuilding wants building to continue, but not "
+ " with the current object... # tunnels = " + valid + ", wanted = " + wanted);
synchronized (_tokens) {
_tokens.remove(token);
}
return false;
}
}
return rv;
}
@ -279,7 +306,7 @@ public class TunnelPool {
public void tunnelFailed(PooledTunnelCreatorConfig cfg) {
if (_log.shouldLog(Log.WARN))
_log.warn(toString() + ": Tunnel failed: " + cfg, new Exception("failure cause"));
_log.warn(toString() + ": Tunnel failed: " + cfg);
int remaining = 0;
LeaseSet ls = null;
synchronized (_tunnels) {
@ -337,7 +364,7 @@ public class TunnelPool {
int valid = 0;
for (int i = 0; i < _tunnels.size(); i++) {
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
if (info.getExpiration() > _context.clock().now()) {
if (info.getExpiration() > _context.clock().now() + 3*_settings.getRebuildPeriod()) {
valid++;
if (valid >= quantity)
break;