diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index d427d4c23..01b625bdf 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -61,6 +61,8 @@ public class Connection { private boolean _ackSinceCongestion; /** Notify this on connection (or connection failure) */ private Object _connectLock; + /** how many messages have been resent and not yet ACKed? */ + private int _activeResends; public static final long MAX_RESEND_DELAY = 60*1000; public static final long MIN_RESEND_DELAY = 20*1000; @@ -103,6 +105,7 @@ public class Connection { _activityTimer = new ActivityTimer(); _ackSinceCongestion = true; _connectLock = new Object(); + _activeResends = 0; } public long getNextOutboundPacketNum() { @@ -283,8 +286,18 @@ public class Connection { PacketLocal p = (PacketLocal)acked.get(i); _outboundPackets.remove(new Long(p.getSequenceNum())); _ackedPackets++; + if (p.getNumSends() > 1) { + _activeResends--; + if (_log.shouldLog(Log.WARN)) + _log.warn("Active resend of " + p + " successful, # active left: " + _activeResends); + } } } + if ( (_outboundPackets.size() <= 0) && (_activeResends != 0) ) { + if (_log.shouldLog(Log.WARN)) + _log.warn("All outbound packets acked, clearing " + _activeResends); + _activeResends = 0; + } _outboundPackets.notifyAll(); } if ((acked != null) && (acked.size() > 0) ) @@ -652,8 +665,10 @@ public class Connection { */ private class ResendPacketEvent implements SimpleTimer.TimedEvent { private PacketLocal _packet; + private boolean _currentIsActiveResend; public ResendPacketEvent(PacketLocal packet) { _packet = packet; + _currentIsActiveResend = true; } public void timeReached() { @@ -667,6 +682,16 @@ public class Connection { resend = true; } if ( (resend) && (_packet.getAckTime() < 0) ) { + if ( (_activeResends > 0) && (!_currentIsActiveResend) ) { + // we want to resend this packet, but there are already active + // resends in the air and we dont want to make a bad situation + // worse. wait another second + if (_log.shouldLog(Log.WARN)) + _log.warn("Delaying resend of " + _packet + " as there are " + + _activeResends + " active resends already in play"); + SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, 1000); + return; + } // revamp various fields, in case we need to ack more, etc _inputStream.updateAcks(_packet); _packet.setOptionalDelay(getOptions().getChoke()); @@ -685,13 +710,21 @@ public class Connection { int numSends = _packet.getNumSends() + 1; + if (numSends == 2) { + // first resend for this packet + _activeResends++; + _currentIsActiveResend = true; + } + // in case things really suck, the other side may have lost thier // session tags (e.g. they restarted), so jump back to ElGamal. if ( (newWindowSize == 1) && (numSends > 2) ) _context.sessionKeyManager().failTags(_remotePeer.getPublicKey()); if (_log.shouldLog(Log.WARN)) - _log.warn("Resend packet " + _packet + " time " + numSends + " (wsize " + _log.warn("Resend packet " + _packet + " time " + numSends + + " activeResends: " + _activeResends + + " (wsize " + newWindowSize + " lifetime " + (_context.clock().now() - _packet.getCreatedOn()) + "ms)"); _outboundQueue.enqueue(_packet); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 2ac2ce215..fd928b691 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -6,7 +6,7 @@ import java.util.Properties; * Define the current options for the con (and allow custom tweaking midstream) * */ -public class ConnectionOptions extends I2PSocketOptions { +public class ConnectionOptions extends I2PSocketOptionsImpl { private int _connectDelay; private boolean _fullySigned; private int _windowSize; @@ -203,4 +203,31 @@ public class ConnectionOptions extends I2PSocketOptions { */ public int getInboundBufferSize() { return _inboundBufferSize; } public void setInboundBufferSize(int bytes) { _inboundBufferSize = bytes; } + + public String toString() { + StringBuffer buf = new StringBuffer(128); + buf.append("conDelay=").append(_connectDelay); + buf.append(" maxSize=").append(_maxMessageSize); + buf.append(" rtt=").append(_rtt); + buf.append(" rwin=").append(_receiveWindow); + buf.append(" resendDelay=").append(_resendDelay); + buf.append(" ackDelay=").append(_sendAckDelay); + buf.append(" cwin=").append(_windowSize); + buf.append(" maxResends=").append(_maxResends); + buf.append(" writeTimeout=").append(getWriteTimeout()); + buf.append(" inactivityTimeout=").append(_inactivityTimeout); + buf.append(" inboundBuffer=").append(_inboundBufferSize); + return buf.toString(); + } + + public static void main(String args[]) { + Properties p = new Properties(); + + p.setProperty(PROP_CONNECT_DELAY, "1000"); + ConnectionOptions c = new ConnectionOptions(p); + System.out.println("opts: " + c); + + c = new ConnectionOptions(new I2PSocketOptionsImpl(p)); + System.out.println("opts: " + c); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index 37c23654b..f8c97b9e4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -80,8 +80,18 @@ public class I2PSocketManagerFull implements I2PSocketManager { _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; _defaultOptions = new ConnectionOptions(opts); _serverSocket = new I2PServerSocketFull(this); + + if (_log.shouldLog(Log.INFO)) { + _log.info("Socket manager created. \ndefault options: " + _defaultOptions + + "\noriginal properties: " + opts); + } } + public I2PSocketOptions buildOptions() { return buildOptions(null); } + public I2PSocketOptions buildOptions(Properties opts) { + return new ConnectionOptions(opts); + } + public I2PSession getSession() { return _session; }