2004-12-13 jrandom

* Added some error checking on the new client send job (thanks duck!)
    * Implemented tunnel rejection based on bandwidth usage (rejecting tunnels
      proportional to the bytes allocated in existing tunnels vs the bytes
      allowed through the bandwidth limiter).
    * Enable a new configuration parameter for triggering a tunnel rebuild
      (tunnel.maxTunnelFailures), where that is the max allowed test failures
      before killing the tunnel (default 0).
    * Gather more data that we rank capacity by (now we monitor and balance the
      data from 10m/30m/60m/1d instead of just 10m/60m/1d).
    * Fix a truncation/type conversion problem on the long term capacity
      values (we were ignoring the daily stats outright)
This commit is contained in:
jrandom
2004-12-13 13:45:52 +00:00
committed by zzz
parent 83c6eac017
commit 9e16bc203a
7 changed files with 167 additions and 25 deletions

View File

@ -1,4 +1,17 @@
$Id: history.txt,v 1.103 2004/12/11 02:05:12 jrandom Exp $
$Id: history.txt,v 1.104 2004/12/11 04:26:24 jrandom Exp $
2004-12-13 jrandom
* Added some error checking on the new client send job (thanks duck!)
* Implemented tunnel rejection based on bandwidth usage (rejecting tunnels
proportional to the bytes allocated in existing tunnels vs the bytes
allowed through the bandwidth limiter).
* Enable a new configuration parameter for triggering a tunnel rebuild
(tunnel.maxTunnelFailures), where that is the max allowed test failures
before killing the tunnel (default 0).
* Gather more data that we rank capacity by (now we monitor and balance the
data from 10m/30m/60m/1d instead of just 10m/60m/1d).
* Fix a truncation/type conversion problem on the long term capacity
values (we were ignoring the daily stats outright)
2004-12-11 jrandom
* Fix the missing HTTP timeout, which was caused by the deferred syn used

View File

