diff --git a/router/java/src/net/i2p/router/transport/BandwidthLimiter.java b/router/java/src/net/i2p/router/transport/BandwidthLimiter.java index fa068c4f4..6499c0943 100644 --- a/router/java/src/net/i2p/router/transport/BandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/BandwidthLimiter.java @@ -21,6 +21,9 @@ public class BandwidthLimiter { private Log _log; protected RouterContext _context; + protected Object _outboundWaitLock = new Object(); + protected Object _inboundWaitLock = new Object(); + protected BandwidthLimiter(RouterContext context) { _context = context; _log = context.logManager().getLog(BandwidthLimiter.class); @@ -57,11 +60,14 @@ public class BandwidthLimiter { * from the peer will not violate the bandwidth limits */ public void delayInbound(RouterIdentity peer, int numBytes) { - long ms = calculateDelayInbound(peer, numBytes); - if (ms > 0) { - _log.debug("Delaying inbound " + ms +"ms for " + numBytes +" bytes"); - try { Thread.sleep(ms); } catch (InterruptedException ie) {} + while (calculateDelayInbound(peer, numBytes) > 0) { + try { + synchronized (_inboundWaitLock) { + _inboundWaitLock.wait(10*1000); + } + } catch (InterruptedException ie) {} } + synchronized (_inboundWaitLock) { _inboundWaitLock.notify(); } consumeInbound(peer, numBytes); } /** @@ -69,12 +75,14 @@ public class BandwidthLimiter { * to the peer will not violate the bandwidth limits */ public void delayOutbound(RouterIdentity peer, int numBytes) { - long ms = calculateDelayOutbound(peer, numBytes); - if (ms > 0) { - _log.debug("Delaying outbound " + ms + "ms for " + numBytes + " bytes"); - try { Thread.sleep(ms); } catch (InterruptedException ie) {} + while (calculateDelayOutbound(peer, numBytes) > 0) { + try { + synchronized (_outboundWaitLock) { + _outboundWaitLock.wait(10*1000); + } + } catch (InterruptedException ie) {} } - + synchronized (_outboundWaitLock) { _outboundWaitLock.notify(); } consumeOutbound(peer, numBytes); } } diff --git a/router/java/src/net/i2p/router/transport/TrivialBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/TrivialBandwidthLimiter.java index df016e89d..7b3774d69 100644 --- a/router/java/src/net/i2p/router/transport/TrivialBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/TrivialBandwidthLimiter.java @@ -9,9 +9,9 @@ package net.i2p.router.transport; */ import net.i2p.data.RouterIdentity; -import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; +import net.i2p.util.I2PThread; /** * Coordinate the bandwidth limiting across all classes of peers. Currently @@ -20,69 +20,98 @@ import net.i2p.util.Log; */ public class TrivialBandwidthLimiter extends BandwidthLimiter { private Log _log; - private volatile long _maxReceiveBytesPerMinute; - private volatile long _maxSendBytesPerMinute; + /** how many bytes can we read from the network without blocking? */ + private volatile long _inboundAvailable; + /** how many bytes can we write to the network without blocking? */ + private volatile long _outboundAvailable; + /** how large will we let the inboundAvailable queue grow? */ + private volatile long _inboundBurstBytes; + /** how large will we let the outboundAvailable queue grow? */ + private volatile long _outboundBurstBytes; + /** how many bytes have we ever read from the network? */ + private volatile long _totalInboundBytes; + /** how many bytes have we ever written to the network? */ + private volatile long _totalOutboundBytes; + /** how many KBps do we want to allow? */ + private long _inboundKBytesPerSecond; + /** how many KBps do we want to allow? */ + private long _outboundKBytesPerSecond; + /** how frequently do we want to replenish the available queues? */ + private long _replenishFrequency; + private long _minNonZeroDelay; + /** + * when did we last replenish the available queues (since it wont + * likely exactly match the replenish frequency)? + */ private volatile long _lastResync; - private volatile long _lastReadConfig; - private volatile long _totalReceiveBytes; - private volatile long _totalSendBytes; - private volatile long _availableSend; - private volatile long _availableReceive; + /** when did we last update the limits? */ + private long _lastUpdateLimits; - private final static String PROP_INBOUND_BANDWIDTH = "i2np.bandwidth.inboundBytesPerMinute"; - private final static String PROP_OUTBOUND_BANDWIDTH = "i2np.bandwidth.outboundBytesPerMinute"; + /** + * notify this object whenever we need bandwidth and we'll refresh the pool + * (though not necessarily with sufficient or even any bytes) + * + */ + private Object _updateBwLock = new Object(); - private final static long MINUTE = 60*1000; - private final static long READ_CONFIG_DELAY = MINUTE; + final static String PROP_INBOUND_BANDWIDTH = "i2np.bandwidth.inboundKBytesPerSecond"; + final static String PROP_OUTBOUND_BANDWIDTH = "i2np.bandwidth.outboundKBytesPerSecond"; + final static String PROP_INBOUND_BANDWIDTH_PEAK = "i2np.bandwidth.inboundBurstKBytes"; + final static String PROP_OUTBOUND_BANDWIDTH_PEAK = "i2np.bandwidth.outboundBurstKBytes"; + final static String PROP_REPLENISH_FREQUENCY = "i2np.bandwidth.replenishFrequency"; + final static String PROP_MIN_NON_ZERO_DELAY = "i2np.bandwidth.minimumNonZeroDelay"; + final static long DEFAULT_REPLENISH_FREQUENCY = 1*1000; + final static long DEFAULT_MIN_NON_ZERO_DELAY = 1*1000; - // max # bytes to store in the pool, in case we have lots of traffic we don't want to - // spike too hard - private static long MAX_IN_POOL = 10*1024; - private static long MAX_OUT_POOL = 10*1024; + final static long UPDATE_LIMIT_FREQUENCY = 60*1000; public TrivialBandwidthLimiter(RouterContext ctx) { - this(ctx, -1, -1); - } - TrivialBandwidthLimiter(RouterContext ctx, long sendPerMinute, long receivePerMinute) { super(ctx); _log = ctx.logManager().getLog(TrivialBandwidthLimiter.class); - _maxReceiveBytesPerMinute = receivePerMinute; - _maxSendBytesPerMinute = sendPerMinute; + _inboundAvailable = 0; + _outboundAvailable = 0; + _inboundBurstBytes = -1; + _outboundBurstBytes = -1; + _inboundKBytesPerSecond = -1; + _outboundKBytesPerSecond = -1; + _totalInboundBytes = 0; + _totalOutboundBytes = 0; + _replenishFrequency = DEFAULT_REPLENISH_FREQUENCY; _lastResync = ctx.clock().now(); - _lastReadConfig = _lastResync; - _totalReceiveBytes = 0; - _totalSendBytes = 0; - _availableReceive = receivePerMinute; - _availableSend = sendPerMinute; - MAX_IN_POOL = 10*_availableReceive; - MAX_OUT_POOL = 10*_availableSend; - _context.jobQueue().addJob(new UpdateBWJob()); updateLimits(); - if (_log.shouldLog(Log.INFO)) - _log.info("Initializing the limiter with maximum inbound [" + MAX_IN_POOL - + "] outbound [" + MAX_OUT_POOL + "]"); + I2PThread bwThread = new I2PThread(new UpdateBWRunner()); + bwThread.setDaemon(true); + bwThread.setPriority(I2PThread.MIN_PRIORITY); + bwThread.setName("BW Updater"); + bwThread.start(); } - public long getTotalSendBytes() { return _totalSendBytes; } - public long getTotalReceiveBytes() { return _totalReceiveBytes; } + public long getTotalSendBytes() { return _totalOutboundBytes; } + public long getTotalReceiveBytes() { return _totalInboundBytes; } /** * Return how many milliseconds to wait before receiving/processing numBytes from the peer */ public long calculateDelayInbound(RouterIdentity peer, int numBytes) { - if (_maxReceiveBytesPerMinute <= 0) return 0; - if (_availableReceive - numBytes > 0) { + if (_inboundKBytesPerSecond <= 0) return 0; + if (_inboundAvailable - numBytes > 0) { // we have bytes available return 0; } else { // we don't have sufficient bytes. - // the delay = (needed/numPerMinute) - long val = MINUTE*(numBytes-_availableReceive)/_maxReceiveBytesPerMinute; + // the delay = 1000*(bytes needed/bytes per second) + double val = 1000.0*(((double)numBytes-(double)_inboundAvailable)/((double)_inboundKBytesPerSecond*1024)); + long rv = (long)Math.ceil(val); + if ( (rv > 0) && (rv < _minNonZeroDelay) ) + rv = _minNonZeroDelay; if (_log.shouldLog(Log.DEBUG)) - _log.debug("DelayInbound: " + val + " for " + numBytes + " (avail=" - + _availableReceive + ", max=" + _maxReceiveBytesPerMinute + ")"); - return val; + _log.debug("DelayInbound: " + rv + " for " + numBytes + " (avail=" + + _inboundAvailable + ", max=" + _inboundBurstBytes + ", kbps=" + _inboundKBytesPerSecond + ")"); + // we will want to replenish before this requestor comes back for the data + if (rv < _replenishFrequency) + synchronized (_updateBwLock) { _updateBwLock.notify(); } + return rv; } } @@ -90,18 +119,26 @@ public class TrivialBandwidthLimiter extends BandwidthLimiter { * Return how many milliseconds to wait before sending numBytes to the peer */ public long calculateDelayOutbound(RouterIdentity peer, int numBytes) { - if (_maxSendBytesPerMinute <= 0) return 0; - if (_availableSend - numBytes > 0) { + if (_outboundKBytesPerSecond <= 0) return 0; + if (_outboundAvailable - numBytes > 0) { // we have bytes available return 0; } else { // we don't have sufficient bytes. - // the delay = (needed/numPerMinute) - long val = MINUTE*(numBytes-_availableSend)/_maxSendBytesPerMinute; + // lets make sure... + // the delay = 1000*(bytes needed/bytes per second) + long avail = _outboundAvailable; + double val = 1000.0*(((double)numBytes-(double)avail)/((double)_outboundKBytesPerSecond*1024.0)); + long rv = (long)Math.ceil(val); + if ( (rv > 0) && (rv < _minNonZeroDelay) ) + rv = _minNonZeroDelay; if (_log.shouldLog(Log.DEBUG)) - _log.debug("DelayOutbound: " + val + " for " + numBytes + " (avail=" - + _availableSend + ", max=" + _maxSendBytesPerMinute + ")"); - return val; + _log.debug("DelayOutbound: " + rv + " for " + numBytes + " (avail=" + + avail + ", max=" + _outboundBurstBytes + ", kbps=" + _outboundKBytesPerSecond + ")"); + // we will want to replenish before this requestor comes back for the data + if (rv < _replenishFrequency) + synchronized (_updateBwLock) { _updateBwLock.notify(); } + return rv; } } @@ -109,104 +146,225 @@ public class TrivialBandwidthLimiter extends BandwidthLimiter { * Note that numBytes have been read from the peer */ public void consumeInbound(RouterIdentity peer, int numBytes) { - _totalReceiveBytes += numBytes; - _availableReceive -= numBytes; + _totalInboundBytes += numBytes; + if (_inboundKBytesPerSecond > 0) + _inboundAvailable -= numBytes; } /** * Note that numBytes have been sent to the peer */ public void consumeOutbound(RouterIdentity peer, int numBytes) { - _totalSendBytes += numBytes; - _availableSend -= numBytes; + _totalOutboundBytes += numBytes; + if (_outboundKBytesPerSecond > 0) + _outboundAvailable -= numBytes; } private void updateLimits() { - String inBwStr = _context.router().getConfigSetting(PROP_INBOUND_BANDWIDTH); - String outBwStr = _context.router().getConfigSetting(PROP_OUTBOUND_BANDWIDTH); - if (true) { - // DISABLED UNTIL THIS STUFF GETS A REVAMP - inBwStr = "-60"; - outBwStr = "-60"; - } - long oldReceive = _maxReceiveBytesPerMinute; - long oldSend = _maxSendBytesPerMinute; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Read limits ["+inBwStr+" in, " + outBwStr + " out] vs current [" + oldReceive + " in, " + oldSend + " out]"); - - if ( (inBwStr != null) && (inBwStr.trim().length() > 0) ) { + _log.debug("Updating rates for the bw limiter"); + + _lastUpdateLimits = _context.clock().now(); + updateInboundRate(); + updateOutboundRate(); + updateInboundPeak(); + updateOutboundPeak(); + updateReplenishFrequency(); + updateMinNonZeroDelay(); + } + + private void updateInboundRate() { + String inBwStr = _context.getProperty(PROP_INBOUND_BANDWIDTH); + if ( (inBwStr != null) && + (inBwStr.trim().length() > 0) && + (!(inBwStr.equals(String.valueOf(_inboundKBytesPerSecond)))) ) { + // bandwidth was specified *and* changed try { long in = Long.parseLong(inBwStr); if (in >= 0) { - _maxReceiveBytesPerMinute = in; - MAX_IN_POOL = 10*_maxReceiveBytesPerMinute; + _inboundKBytesPerSecond = in; } } catch (NumberFormatException nfe) { - _log.warn("Invalid inbound bandwidth limit [" + inBwStr + "], keeping as " + _maxReceiveBytesPerMinute); + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid inbound bandwidth limit [" + inBwStr + + "], keeping as " + _inboundKBytesPerSecond); } } else { - _log.warn("Inbound bandwidth limits not specified in the config via " + PROP_INBOUND_BANDWIDTH); + if ( (inBwStr == null) && (_log.shouldLog(Log.DEBUG)) ) + _log.debug("Inbound bandwidth limits not specified in the config via " + PROP_INBOUND_BANDWIDTH); } - if ( (outBwStr != null) && (outBwStr.trim().length() > 0) ) { + } + private void updateOutboundRate() { + String outBwStr = _context.getProperty(PROP_OUTBOUND_BANDWIDTH); + + if ( (outBwStr != null) && + (outBwStr.trim().length() > 0) && + (!(outBwStr.equals(String.valueOf(_outboundKBytesPerSecond)))) ) { + // bandwidth was specified *and* changed try { long out = Long.parseLong(outBwStr); - if (out >= 0) { - _maxSendBytesPerMinute = out; - MAX_OUT_POOL = 10*_maxSendBytesPerMinute; - } + _outboundKBytesPerSecond = out; } catch (NumberFormatException nfe) { - _log.warn("Invalid outbound bandwidth limit [" + outBwStr + "], keeping as " + _maxSendBytesPerMinute); + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid outbound bandwidth limit [" + outBwStr + + "], keeping as " + _outboundKBytesPerSecond); } } else { - _log.warn("Outbound bandwidth limits not specified in the config via " + PROP_OUTBOUND_BANDWIDTH); - } - - if ( (oldReceive != _maxReceiveBytesPerMinute) || (oldSend != _maxSendBytesPerMinute) ) { - _log.info("Max receive bytes per minute: " + _maxReceiveBytesPerMinute + ", max send per minute: " + _maxSendBytesPerMinute); - _availableReceive = _maxReceiveBytesPerMinute; - _availableSend = _maxSendBytesPerMinute; + if ( (outBwStr == null) && (_log.shouldLog(Log.DEBUG)) ) + _log.debug("Outbound bandwidth limits not specified in the config via " + PROP_OUTBOUND_BANDWIDTH); } } - private class UpdateBWJob extends JobImpl { - public UpdateBWJob() { - super(TrivialBandwidthLimiter.this._context); - getTiming().setStartAfter(TrivialBandwidthLimiter.this._context.clock().now() + MINUTE); + private void updateInboundPeak() { + String inBwStr = _context.getProperty(PROP_INBOUND_BANDWIDTH_PEAK); + if ( (inBwStr != null) && + (inBwStr.trim().length() > 0) && + (!(inBwStr.equals(String.valueOf(_inboundBurstBytes)))) ) { + // peak bw was specified *and* changed + try { + long in = Long.parseLong(inBwStr); + _inboundBurstBytes = in * 1024; + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid inbound bandwidth burst limit [" + inBwStr + + "], keeping as " + _inboundBurstBytes); + } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Inbound bandwidth burst limits not specified in the config via " + PROP_INBOUND_BANDWIDTH_PEAK); } - public String getName() { return "Update bandwidth available"; } - - public void runJob() { - long now = TrivialBandwidthLimiter.this._context.clock().now(); - long numMinutes = ((now - _lastResync)/MINUTE) + 1; - _availableReceive += numMinutes * _maxReceiveBytesPerMinute; - _availableSend += numMinutes * _maxSendBytesPerMinute; - _lastResync = now; - + } + private void updateOutboundPeak() { + String outBwStr = _context.getProperty(PROP_OUTBOUND_BANDWIDTH_PEAK); + if ( (outBwStr != null) && + (outBwStr.trim().length() > 0) && + (!(outBwStr.equals(String.valueOf(_outboundBurstBytes)))) ) { + // peak bw was specified *and* changed + try { + long out = Long.parseLong(outBwStr); + if (out >= 0) { + _outboundBurstBytes = out * 1024; + } + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid outbound bandwidth burst limit [" + outBwStr + + "], keeping as " + _outboundBurstBytes); + } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Outbound bandwidth burst limits not specified in the config via " + PROP_OUTBOUND_BANDWIDTH_PEAK); + } + } + + private void updateReplenishFrequency() { + String freqMs = _context.getProperty(PROP_REPLENISH_FREQUENCY); + if ( (freqMs != null) && + (freqMs.trim().length() > 0) && + (!(freqMs.equals(String.valueOf(_replenishFrequency)))) ) { + // frequency was specified *and* changed + try { + long ms = Long.parseLong(freqMs); + if (ms >= 0) { + _replenishFrequency = ms; + } + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid replenish frequency [" + freqMs + + "], keeping as " + _replenishFrequency); + } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Replenish frequency not specified in the config via " + PROP_REPLENISH_FREQUENCY); + _replenishFrequency = DEFAULT_REPLENISH_FREQUENCY; + } + } + + private void updateMinNonZeroDelay() { + String delayMs = _context.getProperty(PROP_MIN_NON_ZERO_DELAY); + if ( (delayMs != null) && + (delayMs.trim().length() > 0) && + (!(delayMs.equals(String.valueOf(_minNonZeroDelay)))) ) { + // delay was specified *and* changed + try { + long ms = Long.parseLong(delayMs); + if (ms >= 0) { + _minNonZeroDelay = ms; + } + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid minimum nonzero delay [" + delayMs + + "], keeping as " + _minNonZeroDelay); + } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Minimum nonzero delay not specified in the config via " + PROP_MIN_NON_ZERO_DELAY); + _minNonZeroDelay = DEFAULT_MIN_NON_ZERO_DELAY; + } + } + + public void reinitialize() { + _inboundAvailable = 0; + _inboundBurstBytes = 0; + _inboundKBytesPerSecond = -1; + _lastResync = _context.clock().now(); + _lastUpdateLimits = -1; + _minNonZeroDelay = DEFAULT_MIN_NON_ZERO_DELAY; + _outboundAvailable = 0; + _outboundBurstBytes = 0; + _outboundKBytesPerSecond = -1; + _replenishFrequency = DEFAULT_REPLENISH_FREQUENCY; + _totalInboundBytes = 0; + _totalOutboundBytes = 0; + updateLimits(); + updateBW(); + } + + private void updateBW() { + long now = _context.clock().now(); + long numMs = (now - _lastResync); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Updating bandwidth after " + numMs + " (available in=" + _inboundAvailable + ", out=" + _outboundAvailable + ", rate in=" + _inboundKBytesPerSecond + ", out=" + _outboundKBytesPerSecond +")"); + if (numMs > 1000) { + long inboundToAdd = 1024*_inboundKBytesPerSecond * (numMs/1000); + long outboundToAdd = 1024*_outboundKBytesPerSecond * (numMs/1000); + + if (inboundToAdd < 0) inboundToAdd = 0; + if (outboundToAdd < 0) outboundToAdd = 0; + + _inboundAvailable += inboundToAdd; + _outboundAvailable += outboundToAdd; + + if (_inboundAvailable > _inboundBurstBytes) + _inboundAvailable = _inboundBurstBytes; + if (_outboundAvailable > _outboundBurstBytes) + _outboundAvailable = _outboundBurstBytes; + if (_log.shouldLog(Log.DEBUG)) { - _log.debug("Adding " + (numMinutes*_maxReceiveBytesPerMinute) + " bytes to availableReceive"); - _log.debug("Adding " + (numMinutes*_maxSendBytesPerMinute) + " bytes to availableSend"); + _log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable (current: " + _inboundAvailable + ")"); + _log.debug("Adding " + outboundToAdd + " bytes to outboundAvailable (current: " + _outboundAvailable + ")"); } - - // if we're huge, trim - if (_availableReceive > MAX_IN_POOL) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Trimming available receive to " + MAX_IN_POOL); - _availableReceive = MAX_IN_POOL; - } - if (_availableSend > MAX_OUT_POOL) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Trimming available send to " + MAX_OUT_POOL); - _availableSend = MAX_OUT_POOL; - } - - getTiming().setStartAfter(now + MINUTE); - UpdateBWJob.this._context.jobQueue().addJob(UpdateBWJob.this); - - // now update the bandwidth limits, in case they've changed - if (now > _lastReadConfig + READ_CONFIG_DELAY) { - updateLimits(); - _lastReadConfig = now; + + if (inboundToAdd > 0) synchronized (_inboundWaitLock) { _inboundWaitLock.notify(); } + if (outboundToAdd > 0) synchronized (_outboundWaitLock) { _outboundWaitLock.notify(); } + } + _lastResync = now; + } + + private class UpdateBWRunner implements Runnable { + public void run() { + while (true) { + try { + synchronized (_updateBwLock) { + _updateBwLock.wait(_replenishFrequency); + } + } catch (InterruptedException ie) {} + try { + updateBW(); + if (_context.clock().now() > _lastUpdateLimits + UPDATE_LIMIT_FREQUENCY) + updateLimits(); + } catch (Exception e) { + _log.log(Log.CRIT, "Error updating bandwidth!", e); + } } } }