2004-12-28 jrandom

* Cleaned up the resending and choking algorithm in the streaming lib.
    * Removed the read timeout override for I2PTunnel's httpclient, allowing
      it to use the default for the streaming lib.
    * Revised ack triggers in the streaming lib.
    * Logging.
This commit is contained in:
jrandom
2004-12-29 15:53:28 +00:00
committed by zzz
parent 484b528d4f
commit 1503ee2dfa
11 changed files with 81 additions and 37 deletions

View File

@ -147,7 +147,6 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
private static final int DEFAULT_READ_TIMEOUT = 60*1000;
/**
* create the default options (using the default timeout, etc)
*
@ -156,8 +155,8 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
Properties defaultOpts = getTunnel().getClientOptions();
if (!defaultOpts.contains(I2PSocketOptions.PROP_READ_TIMEOUT))
defaultOpts.setProperty(I2PSocketOptions.PROP_READ_TIMEOUT, ""+DEFAULT_READ_TIMEOUT);
if (!defaultOpts.contains("i2p.streaming.inactivityTimeout"))
defaultOpts.setProperty("i2p.streaming.inactivityTimeout", ""+DEFAULT_READ_TIMEOUT);
//if (!defaultOpts.contains("i2p.streaming.inactivityTimeout"))
// defaultOpts.setProperty("i2p.streaming.inactivityTimeout", ""+DEFAULT_READ_TIMEOUT);
I2PSocketOptions opts = sockMgr.buildOptions(defaultOpts);
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_CONNECT_TIMEOUT))
opts.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);

View File

@ -72,7 +72,7 @@ public class Connection {
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 40*1000;
public static final long MIN_RESEND_DELAY = 30*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */
public static int DISCONNECT_TIMEOUT = 5*60*1000;
@ -146,20 +146,25 @@ public class Connection {
synchronized (_outboundPackets) {
if (!started)
_context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size(), timeoutMs);
if (!_connected)
return false;
started = true;
if (_outboundPackets.size() >= _options.getWindowSize()) {
if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ) {
if (writeExpire > 0) {
if (timeLeft <= 0) {
_log.error("Outbound window is full of " + _outboundPackets.size()
+ " with " + _activeResends + " active resends"
+ " and we've waited too long (" + writeExpire + "ms)");
return false;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "), waiting " + timeLeft);
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/"
+ _activeResends + "), waiting " + timeLeft);
try { _outboundPackets.wait(timeLeft); } catch (InterruptedException ie) {}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + _outboundPackets.size() + "), waiting indefinitely");
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
+ "), waiting indefinitely");
try { _outboundPackets.wait(); } catch (InterruptedException ie) {}
}
} else {
@ -242,7 +247,7 @@ public class Connection {
if (timeout > MAX_RESEND_DELAY)
timeout = MAX_RESEND_DELAY;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resend in " + timeout + " for " + packet);
_log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by"));
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), timeout);
}
@ -688,6 +693,14 @@ public class Connection {
default:
if (_log.shouldLog(Log.WARN))
_log.warn("Closing connection due to inactivity");
if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(128);
buf.append("last sent was: ").append(_context.clock().now() - _lastSendTime);
buf.append("ms ago, last received was: ").append(_context.clock().now()-_lastReceivedOn);
buf.append("ms ago, inactivity timeout is: ").append(_options.getInactivityTimeout());
_log.debug(buf.toString());
}
disconnect(true);
break;
}
@ -752,10 +765,8 @@ public class Connection {
*/
private class ResendPacketEvent implements SimpleTimer.TimedEvent {
private PacketLocal _packet;
private boolean _currentIsActiveResend;
public ResendPacketEvent(PacketLocal packet) {
_packet = packet;
_currentIsActiveResend = false;
packet.setResendPacketEvent(ResendPacketEvent.this);
}
@ -763,7 +774,7 @@ public class Connection {
if (_packet.getAckTime() > 0)
return;
if (!_connected) {
if (_resetSent || _resetReceived) {
_packet.cancelled();
return;
}
@ -771,12 +782,15 @@ public class Connection {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Resend period reached for " + _packet);
boolean resend = false;
boolean isLowest = false;
synchronized (_outboundPackets) {
if (_packet.getSequenceNum() == _highestAckedThrough + 1)
isLowest = true;
if (_outboundPackets.containsKey(new Long(_packet.getSequenceNum())))
resend = true;
}
if ( (resend) && (_packet.getAckTime() < 0) ) {
if ( (_activeResends > 0) && (!_currentIsActiveResend) ) {
if (!isLowest) {
// we want to resend this packet, but there are already active
// resends in the air and we dont want to make a bad situation
// worse. wait another second
@ -808,7 +822,6 @@ public class Connection {
if (numSends == 2) {
// first resend for this packet
_activeResends++;
_currentIsActiveResend = true;
}
// in case things really suck, the other side may have lost thier
@ -828,6 +841,17 @@ public class Connection {
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_outboundQueue.enqueue(_packet);
_lastSendTime = _context.clock().now();
// acked during resending (... or somethin')
if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) {
_activeResends--;
synchronized (_outboundPackets) {
_outboundPackets.notifyAll();
}
return;
}
if (numSends > _options.getMaxResends()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Too many resends");

View File

@ -114,9 +114,9 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
con.sendPacket(packet);
long sent = System.currentTimeMillis();
if ( (built-before > 1000) && (_log.shouldLog(Log.WARN)) )
if ( (built-before > 5*1000) && (_log.shouldLog(Log.WARN)) )
_log.warn("wtf, took " + (built-before) + "ms to build a packet: " + packet);
if ( (sent-built> 1000) && (_log.shouldLog(Log.WARN)) )
if ( (sent-built> 5*1000) && (_log.shouldLog(Log.WARN)) )
_log.warn("wtf, took " + (sent-built) + "ms to send a packet: " + packet);
return packet;
}
@ -165,7 +165,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
// packets sent, otherwise the other side could receive the CLOSE prematurely,
// since this ACK could arrive before the unacked payload message.
if (con.getOutputStream().getClosed() &&
( (size > 0) || (con.getUnackedPacketsSent() <= 0) ) ) {
( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) {
packet.setFlag(Packet.FLAG_CLOSE);
con.setCloseSentOn(_context.clock().now());
if (_log.shouldLog(Log.DEBUG))

View File

@ -56,7 +56,7 @@ public class ConnectionPacketHandler {
long ready = con.getInputStream().getHighestReadyBockId();
int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize();
int allowedBlocks = available/con.getOptions().getMaxMessageSize();
if (packet.getSequenceNum() > ready + allowedBlocks) {
if ( (packet.getPayloadSize() > 0) && (packet.getSequenceNum() > ready + allowedBlocks) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Inbound buffer exceeded on connection " + con + " ("
+ ready + "/"+ (ready+allowedBlocks) + "/" + available
@ -106,14 +106,11 @@ public class ConnectionPacketHandler {
con.incrementDupMessagesReceived(1);
// take note of congestion
//con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2);
//con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2);
if (_log.shouldLog(Log.WARN))
_log.warn("congestion.. dup " + packet);
SimpleTimer.getInstance().addEvent(new AckDup(con), con.getOptions().getSendAckDelay());
//con.incrementUnackedPacketsReceived();
//con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
fastAck = true;
//fastAck = true;
} else {
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
//con.incrementUnackedPacketsReceived();
@ -128,7 +125,7 @@ public class ConnectionPacketHandler {
fastAck = fastAck || ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew);
con.eventOccurred();
if (fastAck) {
if (con.getLastSendTime() + 1000 < _context.clock().now()) {
if (con.getLastSendTime() + 2000 < _context.clock().now()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Fast ack for dup " + packet);
con.ackImmediately();

View File

@ -69,6 +69,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public void prepare() {
if (_connection != null)
_connection.getInputStream().updateAcks(this);
if (_numSends > 0) // so we can debug to differentiate resends
setOptionalDelay(_numSends * 1000);
}
public long getCreatedOn() { return _createdOn; }
@ -110,10 +112,14 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public String toString() {
String str = super.toString();
if ( (_tagsSent != null) && (_tagsSent.size() > 0) )
str = str + " with tags";
if (_ackOn > 0)
return str + " ack after " + getAckTime();
return str + " ack after " + getAckTime() + (_numSends <= 1 ? "" : " sent " + _numSends + " times");
else
return str;
return str + (_numSends <= 1 ? "" : " sent " + _numSends + " times");
}
public void waitForAccept(int maxWaitMs) {

View File

@ -74,6 +74,9 @@ class PacketQueue {
if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) )
_log.warn("took " + writeTime + "ms to write the packet: " + packet);
// last chance to short circuit...
if (packet.getAckTime() > 0) return;
// this should not block!
begin = _context.clock().now();
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent);

View File

@ -105,6 +105,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
}
return compressed;
}
private static final int NUM_TAGS = 50;
private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent)
throws I2PSessionException {
@ -119,14 +121,14 @@ class I2PSessionImpl2 extends I2PSessionImpl {
if ( (tagsSent == null) || (tagsSent.size() <= 0) ) {
if (oldTags < 10) {
sentTags = createNewTags(50);
sentTags = createNewTags(NUM_TAGS);
if (_log.shouldLog(Log.DEBUG))
_log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding 50");
_log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding " + NUM_TAGS);
} else if (availTimeLeft < 2 * 60 * 1000) {
// if we have > 10 tags, but they expire in under 2 minutes, we want more
sentTags = createNewTags(50);
sentTags = createNewTags(NUM_TAGS);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding 50 new ones");
_log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding " + NUM_TAGS + " new ones");
//_log.error("** sendBestEffort available time left " + availTimeLeft);
} else {
if (_log.shouldLog(Log.DEBUG))
@ -240,11 +242,11 @@ class I2PSessionImpl2 extends I2PSessionImpl {
SessionTag tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key);
Set sentTags = null;
if (_context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key) < 10) {
sentTags = createNewTags(50);
sentTags = createNewTags(NUM_TAGS);
} else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 2 * 60 * 1000) {
// if we have > 10 tags, but they expire in under 30 seconds, we want more
sentTags = createNewTags(50);
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding 50 new ones");
sentTags = createNewTags(NUM_TAGS);
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding " + NUM_TAGS + " new ones");
}
SessionKey newKey = null;
if (false) // rekey

View File

@ -1,4 +1,11 @@
$Id: history.txt,v 1.117 2004/12/21 11:32:50 jrandom Exp $
$Id: history.txt,v 1.118 2004/12/21 13:23:03 jrandom Exp $
2004-12-28 jrandom
* Cleaned up the resending and choking algorithm in the streaming lib.
* Removed the read timeout override for I2PTunnel's httpclient, allowing
it to use the default for the streaming lib.
* Revised ack triggers in the streaming lib.
* Logging.
* 2004-12-21 0.4.2.5 released

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.122 $ $Date: 2004/12/21 11:32:50 $";
public final static String ID = "$Revision: 1.123 $ $Date: 2004/12/21 13:23:03 $";
public final static String VERSION = "0.4.2.5";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -168,10 +168,16 @@ public class TCPConnection {
msg.timestamp("TCPConnection.addMessage");
List expired = null;
int remaining = 0;
long remainingSize = 0;
synchronized (_pendingMessages) {
_pendingMessages.add(msg);
expired = locked_expireOld();
locked_throttle();
for (int i = 0; i < _pendingMessages.size(); i++) {
OutNetMessage cur = (OutNetMessage)_pendingMessages.get(i);
remaining++;
remainingSize += cur.getMessageSize();
}
remaining = _pendingMessages.size();
_pendingMessages.notifyAll();
}
@ -182,8 +188,8 @@ public class TCPConnection {
if (_log.shouldLog(Log.WARN))
_log.warn("Message " + cur.getMessageId() + " expired on the queue to "
+ _ident.getHash().toBase64().substring(0,6)
+ " (queue size " + remaining + ") with lifetime "
+ cur.getLifetime());
+ " (queue size " + remaining + "/" + remainingSize + ") with lifetime "
+ cur.getLifetime() + " and size " + cur.getMessageSize());
sent(cur, false, 0);
}
}

View File

@ -559,7 +559,7 @@ class TunnelPool {
}
if (_log.shouldLog(Log.WARN))
_log.warn("Tunnel " + id + " marked as not ready, since it /failed/", new Exception("Failed tunnel"));
_log.warn("Tunnel " + id + " marked as not ready, since it /failed/: " + info.toString(), new Exception("Failed tunnel"));
_context.messageHistory().tunnelFailed(info.getTunnelId());
info.setIsReady(false);
Hash us = _context.routerHash();