2005-01-17 jrandom

* Added meaningful support for adjusting the preferred message size in the
      streaming lib by setting the i2p.streaming.maxMessageSize=32768 (or
      whatever).  The other side will mimic a reduction (but never an increase).
    * Always make sure to use distinct ConnectionOption objects for each
      connection (duh)
    * Reduced the default ACK delay to 500ms on in the streaming lib
    * Only shrink the streaming window once per window
    * Don't bundle a new jetty.xml with updates
    * Catch another local routerInfo corruption issue on startup.
This commit is contained in:
jrandom
2005-01-17 08:15:00 +00:00
committed by zzz
parent ccb1f491c7
commit 61f217c610
16 changed files with 161 additions and 35 deletions

View File

@ -61,6 +61,8 @@ public class Connection {
private ActivityTimer _activityTimer;
/** window size when we last saw congestion */
private int _lastCongestionSeenAt;
private long _lastCongestionTime;
private long _lastCongestionHighestUnacked;
private boolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */
private Object _connectLock;
@ -89,7 +91,7 @@ public class Connection {
_log = ctx.logManager().getLog(Connection.class);
_receiver = new ConnectionDataReceiver(ctx, this);
_inputStream = new MessageInputStream(ctx);
_outputStream = new MessageOutputStream(ctx, _receiver);
_outputStream = new MessageOutputStream(ctx, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize()));
_chooser = chooser;
_outboundPackets = new TreeMap();
_outboundQueue = queue;
@ -105,6 +107,8 @@ public class Connection {
_congestionWindowEnd = 0;
_highestAckedThrough = -1;
_lastCongestionSeenAt = MAX_WINDOW_SIZE;
_lastCongestionTime = -1;
_lastCongestionHighestUnacked = -1;
_connectionManager = manager;
_resetReceived = false;
_connected = true;
@ -599,6 +603,8 @@ public class Connection {
// dont set the size to (winSize >> 4). only set the
if (_ackSinceCongestion) {
_lastCongestionSeenAt = _options.getWindowSize();
_lastCongestionTime = _context.clock().now();
_lastCongestionHighestUnacked = _lastSendId;
_ackSinceCongestion = false;
}
}
@ -813,14 +819,24 @@ public class Connection {
_packet.setReceiveStreamId(_receiveStreamId);
_packet.setSendStreamId(_sendStreamId);
// shrink the window
int newWindowSize = getOptions().getWindowSize();
congestionOccurred();
_context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
newWindowSize /= 2;
if (newWindowSize <= 0)
newWindowSize = 1;
getOptions().setWindowSize(newWindowSize);
if (_ackSinceCongestion) {
// only shrink the window once per window
if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
congestionOccurred();
_context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
newWindowSize /= 2;
if (newWindowSize <= 0)
newWindowSize = 1;
if (_log.shouldLog(Log.WARN))
_log.warn("Congestion resending packet " + _packet.getSequenceNum() + ": new windowSize " + newWindowSize
+ ") for " + Connection.this.toString());
getOptions().setWindowSize(newWindowSize);
}
}
int numSends = _packet.getNumSends() + 1;

View File

@ -146,7 +146,6 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
con.getInputStream().updateAcks(packet);
packet.setOptionalDelay(con.getOptions().getChoke());
packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
packet.setResendDelay(con.getOptions().getResendDelay());
if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE)
@ -159,6 +158,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) {
packet.setFlag(Packet.FLAG_SYNCHRONIZE);
packet.setOptionalFrom(con.getSession().getMyDestination());
packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
}
// don't set the closed flag if this is a plain ACK and there are outstanding

View File