@ -30,6 +30,8 @@ class RouterThrottleImpl implements RouterThrottle {
private static int THROTTLE_EVENT_LIMIT = 300;
private static final String PROP_MAX_TUNNELS = "router.maxParticipatingTunnels";
private static final String PROP_DEFAULT_KBPS_THROTTLE = "router.defaultKBpsThrottle";
private static final String PROP_BANDWIDTH_SHARE_PERCENTAGE = "router.sharePercentage";
public RouterThrottleImpl(RouterContext context) {
_context = context;
@ -43,6 +45,7 @@ class RouterThrottleImpl implements RouterThrottle {
_context.statManager().createRateStat("router.throttleTunnelMaxExceeded", "How many tunnels we are participating in when we refuse one due to excees?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("router.throttleTunnelProbTooFast", "How many tunnels beyond the previous 1h average are we participating in when we throttle?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("router.throttleTunnelProbTestSlow", "How slow are our tunnel tests when our average exceeds the old average and we throttle?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("router.throttleTunnelBandwidthExceeded", "How much bandwidth is allocated when we refuse due to bandwidth allocation?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public boolean acceptNetworkMessage() {
@ -207,9 +210,12 @@ class RouterThrottleImpl implements RouterThrottle {
}
}
if (!allowTunnel(bytesAllocated, numTunnels)) {
_context.statManager().addRateData("router.throttleTunnelBandwidthExceeded", (long)bytesAllocated, 0);
return false;
}
_context.statManager().addRateData("tunnel.bytesAllocatedAtAccept", (long)bytesAllocated, msg.getTunnelDurationSeconds()*1000);
// todo: um, throttle (include bw usage of the netDb, our own tunnels, the clients,
// and check to see that they are less than the bandwidth limits
if (_log.shouldLog(Log.DEBUG))
_log.debug("Accepting a new tunnel request (now allocating " + bytesAllocated + " bytes across " + numTunnels
@ -217,6 +223,99 @@ class RouterThrottleImpl implements RouterThrottle {
return true;
}
/**
* with bytesAllocated already accounted for across the numTunnels existing
* tunnels we have agreed to, can we handle another tunnel with our existing
* bandwidth?
*
*/
private boolean allowTunnel(double bytesAllocated, int numTunnels) {
long bytesAllowed = getBytesAllowed();
bytesAllowed *= getSharePercentage();
double bytesPerTunnel = (numTunnels > 0 ? bytesAllocated / numTunnels : 0);
double toAllocate = (numTunnels > 0 ? bytesPerTunnel * (numTunnels + 1) : 0);
double pctFull = toAllocate / bytesAllowed;
double allocatedKBps = toAllocate / (10 * 60 * 1024);
if (_context.random().nextInt(100) > 100 * pctFull) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Probabalistically allowing the tunnel w/ " + pctFull + " of our " + bytesAllowed
+ "bytes/" + allocatedKBps + "KBps allocated through " + numTunnels + " tunnels");
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Rejecting the tunnel w/ " + pctFull + " of our " + bytesAllowed
+ "bytes allowed (" + toAllocate + "bytes / " + allocatedKBps
+ "KBps) through " + numTunnels + " tunnels");
return false;
}
}
/**
* What fraction of the bandwidth specified in our bandwidth limits should
* we allow to be consumed by participating tunnels?
*
*/
private double getSharePercentage() {
String pct = _context.getProperty(PROP_BANDWIDTH_SHARE_PERCENTAGE, "0.8");
if (pct != null) {
try {
return Double.parseDouble(pct);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.INFO))
_log.info("Unable to get the share percentage");
}
}
return 0.8;
}
/**
* BytesPerSecond that we can pass along data
*/
private long getBytesAllowed() {
String kbpsOutStr = _context.getProperty("i2np.bandwidth.outboundKBytesPerSecond");
long kbpsOut = -1;
if (kbpsOutStr != null) {
try {
kbpsOut = Integer.parseInt(kbpsOutStr);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.INFO))
_log.info("Unable to get the bytes allowed (outbound)");
}
}
String kbpsInStr = _context.getProperty("i2np.bandwidth.inboundKBytesPerSecond");
long kbpsIn = -1;
if (kbpsInStr != null) {
try {
kbpsIn = Integer.parseInt(kbpsInStr);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.INFO))
_log.info("Unable to get the bytes allowed (inbound)");
}
}
// whats our choke?
long kbps = (kbpsOut > kbpsIn ? kbpsIn : kbpsOut);
if (kbps <= 0) {
try {
kbps = Integer.parseInt(_context.getProperty(PROP_DEFAULT_KBPS_THROTTLE, "64")); // absurd
} catch (NumberFormatException nfe) {
kbps = 64;
}
}
return kbps
* 60 // per minute
* 10 // per 10 minute period
* 1024; // bytes;
}
/** dont ever probabalistically throttle tunnels if we have less than this many */
private int getMinThrottleTunnels() {
try {

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.108 $ $Date: 2004/12/11 02:05:13 $";
public final static String ID = "$Revision: 1.109 $ $Date: 2004/12/11 04:26:24 $";
public final static String VERSION = "0.4.2.3";
public final static long BUILD = 2;
public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -53,6 +53,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private MessageId _clientMessageId;
private int _clientMessageSize;
private Destination _from;
private Destination _to;
/** target destination's leaseSet, if known */
private LeaseSet _leaseSet;
/** Actual lease the message is being routed through */
@ -60,6 +61,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private PayloadGarlicConfig _clove;
private long _cloveId;
private long _start;
private boolean _finished;
/**
* final timeout (in milliseconds) that the outbound message will fail in.
@ -114,6 +116,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_clientMessageId = msg.getMessageId();
_clientMessageSize = msg.getPayload().getSize();
_from = msg.getFromDestination();
_to = msg.getDestination();
String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM);
if (param == null)
@ -132,6 +135,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_start = getContext().clock().now();
_overallExpiration = timeoutMs + _start;
_shouldBundle = getShouldBundle();
_finished = false;
}
public String getName() { return "Outbound client message"; }
@ -142,11 +146,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
buildClove();
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Clove built");
Hash to = _clientMessage.getDestination().calculateHash();
long timeoutMs = _overallExpiration - getContext().clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job");
getContext().netDb().lookupLeaseSet(to, new SendJob(), new LookupLeaseSetFailedJob(), timeoutMs);
getContext().netDb().lookupLeaseSet(_to.calculateHash(), new SendJob(), new LookupLeaseSetFailedJob(), timeoutMs);
}
private boolean getShouldBundle() {
@ -194,7 +197,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*
*/
private boolean getNextLease() {
_leaseSet = getContext().netDb().lookupLeaseSetLocally(_clientMessage.getDestination().calculateHash());
_leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash());
if (_leaseSet == null) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Lookup locally didn't find the leaseSet");
@ -278,7 +281,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token,
_overallExpiration, key,
_clove,
_clientMessage.getDestination(),
_to,
sessKey, tags,
true, replyLeaseSet);
@ -338,6 +341,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* this is safe to call multiple times (only tells the client once)
*/
private void dieFatal() {
if (_finished) return;
_finished = true;
long sendTime = getContext().clock().now() - _start;
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Failed to send the message " + _clientMessageId + " after "
@ -364,7 +370,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
DeliveryInstructions instructions = new DeliveryInstructions();
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_DESTINATION);
instructions.setDestination(_clientMessage.getDestination().calculateHash());
instructions.setDestination(_to.calculateHash());
instructions.setDelayRequested(false);
instructions.setDelaySeconds(0);
@ -434,6 +440,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
public String getName() { return "Send client message successful to a lease"; }
public void runJob() {
if (_finished) return;
_finished = true;
long sendTime = getContext().clock().now() - _start;
if (_log.shouldLog(Log.INFO))
_log.info(OutboundClientMessageOneShotJob.this.getJobId()

View File

@ -45,16 +45,23 @@ public class CapacityCalculator extends Calculator {
private static long ESTIMATE_PERIOD = 60*60*1000;
public double calc(PeerProfile profile) {
double capacity = 0;
RateStat acceptStat = profile.getTunnelCreateResponseTime();
RateStat rejectStat = profile.getTunnelHistory().getRejectionRate();
RateStat failedStat = profile.getTunnelHistory().getFailedRate();
capacity += estimatePartial(acceptStat, rejectStat, failedStat, 10*60*1000);
capacity += estimatePartial(acceptStat, rejectStat, failedStat, 30*60*1000);
capacity += estimatePartial(acceptStat, rejectStat, failedStat, 60*60*1000);
capacity += estimatePartial(acceptStat, rejectStat, failedStat, 24*60*60*1000);
double capacity10m = estimateCapacity(acceptStat, rejectStat, failedStat, 10*60*1000);
double capacity30m = estimateCapacity(acceptStat, rejectStat, failedStat, 30*60*1000);
double capacity60m = estimateCapacity(acceptStat, rejectStat, failedStat, 60*60*1000);
double capacity1d = estimateCapacity(acceptStat, rejectStat, failedStat, 24*60*60*1000);
double capacity = capacity10m * periodWeight(10*60*1000) +
capacity30m * periodWeight(30*60*1000) +
capacity60m * periodWeight(60*60*1000) +
capacity1d * periodWeight(24*60*60*1000);
// if we actively know they're bad, who cares if they used to be good?
if (capacity10m <= 0)
capacity = 0;
if (tooOld(profile))
capacity = 1;
@ -74,7 +81,7 @@ public class CapacityCalculator extends Calculator {
return true;
}
private double estimatePartial(RateStat acceptStat, RateStat rejectStat, RateStat failedStat, int period) {
private double estimateCapacity(RateStat acceptStat, RateStat rejectStat, RateStat failedStat, int period) {
Rate curAccepted = acceptStat.getRate(period);
Rate curRejected = rejectStat.getRate(period);
Rate curFailed = failedStat.getRate(period);
@ -82,25 +89,26 @@ public class CapacityCalculator extends Calculator {
long eventCount = 0;
if (curAccepted != null)
eventCount = curAccepted.getCurrentEventCount() + curAccepted.getLastEventCount();
double stretch = ESTIMATE_PERIOD / period;
double stretch = ((double)ESTIMATE_PERIOD) / period;
double val = eventCount * stretch;
long failed = 0;
if (curFailed != null)
failed = curFailed.getCurrentEventCount() + curFailed.getLastEventCount();
if (failed > 0) {
if ( (period == 10*60*1000) && (curFailed.getCurrentEventCount() > 0) )
if ( (period <= 10*60*1000) && (curFailed.getCurrentEventCount() > 0) )
return 0.0d; // their tunnels have failed in the last 0-10 minutes
else
val -= failed * stretch;
}
if ( (period == 10*60*1000) && (curRejected.getCurrentEventCount() + curRejected.getLastEventCount() > 0) )
if ( (period <= 10*60*1000) && (curRejected.getCurrentEventCount() + curRejected.getLastEventCount() > 0) ) {
//System.out.println("10m period has rejected " + (curRejected.getCurrentEventCount() + curRejected.getLastEventCount()) + " times");
return 0.0d;
else
} else
val -= stretch * (curRejected.getCurrentEventCount() + curRejected.getLastEventCount());
if (val >= 0) {
return (val + GROWTH_FACTOR) * periodWeight(period);
return (val + GROWTH_FACTOR);
} else {
// failed too much, don't grow
return 0.0d;

View File

@ -248,7 +248,7 @@ public class PeerProfile {
if (_dbResponseTime == null)
_dbResponseTime = new RateStat("dbResponseTime", "how long it takes to get a db response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_tunnelCreateResponseTime == null)
_tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
_tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_tunnelTestResponseTime == null)
_tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_commError == null)

View File

@ -529,6 +529,20 @@ class TunnelPool {
}
private static final int MAX_FAILURES_PER_TUNNEL = 0;
public static final String PROP_MAX_TUNNEL_FAILURES = "tunnel.maxTunnelFailures";
private int getMaxTunnelFailures() {
String max = _context.getProperty(PROP_MAX_TUNNEL_FAILURES);
if (max != null) {
try {
return Integer.parseInt(max);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Max tunnel failures property is invalid [" + max + "]");
}
}
return MAX_FAILURES_PER_TUNNEL;
}
public void tunnelFailed(TunnelId id) {
if (!_isLive) return;
@ -536,7 +550,7 @@ class TunnelPool {
if (info == null)
return;
int failures = info.incrementFailures();
if (failures <= MAX_FAILURES_PER_TUNNEL) {
if (failures <= getMaxTunnelFailures()) {
if (_log.shouldLog(Log.INFO))
_log.info("Tunnel " + id + " failure " + failures + ", but not fatal yet");
return;