2005-03-21 jrandom
* Fixed the tunnel fragmentation handler to deal with multiple fragments in a single message properly (rather than release the buffer into the cache after processing the first one) (duh!) * Added the batching preprocessor which will bundle together multiple small messages inside a single tunnel message by delaying their delivery up to .5s, or whenever the pending data will fill a full message, whichever comes first. This is disabled at the moment, since without the above bugfix widely deployed, lots and lots of messages would fail. * Within each tunnel pool, stick with a randomly selected peer for up to .5s before randomizing and selecting again, instead of randomizing the pool each time a tunnel is needed.
This commit is contained in:
@ -33,6 +33,8 @@ public class TunnelPool {
|
||||
private int _buildsThisMinute;
|
||||
private long _currentMinute;
|
||||
private RefreshJob _refreshJob;
|
||||
private TunnelInfo _lastSelected;
|
||||
private long _lastSelectionPeriod;
|
||||
|
||||
/**
|
||||
* Only 3 builds per minute per pool, even if we have failing tunnels,
|
||||
@ -50,6 +52,8 @@ public class TunnelPool {
|
||||
_peerSelector = sel;
|
||||
_builder = builder;
|
||||
_alive = false;
|
||||
_lastSelectionPeriod = 0;
|
||||
_lastSelected = null;
|
||||
_lifetimeProcessed = 0;
|
||||
_buildsThisMinute = 0;
|
||||
_currentMinute = ctx.clock().now();
|
||||
@ -77,6 +81,8 @@ public class TunnelPool {
|
||||
}
|
||||
public void shutdown() {
|
||||
_alive = false;
|
||||
_lastSelectionPeriod = 0;
|
||||
_lastSelected = null;
|
||||
}
|
||||
|
||||
private int countUsableTunnels() {
|
||||
@ -153,6 +159,12 @@ public class TunnelPool {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* when selecting tunnels, stick with the same one for a brief
|
||||
* period to allow batching if we can.
|
||||
*/
|
||||
private static final long SELECTION_PERIOD = 500;
|
||||
|
||||
/**
|
||||
* Pull a random tunnel out of the pool. If there are none available but
|
||||
* the pool is configured to allow 0hop tunnels, this builds a fake one
|
||||
@ -161,7 +173,18 @@ public class TunnelPool {
|
||||
*/
|
||||
public TunnelInfo selectTunnel() { return selectTunnel(true); }
|
||||
private TunnelInfo selectTunnel(boolean allowRecurseOnFail) {
|
||||
long period = _context.clock().now();
|
||||
period -= period % SELECTION_PERIOD;
|
||||
synchronized (_tunnels) {
|
||||
if (_lastSelectionPeriod == period) {
|
||||
if ( (_lastSelected != null) &&
|
||||
(_lastSelected.getExpiration() > period) &&
|
||||
(_tunnels.contains(_lastSelected)) )
|
||||
return _lastSelected;
|
||||
}
|
||||
_lastSelectionPeriod = period;
|
||||
_lastSelected = null;
|
||||
|
||||
if (_tunnels.size() <= 0) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(toString() + ": No tunnels to select from");
|
||||
@ -172,6 +195,7 @@ public class TunnelPool {
|
||||
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
|
||||
if (info.getExpiration() > _context.clock().now()) {
|
||||
//_log.debug("Selecting tunnel: " + info + " - " + _tunnels);
|
||||
_lastSelected = info;
|
||||
return info;
|
||||
}
|
||||
}
|
||||
@ -261,6 +285,10 @@ public class TunnelPool {
|
||||
if (_settings.isInbound() && (_settings.getDestination() != null) )
|
||||
ls = locked_buildNewLeaseSet();
|
||||
remaining = _tunnels.size();
|
||||
if (_lastSelected == info) {
|
||||
_lastSelected = null;
|
||||
_lastSelectionPeriod = 0;
|
||||
}
|
||||
}
|
||||
|
||||
_lifetimeProcessed += info.getProcessedMessagesCount();
|
||||
@ -297,6 +325,10 @@ public class TunnelPool {
|
||||
if (_settings.isInbound() && (_settings.getDestination() != null) )
|
||||
ls = locked_buildNewLeaseSet();
|
||||
remaining = _tunnels.size();
|
||||
if (_lastSelected == cfg) {
|
||||
_lastSelected = null;
|
||||
_lastSelectionPeriod = 0;
|
||||
}
|
||||
}
|
||||
|
||||
_lifetimeProcessed += cfg.getProcessedMessagesCount();
|
||||
|
Reference in New Issue
Block a user