2005-03-21 jrandom

    * Fixed the tunnel fragmentation handler to deal with multiple fragments
      in a single message properly (rather than release the buffer into the
      cache after processing the first one) (duh!)
    * Added the batching preprocessor which will bundle together multiple
      small messages inside a single tunnel message by delaying their delivery
      up to .5s, or whenever the pending data will fill a full message,
      whichever comes first.  This is disabled at the moment, since without the
      above bugfix widely deployed, lots and lots of messages would fail.
    * Within each tunnel pool, stick with a randomly selected peer for up to
      .5s before randomizing and selecting again, instead of randomizing the
      pool each time a tunnel is needed.
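A minimal sketch of the batching decision described in the second bullet above (the class name and the capacity constant here are illustrative assumptions; the actual logic is BatchedPreprocessor.preprocessQueue() further down in this commit):

    class BatchFlushPolicy {
        static final long DELAY_MS = 500;  // the ".5s" from the commit message
        static final int FULL_SIZE = 1003; // assumed capacity: 1024 - 16 (IV) - 4 (checksum) - 1 (end-of-padding)

        /** flush when the pending data fills a tunnel message, or when the oldest pending message has waited long enough */
        static boolean shouldFlush(int pendingBytes, long pendingSinceMs, long nowMs) {
            return pendingBytes >= FULL_SIZE
                || (pendingSinceMs > 0 && nowMs - pendingSinceMs >= DELAY_MS);
        }
    }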
jrandom
2005-03-22 01:38:21 +00:00
committed by zzz
parent a2bd71c75b
commit 3f9bf28382
14 changed files with 775 additions and 209 deletions

View File

@@ -1,4 +1,17 @@
$Id: history.txt,v 1.172 2005/03/18 03:48:00 jrandom Exp $
$Id: history.txt,v 1.173 2005/03/18 17:34:54 jrandom Exp $
2005-03-21 jrandom
* Fixed the tunnel fragmentation handler to deal with multiple fragments
in a single message properly (rather than release the buffer into the
cache after processing the first one) (duh!)
* Added the batching preprocessor which will bundle together multiple
small messages inside a single tunnel message by delaying their delivery
up to .5s, or whenever the pending data will fill a full message,
whichever comes first. This is disabled at the moment, since without the
above bugfix widely deployed, lots and lots of messages would fail.
* Within each tunnel pool, stick with a randomly selected peer for up to
.5s before randomizing and selecting again, instead of randomizing the
pool each time a tunnel is needed.
* 2005-03-18 0.5.0.3 released

View File

@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.166 $ $Date: 2005/03/18 03:48:01 $";
public final static String ID = "$Revision: 1.167 $ $Date: 2005/03/18 17:34:52 $";
public final static String VERSION = "0.5.0.3";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@@ -112,6 +112,9 @@ public class StatisticsManager implements Service {
includeRate("tunnel.buildFailure", stats, new long[] { 60*60*1000 });
includeRate("tunnel.buildSuccess", stats, new long[] { 60*60*1000 });
includeRate("tunnel.batchDelaySent", stats, new long[] { 10*60*1000, 60*60*1000 });
includeRate("tunnel.batchMultipleCount", stats, new long[] { 10*60*1000, 60*60*1000 });
includeRate("router.throttleTunnelProbTestSlow", stats, new long[] { 60*60*1000 });
includeRate("router.throttleTunnelProbTooFast", stats, new long[] { 60*60*1000 });

View File

@@ -0,0 +1,184 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.DataMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.util.Log;
/**
* Test the batching behavior of the preprocessor with one, two, or three
* messages of various sizes and settings.
*
*/
public class BatchedFragmentTest extends FragmentTest {
public BatchedFragmentTest() {
super();
BatchedPreprocessor.DEFAULT_DELAY = 200;
}
protected TunnelGateway.QueuePreprocessor createPreprocessor(I2PAppContext ctx) {
return new BatchedPreprocessor(ctx);
}
/**
* Send a small message, wait briefly, then send a large message; the small
* one is held by the batching delay and then flushed together with the first
* part of the large one, with the remainder of the large one sent afterwards.
*
*/
public void runBatched() {
TunnelGateway.Pending pending1 = createPending(10, false, false);
ArrayList messages = new ArrayList();
messages.add(pending1);
TunnelGateway.Pending pending2 = createPending(1024, false, false);
TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context);
SenderImpl sender = new SenderImpl();
DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending1.getData(), pending2.getData());
FragmentHandler handler = new FragmentHandler(_context, handleReceiver);
ReceiverImpl receiver = new ReceiverImpl(handler, 0);
byte msg[] = pending1.getData();
_log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64());
boolean keepGoing = true;
boolean alreadyAdded = false;
while (keepGoing) {
keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver);
if (keepGoing) {
try { Thread.sleep(150); } catch (InterruptedException ie) {}
if (!alreadyAdded) {
messages.add(pending2);
alreadyAdded = true;
}
}
}
if (handleReceiver.receivedOk())
_log.info("Receive batched ok");
else
_log.info("Failed to receive batched");
}
/**
* Run the batched preprocessor over a range of first-message sizes and
* router/tunnel delivery combinations, counting the successful runs.
*
*/
public void runBatches() {
int success = 0;
//success += testBatched(1, false, false, 1024, false, false);
// this takes a very long time to run
for (int i = 1; i <= 1024; i++) {
success += testBatched(i, false, false, 1024, false, false, 1024, false, false);
success += testBatched(i, true, false, 1024, false, false, 1024, false, false);
success += testBatched(i, true, true, 1024, false, false, 1024, false, false);
success += testBatched(i, false, false, 1024, true, false, 1024, false, false);
success += testBatched(i, true, false, 1024, true, false, 1024, false, false);
success += testBatched(i, true, true, 1024, true, false, 1024, false, false);
success += testBatched(i, false, false, 1024, true, true, 1024, false, false);
success += testBatched(i, true, false, 1024, true, true, 1024, false, false);
success += testBatched(i, true, true, 1024, true, true, 1024, false, false);
success += testBatched(i, false, false, 1024, false, false, 1024, true, false);
success += testBatched(i, true, false, 1024, false, false, 1024, true, false);
success += testBatched(i, true, true, 1024, false, false, 1024, true, false);
success += testBatched(i, false, false, 1024, true, false, 1024, true, false);
success += testBatched(i, true, false, 1024, true, false, 1024, true, false);
success += testBatched(i, true, true, 1024, true, false, 1024, true, false);
success += testBatched(i, false, false, 1024, true, true, 1024, true, false);
success += testBatched(i, true, false, 1024, true, true, 1024, true, false);
success += testBatched(i, true, true, 1024, true, true, 1024, true, false);
success += testBatched(i, false, false, 1024, false, false, 1024, true, true);
success += testBatched(i, true, false, 1024, false, false, 1024, true, true);
success += testBatched(i, true, true, 1024, false, false, 1024, true, true);
success += testBatched(i, false, false, 1024, true, false, 1024, true, true);
success += testBatched(i, true, false, 1024, true, false, 1024, true, true);
success += testBatched(i, true, true, 1024, true, false, 1024, true, true);
success += testBatched(i, false, false, 1024, true, true, 1024, true, true);
success += testBatched(i, true, false, 1024, true, true, 1024, true, true);
success += testBatched(i, true, true, 1024, true, true, 1024, true, true);
}
_log.info("** Batches complete with " + success + " successful runs");
}
private int testBatched(int firstSize, boolean firstRouter, boolean firstTunnel,
int secondSize, boolean secondRouter, boolean secondTunnel,
int thirdSize, boolean thirdRouter, boolean thirdTunnel) {
TunnelGateway.Pending pending1 = createPending(firstSize, firstRouter, firstTunnel);
TunnelGateway.Pending pending2 = createPending(secondSize, secondRouter, secondTunnel);
TunnelGateway.Pending pending3 = createPending(thirdSize, thirdRouter, thirdTunnel);
boolean ok = runBatch(pending1, pending2, pending3);
if (ok) {
_log.info("OK: " + firstSize + "." + firstRouter + "." + firstTunnel
+ " " + secondSize + "." + secondRouter + "." + secondTunnel
+ " " + thirdSize + "." + thirdRouter + "." + thirdTunnel);
return 1;
} else {
_log.info("FAIL: " + firstSize + "." + firstRouter + "." + firstTunnel
+ " " + secondSize + "." + secondRouter + "." + secondTunnel
+ " " + thirdSize + "." + thirdRouter + "." + thirdTunnel);
return 0;
}
}
private boolean runBatch(TunnelGateway.Pending pending1, TunnelGateway.Pending pending2, TunnelGateway.Pending pending3) {
ArrayList messages = new ArrayList();
messages.add(pending1);
TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context);
SenderImpl sender = new SenderImpl();
DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending1.getData(), pending2.getData(), pending3.getData());
FragmentHandler handler = new FragmentHandler(_context, handleReceiver);
ReceiverImpl receiver = new ReceiverImpl(handler, 0);
byte msg[] = pending1.getData();
_log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64());
boolean keepGoing = true;
int added = 0;
while (keepGoing) {
keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver);
if ( (keepGoing) || ((messages.size() == 0) && (added < 2) ) ) {
try { Thread.sleep(150); } catch (InterruptedException ie) {}
if (added == 0) {
_log.debug("Adding pending2");
messages.add(pending2);
added++;
keepGoing = true;
} else if (added == 1) {
_log.debug("Adding pending3");
messages.add(pending3);
added++;
keepGoing = true;
}
}
}
return handleReceiver.receivedOk();
}
public void runTests() {
//super.runVaried();
//super.runTests();
//runBatched();
runBatches();
}
public static void main(String args[]) {
BatchedFragmentTest t = new BatchedFragmentTest();
t.runTests();
}
}

View File

@@ -0,0 +1,193 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.Log;
/**
* Batching preprocessor that will briefly delay the sending of a message
* if it doesn't fill up a full tunnel message, in which case it queues up
* an additional flush task. This is a very simple threshold algorithm -
* as soon as there is enough data for a full tunnel message, it is sent. If
* after the delay there still isn't enough data, what is available is sent
* and padded.
*
*/
public class BatchedPreprocessor extends TrivialPreprocessor {
private Log _log;
private long _pendingSince;
public BatchedPreprocessor(I2PAppContext ctx) {
super(ctx);
_log = ctx.logManager().getLog(BatchedPreprocessor.class);
_pendingSince = 0;
ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
private static final int FULL_SIZE = PREPROCESSED_SIZE
- IV_SIZE
- 1 // 0x00 ending the padding
- 4; // 4 byte checksum
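// e.g. with PREPROCESSED_SIZE = 1024 and a 16-byte IV (assumed here), that leaves
// 1024 - 16 - 1 - 4 = 1003 bytes per tunnel message for delivery instructions + fragment data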
/* not final or private so the test code can adjust */
static long DEFAULT_DELAY = 500;
/** by default, wait up to 500ms (DEFAULT_DELAY) before sending a small message */
protected long getSendDelay() { return DEFAULT_DELAY; }
public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.INFO))
_log.info("Preprocess queue with " + pending.size() + " to send");
if (getSendDelay() <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info("No batching, send all messages immediately");
while (pending.size() > 0) {
// loops because sends may be partial
TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(0);
send(pending, 0, 0, sender, rec);
if (msg.getOffset() >= msg.getData().length)
pending.remove(0);
}
return false;
}
while (pending.size() > 0) {
int allocated = 0;
for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(i);
int instructionsSize = getInstructionsSize(msg);
instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize);
int curWanted = msg.getData().length - msg.getOffset() + instructionsSize;
allocated += curWanted;
if (allocated >= FULL_SIZE) {
if (allocated - curWanted + instructionsSize >= FULL_SIZE) {
// the instructions alone exceed the size, so we won't get any
// of the message into it. don't include it
i--;
msg = (TunnelGateway.Pending)pending.get(i);
allocated -= curWanted;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Pushback of " + curWanted + " (message " + (i+1) + ")");
}
if (_pendingSince > 0)
_context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0);
_pendingSince = 0;
send(pending, 0, i, sender, rec);
if (_log.shouldLog(Log.INFO))
_log.info("Allocated=" + allocated + " so we sent " + (i+1)
+ " (last complete? " + (msg.getOffset() >= msg.getData().length) + ")");
for (int j = 0; j < i; j++)
pending.remove(0);
if (msg.getOffset() >= msg.getData().length) {
// ok, this last message fit perfectly, remove it too
pending.remove(0);
}
if (i > 0)
_context.statManager().addRateData("tunnel.batchMultipleCount", i+1, 0);
allocated = 0;
break;
}
}
if (allocated > 0) {
// after going through the entire pending list, we still don't
// have enough data to send a full message
if ( (_pendingSince > 0) && (_pendingSince + getSendDelay() <= _context.clock().now()) ) {
if (_log.shouldLog(Log.INFO))
_log.info("Passed through pending list, with " + allocated + "/" + pending.size()
+ " left to clean up, but we've waited, so flush");
// not even a full message, but we want to flush it anyway
if (pending.size() > 1)
_context.statManager().addRateData("tunnel.batchMultipleCount", pending.size(), 0);
_context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0);
send(pending, 0, pending.size()-1, sender, rec);
pending.clear();
_pendingSince = 0;
return false;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Passed through pending list, with " + allocated + "/"+ pending.size()
+ " left to clean up, but we've haven't waited, so don't flush (wait="
+ (_context.clock().now() - _pendingSince) + " / " + _pendingSince + ")");
_context.statManager().addRateData("tunnel.batchDelay", pending.size(), 0);
if (_pendingSince <= 0)
_pendingSince = _context.clock().now();
// not yet time to send the delayed flush
return true;
}
} else {
// ok, we sent some, but haven't gone back for another
// pass yet. keep looping
}
}
if (_log.shouldLog(Log.INFO))
_log.info("Sent everything on the list (pending=" + pending.size() + ")");
// sent everything from the pending list, no need for a delayed flush
return false;
}
/**
* Preprocess the messages from the pending list, grouping items startAt
* through sendThrough (though the last one may be sent only partially),
* delivering them through the sender/receiver.
*
* @param startAt first index in pending to send (inclusive)
* @param sendThrough last index in pending to send (inclusive)
*/
protected void send(List pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending.size());
byte preprocessed[] = _dataCache.acquire().getData();
ByteArray ivBuf = _ivCache.acquire();
byte iv[] = ivBuf.getData(); // new byte[IV_SIZE];
_context.random().nextBytes(iv);
int offset = 0;
offset = writeFragments(pending, startAt, sendThrough, preprocessed, offset);
// preprocessed[0:offset] now contains the fragments from the pending,
// so we need to format, pad, and rearrange according to the spec to
// generate the final preprocessed data
preprocess(preprocessed, offset);
sender.sendPreprocessed(preprocessed, rec);
}
/**
* Write the fragments out of the pending list onto the target, updating
* each of the Pending message's offsets accordingly.
*
* @return new offset into the target for further bytes to be written
*/
private int writeFragments(List pending, int startAt, int sendThrough, byte target[], int offset) {
for (int i = startAt; i <= sendThrough; i++) {
TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(i);
int prevOffset = offset;
if (msg.getOffset() == 0) {
offset = writeFirstFragment(msg, target, offset);
if (_log.shouldLog(Log.DEBUG))
_log.debug("writing " + msg.getMessageId() + " fragment 0, ending at " + offset + " prev " + prevOffset
+ " leaving " + (msg.getData().length - msg.getOffset()) + " bytes for later");
} else {
offset = writeSubsequentFragment(msg, target, offset);
if (_log.shouldLog(Log.DEBUG))
_log.debug("writing " + msg.getMessageId() + " fragment " + (msg.getFragmentNumber()-1)
+ ", ending at " + offset + " prev " + prevOffset
+ " leaving " + (msg.getData().length - msg.getOffset()) + " bytes for later");
}
}
return offset;
}
}

View File

@@ -0,0 +1,55 @@
package net.i2p.router.tunnel;
import java.util.Properties;
import net.i2p.router.RouterContext;
/**
* Honor the 'batchFrequency' tunnel pool setting or the 'router.batchFrequency'
* router config setting, and track fragmentation.
*
*/
public class BatchedRouterPreprocessor extends BatchedPreprocessor {
private RouterContext _routerContext;
private TunnelCreatorConfig _config;
/**
* How frequently should we flush non-full messages, in milliseconds
*/
public static final String PROP_BATCH_FREQUENCY = "batchFrequency";
public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency";
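/** 0 disables batching entirely: BatchedPreprocessor.preprocessQueue() sends immediately when getSendDelay() is <= 0 */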
public static final int DEFAULT_BATCH_FREQUENCY = 0;
public BatchedRouterPreprocessor(RouterContext ctx) {
this(ctx, null);
}
public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) {
super(ctx);
_routerContext = ctx;
_config = cfg;
}
/** how long should we wait before flushing */
protected long getSendDelay() {
String freq = null;
if (_config != null) {
Properties opts = _config.getOptions();
if (opts != null)
freq = opts.getProperty(PROP_BATCH_FREQUENCY);
} else {
freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY);
}
if (freq != null) {
try {
return Integer.parseInt(freq);
} catch (NumberFormatException nfe) {
return DEFAULT_BATCH_FREQUENCY;
}
}
return DEFAULT_BATCH_FREQUENCY;
}
protected void notePreprocessing(long messageId, int numFragments) {
_routerContext.messageHistory().fragmentMessage(messageId, numFragments);
}
}
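As a usage note (values here are illustrative, not part of this commit): the delay is read in milliseconds, so setting the per-pool option batchFrequency=500, or router.batchFrequency=500 in the router configuration, would flush partially filled tunnel messages after roughly half a second, while the default of 0 keeps batching off.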

View File

@@ -33,8 +33,9 @@ public class FragmentHandler {
private int _failed;
/** don't wait more than 60s to defragment the partial message */
private static final long MAX_DEFRAGMENT_TIME = 60*1000;
static long MAX_DEFRAGMENT_TIME = 60*1000;
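// no longer private/final so the test code can shorten it (FragmentTest sets it to 10s)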
private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE);
public FragmentHandler(I2PAppContext context, DefragmentedReceiver receiver) {
_context = context;
_log = context.logManager().getLog(FragmentHandler.class);
@@ -62,6 +63,7 @@ public class FragmentHandler {
if (!ok) {
_log.error("Unable to verify preprocessed data (pre.length=" + preprocessed.length
+ " off=" +offset + " len=" + length, new Exception("failed"));
_cache.release(new ByteArray(preprocessed));
return;
}
offset += HopProcessor.IV_LENGTH; // skip the IV
@@ -83,6 +85,11 @@ public class FragmentHandler {
if (_log.shouldLog(Log.ERROR))
_log.error("Corrupt fragment received: offset = " + offset, e);
throw e;
} finally {
// each of the FragmentedMessages populated makes its own copy of the
// payload, which it releases separately, so we can release this buffer
// immediately
_cache.release(new ByteArray(preprocessed));
}
}
@@ -254,6 +261,9 @@ public class FragmentHandler {
}
offset += size;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling finished message " + msg.getMessageId() + " at offset " + offset);
return offset;
}

View File

@@ -3,6 +3,7 @@ package net.i2p.router.tunnel;
import java.util.ArrayList;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.DataMessage;
@@ -15,12 +16,18 @@ import net.i2p.util.Log;
*
*/
public class FragmentTest {
private I2PAppContext _context;
private Log _log;
protected I2PAppContext _context;
protected Log _log;
public FragmentTest() {
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(FragmentTest.class);
_log = _context.logManager().getLog(getClass());
_context.random().nextBoolean();
FragmentHandler.MAX_DEFRAGMENT_TIME = 10*1000;
}
protected TunnelGateway.QueuePreprocessor createPreprocessor(I2PAppContext ctx) {
return new TrivialPreprocessor(ctx);
}
/**
@@ -28,23 +35,26 @@ public class FragmentTest {
*
*/
public void runSingle() {
DataMessage m = new DataMessage(_context);
byte data[] = new byte[949];
_context.random().nextBytes(data);
m.setData(data);
m.setUniqueId(42);
m.setMessageExpiration(_context.clock().now() + 60*1000);
TunnelGateway.Pending pending = createPending(949, false, false);
ArrayList messages = new ArrayList();
TunnelGateway.Pending pending = new TunnelGateway.Pending(m, null, null);
messages.add(pending);
TrivialPreprocessor pre = new TrivialPreprocessor(_context);
TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context);
SenderImpl sender = new SenderImpl();
FragmentHandler handler = new FragmentHandler(_context, new DefragmentedReceiverImpl(m));
DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending.getData());
FragmentHandler handler = new FragmentHandler(_context, handleReceiver);
ReceiverImpl receiver = new ReceiverImpl(handler, 0);
byte msg[] = m.toByteArray();
byte msg[] = pending.getData();
_log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64());
pre.preprocessQueue(messages, new SenderImpl(), receiver);
boolean keepGoing = true;
while (keepGoing) {
keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver);
if (keepGoing)
try { Thread.sleep(100); } catch (InterruptedException ie) {}
}
if (handleReceiver.receivedOk())
_log.info("received OK");
}
/**
@@ -52,23 +62,26 @@ public class FragmentTest {
*
*/
public void runMultiple() {
DataMessage m = new DataMessage(_context);
byte data[] = new byte[2048];
_context.random().nextBytes(data);
m.setData(data);
m.setUniqueId(42);
m.setMessageExpiration(_context.clock().now() + 60*1000);
TunnelGateway.Pending pending = createPending(2048, false, false);
ArrayList messages = new ArrayList();
TunnelGateway.Pending pending = new TunnelGateway.Pending(m, null, null);
messages.add(pending);
TrivialPreprocessor pre = new TrivialPreprocessor(_context);
TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context);
SenderImpl sender = new SenderImpl();
FragmentHandler handler = new FragmentHandler(_context, new DefragmentedReceiverImpl(m));
DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending.getData());
FragmentHandler handler = new FragmentHandler(_context, handleReceiver);
ReceiverImpl receiver = new ReceiverImpl(handler, 0);
byte msg[] = m.toByteArray();
byte msg[] = pending.getData();
_log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64());
pre.preprocessQueue(messages, new SenderImpl(), receiver);
boolean keepGoing = true;
while (keepGoing) {
keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver);
if (keepGoing)
try { Thread.sleep(100); } catch (InterruptedException ie) {}
}
if (handleReceiver.receivedOk())
_log.info("received OK");
}
/**
@@ -77,31 +90,88 @@
*
*/
public void runDelayed() {
DataMessage m = new DataMessage(_context);
byte data[] = new byte[2048];
_context.random().nextBytes(data);
m.setData(data);
m.setUniqueId(42);
m.setMessageExpiration(_context.clock().now() + 60*1000);
TunnelGateway.Pending pending = createPending(2048, false, false);
ArrayList messages = new ArrayList();
TunnelGateway.Pending pending = new TunnelGateway.Pending(m, null, null);
messages.add(pending);
TrivialPreprocessor pre = new TrivialPreprocessor(_context);
TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context);
SenderImpl sender = new SenderImpl();
FragmentHandler handler = new FragmentHandler(_context, new DefragmentedReceiverImpl(m));
ReceiverImpl receiver = new ReceiverImpl(handler, 21*1000);
byte msg[] = m.toByteArray();
FragmentHandler handler = new FragmentHandler(_context, new DefragmentedReceiverImpl(pending.getData()));
ReceiverImpl receiver = new ReceiverImpl(handler, 11*1000);
byte msg[] = pending.getData();
_log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64());
pre.preprocessQueue(messages, new SenderImpl(), receiver);
boolean keepGoing = true;
while (keepGoing) {
keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver);
if (keepGoing)
try { Thread.sleep(100); } catch (InterruptedException ie) {}
}
}
private class SenderImpl implements TunnelGateway.Sender {
public void runVaried() {
int failures = 0;
for (int i = 0; i <= 4096; i++) {
boolean ok = runVaried(i, false, false);
if (!ok) { _log.error("** processing " + i+ " w/ no router, no tunnel failed"); failures++; }
ok = runVaried(i, true, false);
if (!ok) { _log.error("** processing " + i+ " w/ router, no tunnel failed"); failures++; }
ok = runVaried(i, true, true);
if (!ok) { _log.error("** processing " + i+ " w/ router, tunnel failed"); failures++; }
else _log.info("Tests pass for size " + i);
}
if (failures == 0)
_log.info("** success after all varied tests");
else
_log.error("** failed " + failures +" varied tests");
}
protected boolean runVaried(int size, boolean includeRouter, boolean includeTunnel) {
TunnelGateway.Pending pending = createPending(size, includeRouter, includeTunnel);
ArrayList messages = new ArrayList();
messages.add(pending);
DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending.getData());
TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context);
SenderImpl sender = new SenderImpl();
FragmentHandler handler = new FragmentHandler(_context, handleReceiver);
ReceiverImpl receiver = new ReceiverImpl(handler, 0);
byte msg[] = pending.getData();
_log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64());
boolean keepGoing = true;
while (keepGoing) {
keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver);
if (keepGoing)
try { Thread.sleep(100); } catch (InterruptedException ie) {}
}
return handleReceiver.receivedOk();
}
protected TunnelGateway.Pending createPending(int size, boolean includeRouter, boolean includeTunnel) {
DataMessage m = new DataMessage(_context);
byte data[] = new byte[size];
_context.random().nextBytes(data);
m.setData(data);
m.setUniqueId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
m.setMessageExpiration(_context.clock().now() + 60*1000);
Hash toRouter = null;
TunnelId toTunnel = null;
if (includeRouter) {
toRouter = new Hash(new byte[Hash.HASH_LENGTH]);
_context.random().nextBytes(toRouter.getData());
}
if (includeTunnel)
toTunnel = new TunnelId(_context.random().nextLong(TunnelId.MAX_ID_VALUE));
return new TunnelGateway.Pending(m, toRouter, toTunnel);
}
protected class SenderImpl implements TunnelGateway.Sender {
public void sendPreprocessed(byte[] preprocessed, TunnelGateway.Receiver receiver) {
receiver.receiveEncrypted(preprocessed);
}
}
private class ReceiverImpl implements TunnelGateway.Receiver {
protected class ReceiverImpl implements TunnelGateway.Receiver {
private FragmentHandler _handler;
private int _delay;
public ReceiverImpl(FragmentHandler handler, int delay) {
@@ -114,21 +184,62 @@
}
}
private class DefragmentedReceiverImpl implements FragmentHandler.DefragmentedReceiver {
private I2NPMessage _expected;
public DefragmentedReceiverImpl(I2NPMessage expected) {
protected class DefragmentedReceiverImpl implements FragmentHandler.DefragmentedReceiver {
private byte _expected[];
private byte _expected2[];
private byte _expected3[];
private int _received;
public DefragmentedReceiverImpl(byte expected[]) {
this(expected, null);
}
public DefragmentedReceiverImpl(byte expected[], byte expected2[]) {
this(expected, expected2, null);
}
public DefragmentedReceiverImpl(byte expected[], byte expected2[], byte expected3[]) {
_expected = expected;
_expected2 = expected2;
_expected3 = expected3;
_received = 0;
}
public void receiveComplete(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
_log.debug("equal? " + _expected.equals(msg));
boolean ok = false;
byte m[] = msg.toByteArray();
if ( (_expected != null) && (DataHelper.eq(_expected, m)) )
ok = true;
if (!ok && (_expected2 != null) && (DataHelper.eq(_expected2, m)) )
ok = true;
if (!ok && (_expected3 != null) && (DataHelper.eq(_expected3, m)) )
ok = true;
if (ok)
_received++;
//_log.info("** equal? " + ok);
}
public boolean receivedOk() {
if ( (_expected != null) && (_expected2 != null) && (_expected3 != null) )
return _received == 3;
else if ( (_expected != null) && (_expected2 != null) )
return _received == 2;
else if ( (_expected != null) || (_expected2 != null) )
return _received == 1;
else
return _received == 0;
}
}
public void runTests() {
runVaried();
_log.info("\n===========================Begin runSingle()\n\n");
runSingle();
_log.info("\n===========================Begin runMultiple()\n\n");
runMultiple();
_log.info("\n===========================Begin runDelayed() (should have 3 errors)\n\n");
runDelayed();
_log.info("\n===========================After runDelayed()\n\n");
}
public static void main(String args[]) {
FragmentTest t = new FragmentTest();
t.runSingle();
t.runMultiple();
t.runDelayed();
t.runTests();
}
}

View File

@@ -74,7 +74,10 @@ public class FragmentedMessage {
_log.debug("Receive message " + messageId + " fragment " + fragmentNum + " with " + length + " bytes (last? " + isLast + ") offset = " + offset);
_messageId = messageId;
// we should just use payload[] and use an offset/length on it
ByteArray ba = new ByteArray(payload, offset, length); //new byte[length]);
ByteArray ba = _cache.acquire(); //new ByteArray(payload, offset, length); //new byte[length]);
System.arraycopy(payload, offset, ba.getData(), 0, length);
ba.setValid(length);
ba.setOffset(0);
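// copy into our own cached buffer so the caller's preprocessed buffer can be released right away (see FragmentHandler)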
//System.arraycopy(payload, offset, ba.getData(), 0, length);
if (_log.shouldLog(Log.DEBUG))
_log.debug("fragment[" + fragmentNum + "/" + offset + "/" + length + "]: "
@@ -107,7 +110,10 @@ public class FragmentedMessage {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive message " + messageId + " with " + length + " bytes (last? " + isLast + ") targetting " + toRouter + " / " + toTunnel + " offset=" + offset);
_messageId = messageId;
ByteArray ba = new ByteArray(payload, offset, length); // new byte[length]);
ByteArray ba = _cache.acquire(); // new ByteArray(payload, offset, length); // new byte[length]);
System.arraycopy(payload, offset, ba.getData(), 0, length);
ba.setValid(length);
ba.setOffset(0);
//System.arraycopy(payload, offset, ba.getData(), 0, length);
if (_log.shouldLog(Log.DEBUG))
_log.debug("fragment[0/" + offset + "/" + length + "]: "

View File

@@ -20,19 +20,24 @@ import net.i2p.util.Log;
*
*/
public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
private I2PAppContext _context;
protected I2PAppContext _context;
private Log _log;
static final int PREPROCESSED_SIZE = 1024;
private static final int IV_SIZE = HopProcessor.IV_LENGTH;
private static final ByteCache _dataCache = ByteCache.getInstance(512, PREPROCESSED_SIZE);
private static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
protected static final int IV_SIZE = HopProcessor.IV_LENGTH;
protected static final ByteCache _dataCache = ByteCache.getInstance(512, PREPROCESSED_SIZE);
protected static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
public TrivialPreprocessor(I2PAppContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(TrivialPreprocessor.class);
}
/**
* Return true if there were messages remaining, and we should queue up
* a delayed flush to clear them
*
*/
public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
while (pending.size() > 0) {
TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.remove(0);
@@ -75,13 +80,82 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
*
*/
private byte[] preprocessFragment(TunnelGateway.Pending msg) {
byte target[] = _dataCache.acquire().getData();
int offset = 0;
if (msg.getOffset() <= 0)
return preprocessFirstFragment(msg);
offset = writeFirstFragment(msg, target, offset);
else
return preprocessSubsequentFragment(msg);
offset = writeSubsequentFragment(msg, target, offset);
preprocess(target, offset);
return target;
}
/**
* Wrap the preprocessed fragments with the necessary padding / checksums
* to act as a tunnel message.
*
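* Resulting layout (a sketch of what the code below produces):
*   [IV][first 4 bytes of SHA-256(instructions + payload + IV)][random non-zero padding][0x00][instructions + fragments]
*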
* @param fragmentLength fragments[0:fragmentLength] is used
*/
protected void preprocess(byte fragments[], int fragmentLength) {
ByteArray ivBuf = _ivCache.acquire();
byte iv[] = ivBuf.getData(); // new byte[IV_SIZE];
_context.random().nextBytes(iv);
SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(PREPROCESSED_SIZE);
// payload ready, now H(instructions+payload+IV)
System.arraycopy(iv, 0, fragments, fragmentLength, IV_SIZE);
Hash h = _context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE, cache);
//Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE);
//_log.debug("before shift: " + Base64.encode(target));
// now shiiiiiift
int distance = PREPROCESSED_SIZE - fragmentLength;
System.arraycopy(fragments, 0, fragments, distance, fragmentLength);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(msg.getMessageId() + ": fragments begin at " + distance + " (size="
// + payloadLength + " offset=" + offset +")");
java.util.Arrays.fill(fragments, 0, distance, (byte)0x0);
//_log.debug("after shift: " + Base64.encode(target));
int offset = 0;
System.arraycopy(iv, 0, fragments, offset, IV_SIZE);
offset += IV_SIZE;
System.arraycopy(h.getData(), 0, fragments, offset, 4);
offset += 4;
//_log.debug("before pad : " + Base64.encode(target));
_context.sha().cache().release(cache);
_ivCache.release(ivBuf);
// fits in a single message, so may be smaller than the full size
int numPadBytes = PREPROCESSED_SIZE // max
- IV_SIZE // hmm..
- 4 // 4 bytes of the SHA256
- 1 // the 0x00 after the padding
- fragmentLength; // the size of the fragments (instructions+payload)
//_log.debug("# pad bytes: " + numPadBytes + " payloadLength: " + payloadLength + " instructions: " + instructionsLength);
int paddingRemaining = numPadBytes;
while (paddingRemaining > 0) {
byte b = (byte)(_context.random().nextInt() & 0xFF);
if (b != 0x00) {
fragments[offset] = b;
offset++;
paddingRemaining--;
}
}
fragments[offset] = 0x0; // no more padding
offset++;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Preprocessing beginning of the fragment instructions at " + offset);
}
/** is this a follow-up byte? */
private static final byte MASK_IS_SUBSEQUENT = FragmentHandler.MASK_IS_SUBSEQUENT;
/** how should this be delivered? shift this 5 to the right and get TYPE_* */
@@ -92,24 +166,28 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
private static final byte MASK_EXTENDED = FragmentHandler.MASK_EXTENDED;
private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5);
private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5);
private byte[] preprocessFirstFragment(TunnelGateway.Pending msg) {
protected int writeFirstFragment(TunnelGateway.Pending msg, byte target[], int offset) {
boolean fragmented = false;
ByteArray ivBuf = _ivCache.acquire();
byte iv[] = ivBuf.getData(); // new byte[IV_SIZE];
_context.random().nextBytes(iv);
byte target[] = _dataCache.acquire().getData(); //new byte[PREPROCESSED_SIZE];
int origOffset = offset;
int instructionsLength = getInstructionsSize(msg);
int payloadLength = msg.getData().length;
if (payloadLength + instructionsLength + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {
int payloadLength = msg.getData().length - msg.getOffset();
if (offset + payloadLength + instructionsLength + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {
fragmented = true;
instructionsLength += 4; // messageId
payloadLength = PREPROCESSED_SIZE - IV_SIZE - 1 - 4 - instructionsLength;
payloadLength = PREPROCESSED_SIZE - IV_SIZE - 1 - 4 - instructionsLength - offset;
if (payloadLength <= 0)
throw new RuntimeException("Fragment too small! payloadLen=" + payloadLength
+ " target.length=" + target.length + " offset="+offset
+ " msg.length=" + msg.getData().length + " msg.offset=" + msg.getOffset()
+ " instructionsLength=" + instructionsLength + " for " + msg);
}
int offset = 0;
if (payloadLength <= 0)
throw new RuntimeException("Full size too small! payloadLen=" + payloadLength
+ " target.length=" + target.length + " offset="+offset
+ " msg.length=" + msg.getData().length + " msg.offset=" + msg.getOffset()
+ " instructionsLength=" + instructionsLength + " for " + msg);
// first fragment, or full message
target[offset] = 0x0;
@@ -142,89 +220,21 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
DataHelper.toLong(target, offset, 2, payloadLength);
offset += 2;
//_log.debug("raw data : " + Base64.encode(msg.getData()));
System.arraycopy(msg.getData(), 0, target, offset, payloadLength);
System.arraycopy(msg.getData(), msg.getOffset(), target, offset, payloadLength);
if (_log.shouldLog(Log.DEBUG))
_log.debug("initial fragment[" + msg.getMessageId() + "/" + msg.getFragmentNumber()+ "/"
+ (PREPROCESSED_SIZE - offset - payloadLength) + "/" + payloadLength + "]: "
+ Base64.encode(target, offset, payloadLength));
offset += payloadLength;
SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(PREPROCESSED_SIZE);
// payload ready, now H(instructions+payload+IV)
System.arraycopy(iv, 0, target, offset, IV_SIZE);
Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE, cache);
//Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE);
//_log.debug("before shift: " + Base64.encode(target));
// now shiiiiiift
int distance = PREPROCESSED_SIZE - offset;
System.arraycopy(target, 0, target, distance, offset);
if (_log.shouldLog(Log.DEBUG))
_log.debug(msg.getMessageId() + ": fragments begin at " + distance + " (size="
+ payloadLength + " offset=" + offset +")");
java.util.Arrays.fill(target, 0, distance, (byte)0x0);
//_log.debug("after shift: " + Base64.encode(target));
offset = 0;
System.arraycopy(iv, 0, target, offset, IV_SIZE);
offset += IV_SIZE;
System.arraycopy(h.getData(), 0, target, offset, 4);
offset += 4;
//_log.debug("before pad : " + Base64.encode(target));
_context.sha().cache().release(cache);
_ivCache.release(ivBuf);
if (!fragmented) {
// fits in a single message, so may be smaller than the full size
int numPadBytes = PREPROCESSED_SIZE // max
- IV_SIZE // hmm..
- 4 // 4 bytes of the SHA256
- 1 // the 0x00 after the padding
- payloadLength // the, er, payload
- instructionsLength; // wanna guess?
//_log.debug("# pad bytes: " + numPadBytes + " payloadLength: " + payloadLength + " instructions: " + instructionsLength);
int paddingRemaining = numPadBytes;
while (paddingRemaining > 0) {
byte b = (byte)(_context.random().nextInt() & 0xFF);
if (b != 0x00) {
target[offset] = b;
offset++;
paddingRemaining--;
}
/*
long rnd = _context.random().nextLong();
for (long i = 0; i < 8; i++) {
byte b = (byte)(((rnd >>> i * 8l) & 0xFF));
if (b == 0x00)
continue;
target[offset] = b;
offset++;
paddingRemaining--;
}
*/
}
}
target[offset] = 0x0; // no padding here
offset++;
msg.setOffset(payloadLength);
msg.setOffset(msg.getOffset() + payloadLength);
msg.incrementFragmentNumber();
return target;
return offset;
}
private byte[] preprocessSubsequentFragment(TunnelGateway.Pending msg) {
ByteArray ivBuf = _ivCache.acquire();
protected int writeSubsequentFragment(TunnelGateway.Pending msg, byte target[], int offset) {
boolean isLast = true;
byte iv[] = ivBuf.getData(); // new byte[IV_SIZE];
_context.random().nextBytes(iv);
byte target[] = _dataCache.acquire().getData(); // new byte[PREPROCESSED_SIZE];
int instructionsLength = getInstructionsSize(msg);
int payloadLength = msg.getData().length - msg.getOffset();
@@ -233,8 +243,6 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
payloadLength = PREPROCESSED_SIZE - IV_SIZE - 1 - 4 - instructionsLength;
}
int offset = 0;
// subsequent fragment
target[offset] = 0x0;
target[offset] |= MASK_IS_SUBSEQUENT;
@@ -259,63 +267,13 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
+ Base64.encode(target, offset, payloadLength));
offset += payloadLength;
SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(PREPROCESSED_SIZE);
// payload ready, now H(instructions+payload+IV)
System.arraycopy(iv, 0, target, offset, IV_SIZE);
Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE, cache);
//Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE);
// now shiiiiiift
int distance = PREPROCESSED_SIZE - offset;
System.arraycopy(target, 0, target, distance, offset);
if (_log.shouldLog(Log.DEBUG))
_log.debug(msg.getMessageId() + ": fragments begin at " + distance + " (size="
+ payloadLength + " offset=" + offset +")");
offset = 0;
System.arraycopy(iv, 0, target, 0, IV_SIZE);
offset += IV_SIZE;
_ivCache.release(ivBuf);
System.arraycopy(h.getData(), 0, target, offset, 4);
offset += 4;
_context.sha().cache().release(cache);
if (isLast) {
// this is the last fragment, so may be smaller than the full size
int numPadBytes = PREPROCESSED_SIZE // max
- IV_SIZE // hmm..
- 4 // 4 bytes of the SHA256
- 1 // the 0x00 after the padding
- payloadLength // the, er, payload
- instructionsLength; // wanna guess?
for (int i = 0; i < numPadBytes; i++) {
// wouldn't it be nice if random could write to an array?
byte rnd = (byte)_context.random().nextInt();
if (rnd != 0x0) {
target[offset] = rnd;
offset++;
} else {
i--;
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("# pad bytes: " + numPadBytes);
}
target[offset] = 0x0; // end of padding
offset++;
msg.setOffset(msg.getOffset() + payloadLength);
msg.incrementFragmentNumber();
return target;
msg.setOffset(msg.getOffset() + payloadLength);
return offset;
}
private int getInstructionsSize(TunnelGateway.Pending msg) {
protected int getInstructionsSize(TunnelGateway.Pending msg) {
if (msg.getFragmentNumber() > 0)
return 7;
int header = 1;
@@ -324,7 +282,16 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
if (msg.getToRouter() != null)
header += 32;
header += 2;
return header;
}
protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) {
int payloadLength = msg.getData().length - msg.getOffset();
if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {
// requires fragmentation, so include the messageId
return 4;
}
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package net.i2p.router.tunnel;
import java.util.Date;
import java.util.Locale;
import java.util.Properties;
import java.text.SimpleDateFormat;
import net.i2p.data.Base64;
@@ -44,6 +45,8 @@ public class TunnelCreatorConfig implements TunnelInfo {
/** how many hops are there in the tunnel? */
public int getLength() { return _config.length; }
public Properties getOptions() { return null; }
/**
* retrieve the config for the given hop. the gateway is
* hop 0.

View File

@@ -106,6 +106,16 @@ public class TunnelDispatcher implements Service {
"How many messages are sent through a participating tunnel?", "Tunnels",
new long[] { 60*10*1000l, 60*60*1000l, 24*60*60*1000l });
}
private TunnelGateway.QueuePreprocessor createPreprocessor() {
return createPreprocessor(null);
}
private TunnelGateway.QueuePreprocessor createPreprocessor(TunnelCreatorConfig cfg) {
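// hard-coded choice: always use the BatchedRouterPreprocessor (batching itself still defaults off via DEFAULT_BATCH_FREQUENCY = 0)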
if (true)
return new BatchedRouterPreprocessor(_context, cfg);
else
return new TrivialRouterPreprocessor(_context);
}
/**
* We are the outbound gateway - we created this tunnel
@@ -114,7 +124,7 @@
if (_log.shouldLog(Log.INFO))
_log.info("Outbound built successfully: " + cfg);
if (cfg.getLength() > 1) {
TunnelGateway.QueuePreprocessor preproc = new TrivialRouterPreprocessor(_context);
TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg);
TunnelGateway.Sender sender = new OutboundSender(_context, cfg);
TunnelGateway.Receiver receiver = new OutboundReceiver(_context, cfg);
TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
@@ -211,7 +221,7 @@
public void joinInboundGateway(HopConfig cfg) {
if (_log.shouldLog(Log.INFO))
_log.info("Joining as inbound gateway: " + cfg);
TunnelGateway.QueuePreprocessor preproc = new TrivialRouterPreprocessor(_context);
TunnelGateway.QueuePreprocessor preproc = createPreprocessor();
TunnelGateway.Sender sender = new InboundSender(_context, cfg);
TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg);
TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);

View File

@@ -97,6 +97,8 @@ public class TunnelGateway {
for (int i = 0; i < _queue.size(); i++) {
Pending m = (Pending)_queue.get(i);
if (m.getExpiration() < _lastFlush) {
if (_log.shouldLog(Log.ERROR))
_log.error("Expire on the queue: " + m);
_queue.remove(i);
i--;
}
@@ -175,13 +177,16 @@
private class DelayedFlush implements SimpleTimer.TimedEvent {
public void timeReached() {
long now = _context.clock().now();
boolean wantRequeue = false;
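// preprocessQueue() returns true when messages remain pending and another delayed flush should be scheduled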
synchronized (_queue) {
if ( (_queue.size() > 0) && (_lastFlush + _flushFrequency < now) ) {
_preprocessor.preprocessQueue(_queue, _sender, _receiver);
_lastFlush = _context.clock().now();
}
if (_queue.size() > 0)
wantRequeue = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
}
if (wantRequeue)
SimpleTimer.getInstance().addEvent(_delayedFlush, _flushFrequency);
else
_lastFlush = _context.clock().now();
}
}
}

View File

@@ -1,5 +1,6 @@
package net.i2p.router.tunnel.pool;
import java.util.Properties;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.router.tunnel.TunnelCreatorConfig;
@@ -34,6 +35,11 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
}
}
public Properties getOptions() {
if (_pool == null) return null;
return _pool.getSettings().getUnknownOptions();
}
/**
* The tunnel failed, so stop using it
*/