@ -37,10 +37,11 @@ public class ConnectionManager {
private Map _pendingPings;
private boolean _allowIncoming;
private int _maxConcurrentStreams;
private ConnectionOptions _defaultOptions;
private volatile int _numWaiting;
private Object _connectionLock;
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent) {
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
_context = context;
_log = context.logManager().getLog(ConnectionManager.class);
_connectionByInboundId = new HashMap(32);
@ -56,6 +57,7 @@ public class ConnectionManager {
_outboundQueue = new PacketQueue(context, session, this);
_allowIncoming = false;
_maxConcurrentStreams = maxConcurrent;
_defaultOptions = defaultOptions;
_numWaiting = 0;
_context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
@ -103,7 +105,7 @@ public class ConnectionManager {
* it, or null if the syn's streamId was already taken
*/
public Connection receiveConnection(Packet synPacket) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler);
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
byte receiveId[] = new byte[4];
_context.random().nextBytes(receiveId);
boolean reject = false;

View File

@ -9,7 +9,7 @@ import java.util.Properties;
public class ConnectionOptions extends I2PSocketOptionsImpl {
private int _connectDelay;
private boolean _fullySigned;
private int _windowSize;
private volatile int _windowSize;
private int _receiveWindow;
private int _profile;
private int _rtt;
@ -81,8 +81,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE));
setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000));
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 5*1000));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2*1000));
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 500));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
@ -93,6 +93,39 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
}
public void setProperties(Properties opts) {
super.setProperties(opts);
if (opts == null) return;
if (opts.containsKey(PROP_CONNECT_DELAY))
setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
if (opts.containsKey(PROP_PROFILE))
setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));
if (opts.containsKey(PROP_MAX_MESSAGE_SIZE))
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE));
if (opts.containsKey(PROP_INITIAL_RTT))
setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000));
if (opts.containsKey(PROP_INITIAL_RECEIVE_WINDOW))
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
if (opts.containsKey(PROP_INITIAL_RESEND_DELAY))
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 500));
if (opts.containsKey(PROP_INITIAL_ACK_DELAY))
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
if (opts.containsKey(PROP_MAX_RESENDS))
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
if (opts.containsKey(PROP_WRITE_TIMEOUT))
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
if (opts.containsKey(PROP_INACTIVITY_TIMEOUT))
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
if (opts.containsKey(PROP_INACTIVITY_ACTION))
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT));
setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE);
if (opts.containsKey(PROP_CONNECT_TIMEOUT))
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
}
/**
* how long will we wait after instantiating a new con
* before actually attempting to connect. If this is

View File

@ -49,7 +49,16 @@ public class ConnectionPacketHandler {
}
return;
}
if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) {
if (packet.getOptionalMaxSize() < con.getOptions().getMaxMessageSize()) {
if (_log.shouldLog(Log.INFO))
_log.info("Reducing our max message size to " + packet.getOptionalMaxSize()
+ " from " + con.getOptions().getMaxMessageSize());
con.getOptions().setMaxMessageSize(packet.getOptionalMaxSize());
con.getOutputStream().setBufferSize(packet.getOptionalMaxSize());
}
}
con.packetReceived();
@ -185,20 +194,21 @@ public class ConnectionPacketHandler {
oldSize >>>= 1;
if (oldSize <= 0)
oldSize = 1;
con.getOptions().setWindowSize(oldSize);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Congestion occurred - new windowSize " + oldSize + " congestionSeenAt: "
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends
+ ") for " + con);
con.getOptions().setWindowSize(oldSize);
congested = true;
}
long lowest = con.getHighestAckedThrough();
if (lowest >= con.getCongestionWindowEnd()) {
// new packet that ack'ed uncongested data, or an empty ack
int newWindowSize = con.getOptions().getWindowSize();
int oldWindow = con.getOptions().getWindowSize();
int newWindowSize = oldWindow;
if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
@ -216,7 +226,7 @@ public class ConnectionPacketHandler {
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("New window size " + newWindowSize + " congestionSeenAt: "
_log.debug("New window size " + newWindowSize + "/" + oldWindow + " congestionSeenAt: "
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends
+ ") for " + con);
con.getOptions().setWindowSize(newWindowSize);

View File

@ -77,10 +77,10 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
_maxStreams = -1;
}
_connectionManager = new ConnectionManager(_context, _session, _maxStreams);
_name = name + " " + (++__managerId);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_defaultOptions = new ConnectionOptions(opts);
_connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
_serverSocket = new I2PServerSocketFull(this);
if (_log.shouldLog(Log.INFO)) {
@ -91,7 +91,9 @@ public class I2PSocketManagerFull implements I2PSocketManager {
public I2PSocketOptions buildOptions() { return buildOptions(null); }
public I2PSocketOptions buildOptions(Properties opts) {
return new ConnectionOptions(opts);
ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
curOpts.setProperties(opts);
return curOpts;
}
public I2PSession getSession() {
@ -164,9 +166,13 @@ public class I2PSocketManagerFull implements I2PSocketManager {
options = _defaultOptions;
ConnectionOptions opts = null;
if (options instanceof ConnectionOptions)
opts = (ConnectionOptions)options;
opts = new ConnectionOptions((ConnectionOptions)options);
else
opts = new ConnectionOptions(options);
if (_log.shouldLog(Log.INFO))
_log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
+ " with options: " + opts);
Connection con = _connectionManager.connect(peer, opts);
if (con == null)
throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");

View File

@ -32,6 +32,12 @@ public class MessageOutputStream extends OutputStream {
private long _lastBuffered;
/** if we enqueue data but don't flush it in this period, flush it passively */
private int _passiveFlushDelay;
/**
* if we are changing the buffer size during operation, set this to the new
* buffer size, and next time we are flushing, update the _buf array to the new
* size
*/
private volatile int _nextBufferSize;
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
@ -48,6 +54,7 @@ public class MessageOutputStream extends OutputStream {
_closed = false;
_writeTimeout = -1;
_passiveFlushDelay = 500;
_nextBufferSize = -1;
_flusher = new Flusher();
if (_log.shouldLog(Log.DEBUG))
_log.debug("MessageOutputStream created");
@ -55,6 +62,7 @@ public class MessageOutputStream extends OutputStream {
public void setWriteTimeout(int ms) { _writeTimeout = ms; }
public int getWriteTimeout() { return _writeTimeout; }
public void setBufferSize(int size) { _nextBufferSize = size; }
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
@ -103,6 +111,8 @@ public class MessageOutputStream extends OutputStream {
_valid = 0;
throwAnyError();
_lastFlushed = _context.clock().now();
locked_updateBufferSize();
}
}
if (ws != null) {
@ -134,6 +144,22 @@ public class MessageOutputStream extends OutputStream {
throwAnyError();
}
/**
* If the other side requested we shrink our buffer, do so.
*
*/
private final void locked_updateBufferSize() {
int size = _nextBufferSize;
if (size > 0) {
// update the buffer size to the requested amount
_dataCache.release(new ByteArray(_buf));
_dataCache = ByteCache.getInstance(128, size);
ByteArray ba = _dataCache.acquire();
_buf = ba.getData();
_nextBufferSize = -1;
}
}
/**
* Flush data that has been enqued but not flushed after a certain
* period of inactivity
@ -180,6 +206,7 @@ public class MessageOutputStream extends OutputStream {
_written += _valid;
_valid = 0;
_lastFlushed = _context.clock().now();
locked_updateBufferSize();
_dataLock.notifyAll();
sent = true;
}
@ -213,6 +240,7 @@ public class MessageOutputStream extends OutputStream {
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
locked_updateBufferSize();
_lastFlushed = _context.clock().now();
_dataLock.notifyAll();
}
@ -251,6 +279,7 @@ public class MessageOutputStream extends OutputStream {
ba = new ByteArray(_buf);
_buf = null;
_valid = 0;
locked_updateBufferSize();
}
}
if (ba != null) {
@ -314,6 +343,7 @@ public class MessageOutputStream extends OutputStream {
ws = target.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
locked_updateBufferSize();
_dataLock.notifyAll();
_lastFlushed = _context.clock().now();
}

View File

@ -563,7 +563,7 @@ public class Packet {
if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY ").append(_optionDelay);
if (isFlagSet(FLAG_ECHO)) buf.append(" ECHO");
if (isFlagSet(FLAG_FROM_INCLUDED)) buf.append(" FROM");
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS");
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS ").append(_optionMaxSize);
if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE");
if (isFlagSet(FLAG_RESET)) buf.append(" RESET");
if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG");

View File

@ -35,7 +35,7 @@ public class PacketHandler {
// artificial choke: 2% random drop and a 0-30s
// random tiered delay from 0-30s
if (_context.random().nextInt(100) >= 95) {
displayPacket(packet, "DROP");
displayPacket(packet, "DROP", null);
return false;
} else {
// if (true) return true; // no lag, just drop
@ -97,18 +97,18 @@ public class PacketHandler {
Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null);
if (con != null) {
receiveKnownCon(con, packet);
displayPacket(packet, "RECV");
displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize());
} else {
receiveUnknownCon(packet, sendId);
displayPacket(packet, "UNKN");
displayPacket(packet, "UNKN", null);
}
}
private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS");
void displayPacket(Packet packet, String prefix) {
void displayPacket(Packet packet, String prefix, String suffix) {
String msg = null;
synchronized (_fmt) {
msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString();
msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString() + (suffix != null ? " " + suffix : "");
}
if (_log.shouldLog(Log.DEBUG))
System.out.println(msg);

View File

@ -121,7 +121,9 @@ class PacketQueue {
+ " con: " + conStr;
_log.debug(msg);
}
_connectionManager.getPacketHandler().displayPacket(packet, "SEND");
Connection c = packet.getConnection();
String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() : null);
_connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix);
}
}

View File

@ -19,7 +19,7 @@ public class PingTest {
try {
I2PAppContext context = I2PAppContext.getGlobalContext();
I2PSession session = createSession();
ConnectionManager mgr = new ConnectionManager(context, session, -1);
ConnectionManager mgr = new ConnectionManager(context, session, -1, null);
Log log = context.logManager().getLog(PingTest.class);
for (int i = 0; i < 10; i++) {
log.debug("ping " + i);