- Add experimental bandwidth limiter
      - Add I2PSession API method to update tunnel and bandwidth
        configuration on an existing session
      - Filter more system properties before passing them to the router
This commit is contained in:
zzz
2011-01-03 15:56:02 +00:00
parent 1a3b0d3812
commit 532c9d3fc5
4 changed files with 187 additions and 26 deletions

View File

@ -12,6 +12,8 @@ package net.i2p.client;
import java.util.Date;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import net.i2p.I2PAppContext;
import net.i2p.data.DataFormatException;
@ -41,22 +43,51 @@ import net.i2p.util.Log;
* @author jrandom
*/
class I2CPMessageProducer {
private final static Log _log = new Log(I2CPMessageProducer.class);
private final Log _log;
private final I2PAppContext _context;
private int _sendBps;
private long _sendPeriodBytes;
private long _sendPeriodBeginTime;
private int _maxBytesPerSecond;
private volatile int _sendPeriodBytes;
private volatile long _sendPeriodBeginTime;
private final ReentrantLock _lock;
private static final String PROP_MAX_BW = "i2cp.outboundBytesPerSecond";
/** see ConnectionOptions in streaming - MTU + streaming overhead + gzip overhead */
private static final int TYP_SIZE = 1730 + 28 + 23;
private static final int MIN_RATE = 2 * TYP_SIZE;
public I2CPMessageProducer(I2PAppContext context) {
_context = context;
context.statManager().createRateStat("client.sendBpsRaw", "How fast we pump out I2CP data messages", "ClientMessages", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 });
_log = context.logManager().getLog(I2CPMessageProducer.class);
_lock = new ReentrantLock(true);
context.statManager().createRateStat("client.sendThrottled", "Times waited for bandwidth", "ClientMessages", new long[] { 60*1000 });
context.statManager().createRateStat("client.sendDropped", "Length of msg dropped waiting for bandwidth", "ClientMessages", new long[] { 60*1000 });
}
/**
* Update the bandwidth setting
* @since 0.8.4
*/
public void updateBandwidth(I2PSessionImpl session) {
String max = session.getOptions().getProperty(PROP_MAX_BW);
if (max != null) {
try {
int iMax = Integer.parseInt(max);
if (iMax > 0)
// round up to next higher TYP_SIZE for efficiency, then add some fudge for small messages
_maxBytesPerSecond = 256 + Math.max(MIN_RATE, TYP_SIZE * ((iMax + TYP_SIZE - 1) / TYP_SIZE));
else
_maxBytesPerSecond = 0;
} catch (NumberFormatException nfe) {}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Setting " + _maxBytesPerSecond + " BPS max");
}
/**
* Send all the messages that a client needs to send to a router to establish
* a new session.
*/
public void connect(I2PSessionImpl session) throws I2PSessionException {
updateBandwidth(session);
CreateSessionMessage msg = new CreateSessionMessage();
SessionConfig cfg = new SessionConfig(session.getMyDestination());
cfg.setOptions(session.getOptions());
@ -99,6 +130,9 @@ class I2CPMessageProducer {
*/
public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, SessionTag tag,
SessionKey key, Set tags, SessionKey newKey, long expires) throws I2PSessionException {
if (!updateBps(payload.length, expires))
// drop the message... send fail notification?
return;
SendMessageMessage msg;
if (expires > 0) {
msg = new SendMessageExpiresMessage();
@ -111,20 +145,108 @@ class I2CPMessageProducer {
Payload data = createPayload(dest, payload, tag, key, tags, newKey);
msg.setPayload(data);
session.sendMessage(msg);
updateBps(payload.length);
}
private void updateBps(int len) {
long now = _context.clock().now();
float period = ((float)now-_sendPeriodBeginTime)/1000f;
if (period >= 1f) {
// first term decays on slow transmission
_sendBps = (int)(((float)0.9f * (float)_sendBps) + ((float)0.1f*((float)_sendPeriodBytes)/period));
_sendPeriodBytes = len;
_sendPeriodBeginTime = now;
_context.statManager().addRateData("client.sendBpsRaw", _sendBps, 0);
} else {
_sendPeriodBytes += len;
/**
* Super-simple bandwidth throttler.
* We only calculate on a one-second basis, so large messages
* (compared to the one-second limit) may exceed the limits.
* Tuned for streaming, may not work well for large datagrams.
*
* This does poorly with low rate limits since it doesn't credit
* bandwidth across two periods. So the limit is rounded up,
* and the min limit is set to 2x the typ size, above.
*
* Blocking so this could be very bad for retransmissions,
* as it could clog StreamingTimer.
* Waits are somewhat "fair" using ReentrantLock.
* While out-of-order transmission is acceptable, fairness
* reduces the chance of starvation. ReentrantLock does not
* guarantee in-order execution due to thread priority issues,
* so out-of-order may still occur. But shouldn't happen within
* the same thread anyway... Also note that small messages may
* go ahead of large ones that are waiting for the next window.
* Also, threads waiting a second time go to the back of the line.
*
* Since this is at the I2CP layer, it includes streaming overhead,
* streaming acks and retransmissions,
* gzip overhead (or "underhead" for compression),
* repliable datagram overhead, etc.
* However, it does not, of course, include the substantial overhead
* imposed by the router for the leaseset, tags, encryption,
* and fixed-size tunnel messages.
*
* @param expires if > 0, an expiration date
* @return true if we should send the message, false to drop it
*/
private boolean updateBps(int len, long expires) {
if (_maxBytesPerSecond <= 0)
return true;
//synchronized(this) {
_lock.lock();
try {
int waitCount = 0;
while (true) {
long now = _context.clock().now();
if (waitCount > 0 && expires > 0 && expires < now) {
// just say no to bufferbloat... drop the message right here
_context.statManager().addRateData("client.sendDropped", len, 0);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping " + len + " byte msg expired in queue");
return false;
}
long period = now - _sendPeriodBeginTime;
if (period >= 2000) {
// start new period, always let it through no matter how big
_sendPeriodBytes = len;
_sendPeriodBeginTime = now;
if (_log.shouldLog(Log.DEBUG))
_log.debug("New period after idle, " + len + " bytes");
return true;
}
if (period >= 1000) {
// start new period
// Allow burst within 2 sec, only advance window by 1 sec, and
// every other second give credit for unused bytes in previous period
if (_sendPeriodBytes > 0 && ((_sendPeriodBeginTime / 1000) & 0x01) == 0)
_sendPeriodBytes += len - _maxBytesPerSecond;
else
_sendPeriodBytes = len;
_sendPeriodBeginTime += 1000;
if (_log.shouldLog(Log.DEBUG))
_log.debug("New period, " + len + " bytes");
return true;
}
if (_sendPeriodBytes + len <= _maxBytesPerSecond) {
// still bytes available in this period
_sendPeriodBytes += len;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + len + ", Elapsed " + period + "ms, total " + _sendPeriodBytes + " bytes");
return true;
}
if (waitCount >= 2) {
// just say no to bufferbloat... drop the message right here
_context.statManager().addRateData("client.sendDropped", len, 0);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping " + len + " byte msg after waiting " + waitCount + " times");
return false;
}
// wait until next period
_context.statManager().addRateData("client.sendThrottled", ++waitCount, 0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Throttled " + len + " bytes, wait #" + waitCount + ' ' + (1000 - period) + "ms" /*, new Exception()*/);
try {
//this.wait(1000 - period);
_lock.newCondition().await(1000 - period, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {}
}
} finally {
_lock.unlock();
}
}

View File

@ -9,6 +9,7 @@ package net.i2p.client;
*
*/
import java.util.Properties;
import java.util.Set;
import net.i2p.data.Destination;
@ -151,6 +152,13 @@ public interface I2PSession {
*/
public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException;
/**
* Does not remove properties previously present but missing from this options parameter.
* @param options non-null
* @since 0.8.4
*/
public void updateOptions(Properties options);
/**
* Get the current bandwidth limits. Blocking.
*/

View File

@ -221,20 +221,32 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
}
}
/** save some memory, don't pass along the pointless properties */
private Properties filter(Properties options) {
Properties rv = new Properties();
for (Iterator iter = options.keySet().iterator(); iter.hasNext();) {
String key = (String) iter.next();
String val = options.getProperty(key);
if (key.startsWith("java") ||
key.startsWith("user") ||
key.startsWith("os") ||
key.startsWith("sun") ||
key.startsWith("file") ||
key.startsWith("line") ||
key.startsWith("wrapper")) {
if (key.startsWith("java.") ||
key.startsWith("user.") ||
key.startsWith("os.") ||
key.startsWith("sun.") ||
key.startsWith("file.") ||
key.equals("line.separator") ||
key.equals("path.separator") ||
key.equals("prng.buffers") ||
key.equals("router.trustedUpdateKeys") ||
key.startsWith("router.update") ||
key.startsWith("routerconsole.") ||
key.startsWith("time.") ||
key.startsWith("stat.") ||
key.startsWith("gnu.") || // gnu JVM
key.startsWith("net.i2p.router.web.") || // console nonces
key.startsWith("wrapper.")) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Skipping property: " + key);
} else if ((key.length() > 255) || (val.length() > 255)) {
continue;
}
String val = options.getProperty(key);
if ((key.length() > 255) || (val.length() > 255)) {
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "Not passing on property ["
+ key
@ -247,6 +259,18 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
return rv;
}
/**
* Update the tunnel and bandwidth settings
* @since 0.8.4
*/
public void updateOptions(Properties options) {
_options.putAll(filter(options));
_producer.updateBandwidth(this);
try {
_producer.updateTunnels(this, 0);
} catch (I2PSessionException ise) {}
}
void setLeaseSet(LeaseSet ls) {
_leaseSet = ls;
if (ls != null) {

View File

@ -98,6 +98,13 @@ class I2PSimpleSession extends I2PSessionImpl2 {
}
}
/**
* Ignore, does nothing
* @since 0.8.4
*/
@Override
public void updateOptions(Properties options) {}
/**
* Only map message handlers that we will use
*/