* fix up the propogation of client options to the streaming lib

* add new back-off logic to reduce payload resends during transient
  lag - only let one packet be resent at a time, even if the window size
  allows it (and the packet timers request it).  this should make
  congestion less painful, and reduce the overall number of messages
  resent (as the SACKs for the one packet actively resent should clarify
  what made it through)
This commit is contained in:
jrandom
2004-11-16 22:15:16 +00:00
committed by zzz
parent d943b4993a
commit f46d0a720c
3 changed files with 72 additions and 2 deletions

View File

@ -61,6 +61,8 @@ public class Connection {
private boolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */
private Object _connectLock;
/** how many messages have been resent and not yet ACKed? */
private int _activeResends;
public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 20*1000;
@ -103,6 +105,7 @@ public class Connection {
_activityTimer = new ActivityTimer();
_ackSinceCongestion = true;
_connectLock = new Object();
_activeResends = 0;
}
public long getNextOutboundPacketNum() {
@ -283,8 +286,18 @@ public class Connection {
PacketLocal p = (PacketLocal)acked.get(i);
_outboundPackets.remove(new Long(p.getSequenceNum()));
_ackedPackets++;
if (p.getNumSends() > 1) {
_activeResends--;
if (_log.shouldLog(Log.WARN))
_log.warn("Active resend of " + p + " successful, # active left: " + _activeResends);
}
}
}
if ( (_outboundPackets.size() <= 0) && (_activeResends != 0) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("All outbound packets acked, clearing " + _activeResends);
_activeResends = 0;
}
_outboundPackets.notifyAll();
}
if ((acked != null) && (acked.size() > 0) )
@ -652,8 +665,10 @@ public class Connection {
*/
private class ResendPacketEvent implements SimpleTimer.TimedEvent {
private PacketLocal _packet;
private boolean _currentIsActiveResend;
public ResendPacketEvent(PacketLocal packet) {
_packet = packet;
_currentIsActiveResend = true;
}
public void timeReached() {
@ -667,6 +682,16 @@ public class Connection {
resend = true;
}
if ( (resend) && (_packet.getAckTime() < 0) ) {
if ( (_activeResends > 0) && (!_currentIsActiveResend) ) {
// we want to resend this packet, but there are already active
// resends in the air and we dont want to make a bad situation
// worse. wait another second
if (_log.shouldLog(Log.WARN))
_log.warn("Delaying resend of " + _packet + " as there are "
+ _activeResends + " active resends already in play");
SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, 1000);
return;
}
// revamp various fields, in case we need to ack more, etc
_inputStream.updateAcks(_packet);
_packet.setOptionalDelay(getOptions().getChoke());
@ -685,13 +710,21 @@ public class Connection {
int numSends = _packet.getNumSends() + 1;
if (numSends == 2) {
// first resend for this packet
_activeResends++;
_currentIsActiveResend = true;
}
// in case things really suck, the other side may have lost thier
// session tags (e.g. they restarted), so jump back to ElGamal.
if ( (newWindowSize == 1) && (numSends > 2) )
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
if (_log.shouldLog(Log.WARN))
_log.warn("Resend packet " + _packet + " time " + numSends + " (wsize "
_log.warn("Resend packet " + _packet + " time " + numSends +
" activeResends: " + _activeResends +
" (wsize "
+ newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_outboundQueue.enqueue(_packet);

View File

@ -6,7 +6,7 @@ import java.util.Properties;
* Define the current options for the con (and allow custom tweaking midstream)
*
*/
public class ConnectionOptions extends I2PSocketOptions {
public class ConnectionOptions extends I2PSocketOptionsImpl {
private int _connectDelay;
private boolean _fullySigned;
private int _windowSize;
@ -203,4 +203,31 @@ public class ConnectionOptions extends I2PSocketOptions {
*/
public int getInboundBufferSize() { return _inboundBufferSize; }
public void setInboundBufferSize(int bytes) { _inboundBufferSize = bytes; }
public String toString() {
StringBuffer buf = new StringBuffer(128);
buf.append("conDelay=").append(_connectDelay);
buf.append(" maxSize=").append(_maxMessageSize);
buf.append(" rtt=").append(_rtt);
buf.append(" rwin=").append(_receiveWindow);
buf.append(" resendDelay=").append(_resendDelay);
buf.append(" ackDelay=").append(_sendAckDelay);
buf.append(" cwin=").append(_windowSize);
buf.append(" maxResends=").append(_maxResends);
buf.append(" writeTimeout=").append(getWriteTimeout());
buf.append(" inactivityTimeout=").append(_inactivityTimeout);
buf.append(" inboundBuffer=").append(_inboundBufferSize);
return buf.toString();
}
public static void main(String args[]) {
Properties p = new Properties();
p.setProperty(PROP_CONNECT_DELAY, "1000");
ConnectionOptions c = new ConnectionOptions(p);
System.out.println("opts: " + c);
c = new ConnectionOptions(new I2PSocketOptionsImpl(p));
System.out.println("opts: " + c);
}
}

View File

@ -80,8 +80,18 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_defaultOptions = new ConnectionOptions(opts);
_serverSocket = new I2PServerSocketFull(this);
if (_log.shouldLog(Log.INFO)) {
_log.info("Socket manager created. \ndefault options: " + _defaultOptions
+ "\noriginal properties: " + opts);
}
}
public I2PSocketOptions buildOptions() { return buildOptions(null); }
public I2PSocketOptions buildOptions(Properties opts) {
return new ConnectionOptions(opts);
}
public I2PSession getSession() {
return _session;
}