2005-07-11 jrandom

* Reduced the growth factor on the slow start and congestion avoidance for
      the streaming lib.
    * Adjusted some of the I2PTunnelServer threading to use a small pool of
      handlers, rather than launching off new threads which then immediately
      launch off an I2PTunnelRunner instance (which launches 3 more threads..)
    * Don't persist session keys / session tags (not worth it, for now)
    * Added some detection and handling code for duplicate session tags being
      delivered (root cause still not addressed)
    * Make the PRNG's buffer size configurable (via the config property
      "i2p.prng.totalBufferSizeKB=4096")
    * Disable SSU flooding by default (duh)
    * Updates to the StreamSink apps for better throttling tests.
This commit is contained in:
jrandom
2005-07-11 23:06:23 +00:00
committed by zzz
parent 51c492b842
commit 9d5f16a889
21 changed files with 317 additions and 146 deletions

View File

@ -160,45 +160,59 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
} }
public void run() { public void run() {
try {
I2PServerSocket i2pss = sockMgr.getServerSocket(); I2PServerSocket i2pss = sockMgr.getServerSocket();
for (int i = 0; i < 5; i++) {
I2PThread handler = new I2PThread(new Handler(i2pss), "Handle Server " + i);
handler.start();
}
/*
while (true) { while (true) {
I2PSocket i2ps = i2pss.accept(); I2PSocket i2ps = i2pss.accept();
if (i2ps == null) throw new I2PException("I2PServerSocket closed"); if (i2ps == null) throw new I2PException("I2PServerSocket closed");
I2PThread t = new I2PThread(new Handler(i2ps)); I2PThread t = new I2PThread(new Handler(i2ps));
t.start(); t.start();
} }
} catch (I2PException ex) { */
_log.error("Error while waiting for I2PConnections", ex);
} catch (IOException ex) {
_log.error("Error while waiting for I2PConnections", ex);
}
} }
/** /**
* Async handler to keep .accept() from blocking too long. * minor thread pool to pull off the accept() concurrently. there are still lots
* todo: replace with a thread pool so we dont get overrun by threads if/when * (and lots) of wasted threads within the I2PTunnelRunner, but its a start
* receiving a lot of connection requests concurrently.
* *
*/ */
private class Handler implements Runnable { private class Handler implements Runnable {
private I2PSocket _handleSocket; private I2PServerSocket _serverSocket;
public Handler(I2PSocket socket) { public Handler(I2PServerSocket serverSocket) {
_handleSocket = socket; _serverSocket = serverSocket;
} }
public void run() { public void run() {
while (open) {
try {
handle(_serverSocket.accept());
} catch (I2PException ex) {
_log.error("Error while waiting for I2PConnections", ex);
return;
} catch (IOException ex) {
_log.error("Error while waiting for I2PConnections", ex);
return;
}
}
}
private void handle(I2PSocket socket) {
long afterAccept = I2PAppContext.getGlobalContext().clock().now(); long afterAccept = I2PAppContext.getGlobalContext().clock().now();
long afterSocket = -1; long afterSocket = -1;
//local is fast, so synchronously. Does not need that many //local is fast, so synchronously. Does not need that many
//threads. //threads.
try { try {
_handleSocket.setReadTimeout(readTimeout); socket.setReadTimeout(readTimeout);
Socket s = new Socket(remoteHost, remotePort); Socket s = new Socket(remoteHost, remotePort);
afterSocket = I2PAppContext.getGlobalContext().clock().now(); afterSocket = I2PAppContext.getGlobalContext().clock().now();
new I2PTunnelRunner(s, _handleSocket, slock, null, null); new I2PTunnelRunner(s, socket, slock, null, null);
} catch (SocketException ex) { } catch (SocketException ex) {
try { try {
_handleSocket.close(); socket.close();
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error while closing the received i2p con", ex); _log.error("Error while closing the received i2p con", ex);
} }

View File

@ -15,6 +15,7 @@ import net.i2p.I2PAppContext;
import net.i2p.I2PException; import net.i2p.I2PException;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.data.DataFormatException; import net.i2p.data.DataFormatException;
import net.i2p.util.I2PThread;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@ -75,36 +76,38 @@ public class StreamSinkClient {
if (fis == null) try { fis.close(); } catch (IOException ioe) {} if (fis == null) try { fis.close(); } catch (IOException ioe) {}
} }
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send " + _sendSize + "KB to " + peer.calculateHash().toBase64());
System.out.println("Send " + _sendSize + "KB to " + peer.calculateHash().toBase64()); while (true) {
try { try {
I2PSocket sock = mgr.connect(peer); I2PSocket sock = mgr.connect(peer);
byte buf[] = new byte[32*1024]; byte buf[] = new byte[Math.min(32*1024, _sendSize*1024)];
Random rand = new Random(); Random rand = new Random();
OutputStream out = sock.getOutputStream(); OutputStream out = sock.getOutputStream();
long beforeSending = System.currentTimeMillis(); long beforeSending = System.currentTimeMillis();
for (int i = 0; (_sendSize < 0) || (i < _sendSize); i+= 32) { for (int i = 0; (_sendSize < 0) || (i < _sendSize); i+= buf.length/1024) {
rand.nextBytes(buf); rand.nextBytes(buf);
out.write(buf); out.write(buf);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Wrote " + (i+32) + "/" + _sendSize + "KB"); _log.debug("Wrote " + ((1+i*buf.length)/1024) + "/" + _sendSize + "KB");
if (_writeDelay > 0) { if (_writeDelay > 0) {
try { Thread.sleep(_writeDelay); } catch (InterruptedException ie) {} try { Thread.sleep(_writeDelay); } catch (InterruptedException ie) {}
} }
} }
sock.close(); sock.close();
long afterSending = System.currentTimeMillis(); long afterSending = System.currentTimeMillis();
System.out.println("Sent " + _sendSize + "KB in " + (afterSending-beforeSending) + "ms"); if (_log.shouldLog(Log.DEBUG))
_log.debug("Sent " + _sendSize + "KB in " + (afterSending-beforeSending) + "ms");
} catch (InterruptedIOException iie) { } catch (InterruptedIOException iie) {
_log.error("Timeout connecting to the peer", iie); _log.error("Timeout connecting to the peer", iie);
return; //return;
} catch (NoRouteToHostException nrthe) { } catch (NoRouteToHostException nrthe) {
_log.error("Unable to connect to the peer", nrthe); _log.error("Unable to connect to the peer", nrthe);
return; //return;
} catch (ConnectException ce) { } catch (ConnectException ce) {
_log.error("Connection already dropped", ce); _log.error("Connection already dropped", ce);
return; //return;
} catch (I2PException ie) { } catch (I2PException ie) {
_log.error("Error connecting to the peer", ie); _log.error("Error connecting to the peer", ie);
return; return;
@ -113,22 +116,26 @@ public class StreamSinkClient {
return; return;
} }
} }
}
/** /**
* Fire up the client. <code>Usage: StreamSinkClient [i2cpHost i2cpPort] sendSizeKB writeDelayMs serverDestFile</code> <br /> * Fire up the client. <code>Usage: StreamSinkClient [i2cpHost i2cpPort] sendSizeKB writeDelayMs serverDestFile [concurrentSends]</code> <br />
* <ul> * <ul>
* <li><b>sendSizeKB</b>: how many KB to send, or -1 for unlimited</li> * <li><b>sendSizeKB</b>: how many KB to send, or -1 for unlimited</li>
* <li><b>writeDelayMs</b>: how long to wait between each .write (0 for no delay)</li> * <li><b>writeDelayMs</b>: how long to wait between each .write (0 for no delay)</li>
* <li><b>serverDestFile</b>: file containing the StreamSinkServer's binary Destination</li> * <li><b>serverDestFile</b>: file containing the StreamSinkServer's binary Destination</li>
* <li><b>concurrentSends</b>: how many concurrent threads should send to the server at once</li>
* </ul> * </ul>
*/ */
public static void main(String args[]) { public static void main(String args[]) {
StreamSinkClient client = null; StreamSinkClient client = null;
int sendSizeKB = -1; int sendSizeKB = -1;
int writeDelayMs = -1; int writeDelayMs = -1;
int concurrent = 1;
switch (args.length) { switch (args.length) {
case 3: case 3: // fall through
case 4:
try { try {
sendSizeKB = Integer.parseInt(args[0]); sendSizeKB = Integer.parseInt(args[0]);
} catch (NumberFormatException nfe) { } catch (NumberFormatException nfe) {
@ -141,9 +148,13 @@ public class StreamSinkClient {
System.err.println("Write delay ms invalid [" + args[1] + "]"); System.err.println("Write delay ms invalid [" + args[1] + "]");
return; return;
} }
if (args.length == 4) {
try { concurrent = Integer.parseInt(args[3]); } catch (NumberFormatException nfe) {}
}
client = new StreamSinkClient(sendSizeKB, writeDelayMs, args[2]); client = new StreamSinkClient(sendSizeKB, writeDelayMs, args[2]);
break; break;
case 5: case 5: // fall through
case 6:
try { try {
int port = Integer.parseInt(args[1]); int port = Integer.parseInt(args[1]);
sendSizeKB = Integer.parseInt(args[2]); sendSizeKB = Integer.parseInt(args[2]);
@ -152,11 +163,26 @@ public class StreamSinkClient {
} catch (NumberFormatException nfe) { } catch (NumberFormatException nfe) {
System.err.println("arg error"); System.err.println("arg error");
} }
if (args.length == 6) {
try { concurrent = Integer.parseInt(args[5]); } catch (NumberFormatException nfe) {}
}
break; break;
default: default:
System.out.println("Usage: StreamSinkClient [i2cpHost i2cpPort] sendSizeKB writeDelayMs serverDestFile"); System.out.println("Usage: StreamSinkClient [i2cpHost i2cpPort] sendSizeKB writeDelayMs serverDestFile [concurrentSends]");
}
if (client != null) {
for (int i = 0; i < concurrent; i++)
new I2PThread(new Runner(client), "Client " + i).start();
}
}
private static class Runner implements Runnable {
private StreamSinkClient _client;
public Runner(StreamSinkClient client) {
_client = client;
}
public void run() {
_client.runClient();
} }
if (client != null)
client.runClient();
} }
} }

View File

@ -6,6 +6,8 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
@ -26,6 +28,7 @@ public class StreamSinkServer {
private String _destFile; private String _destFile;
private String _i2cpHost; private String _i2cpHost;
private int _i2cpPort; private int _i2cpPort;
private int _handlers;
/** /**
* Create but do not start the streaming server. * Create but do not start the streaming server.
@ -34,13 +37,14 @@ public class StreamSinkServer {
* @param ourDestFile filename to write our binary destination to * @param ourDestFile filename to write our binary destination to
*/ */
public StreamSinkServer(String sinkDir, String ourDestFile) { public StreamSinkServer(String sinkDir, String ourDestFile) {
this(sinkDir, ourDestFile, null, -1); this(sinkDir, ourDestFile, null, -1, 3);
} }
public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort) { public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
_sinkDir = sinkDir; _sinkDir = sinkDir;
_destFile = ourDestFile; _destFile = ourDestFile;
_i2cpHost = i2cpHost; _i2cpHost = i2cpHost;
_i2cpPort = i2cpPort; _i2cpPort = i2cpPort;
_handlers = handlers;
_log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class); _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
} }
@ -56,7 +60,8 @@ public class StreamSinkServer {
else else
mgr = I2PSocketManagerFactory.createManager(); mgr = I2PSocketManagerFactory.createManager();
Destination dest = mgr.getSession().getMyDestination(); Destination dest = mgr.getSession().getMyDestination();
System.out.println("Listening for connections on: " + dest.calculateHash().toBase64()); if (_log.shouldLog(Log.INFO))
_log.info("Listening for connections on: " + dest.calculateHash().toBase64());
FileOutputStream fos = null; FileOutputStream fos = null;
try { try {
fos = new FileOutputStream(_destFile); fos = new FileOutputStream(_destFile);
@ -72,10 +77,33 @@ public class StreamSinkServer {
} }
I2PServerSocket sock = mgr.getServerSocket(); I2PServerSocket sock = mgr.getServerSocket();
startup(sock);
}
public void startup(I2PServerSocket sock) {
for (int i = 0; i < _handlers; i++) {
I2PThread t = new I2PThread(new ClientRunner(sock));
t.setName("Handler " + i);
t.setDaemon(false);
t.start();
}
}
/**
* Actually deal with a client - pull anything they send us and write it to a file.
*
*/
private class ClientRunner implements Runnable {
private I2PServerSocket _socket;
public ClientRunner(I2PServerSocket socket) {
_socket = socket;
}
public void run() {
while (true) { while (true) {
try { try {
I2PSocket curSock = sock.accept(); I2PSocket socket = _socket.accept();
handle(curSock); if (socket != null)
handle(socket);
} catch (I2PException ie) { } catch (I2PException ie) {
_log.error("Error accepting connection", ie); _log.error("Error accepting connection", ie);
return; return;
@ -86,38 +114,24 @@ public class StreamSinkServer {
} }
} }
private void handle(I2PSocket socket) { private void handle(I2PSocket sock) {
I2PThread t = new I2PThread(new ClientRunner(socket)); FileOutputStream fos = null;
t.setName("Handle " + socket.getPeerDestination().calculateHash().toBase64().substring(0,4));
t.start();
}
/**
* Actually deal with a client - pull anything they send us and write it to a file.
*
*/
private class ClientRunner implements Runnable {
private I2PSocket _sock;
private FileOutputStream _fos;
public ClientRunner(I2PSocket socket) {
_sock = socket;
try { try {
File sink = new File(_sinkDir); File sink = new File(_sinkDir);
if (!sink.exists()) if (!sink.exists())
sink.mkdirs(); sink.mkdirs();
File cur = File.createTempFile("clientSink", ".dat", sink); File cur = File.createTempFile("clientSink", ".dat", sink);
_fos = new FileOutputStream(cur); fos = new FileOutputStream(cur);
System.out.println("Writing to " + cur.getAbsolutePath()); if (_log.shouldLog(Log.DEBUG))
_log.debug("Writing to " + cur.getAbsolutePath());
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error creating sink", ioe); _log.error("Error creating sink", ioe);
_fos = null; return;
} }
}
public void run() {
if (_fos == null) return;
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
try { try {
InputStream in = _sock.getInputStream(); InputStream in = sock.getInputStream();
byte buf[] = new byte[4096]; byte buf[] = new byte[4096];
long written = 0; long written = 0;
int read = 0; int read = 0;
@ -125,47 +139,55 @@ public class StreamSinkServer {
//_fos.write(buf, 0, read); //_fos.write(buf, 0, read);
written += read; written += read;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read and wrote " + read); _log.debug("read and wrote " + read + " (" + written + ")");
} }
_fos.write(("written: [" + written + "]\n").getBytes()); fos.write(("written: [" + written + "]\n").getBytes());
long lifetime = System.currentTimeMillis() - start; long lifetime = System.currentTimeMillis() - start;
_log.error("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error writing the sink", ioe); _log.error("Error writing the sink", ioe);
} finally { } finally {
if (_fos != null) try { _fos.close(); } catch (IOException ioe) {} if (fos != null) try { fos.close(); } catch (IOException ioe) {}
if (_sock != null) try { _sock.close(); } catch (IOException ioe) {} if (sock != null) try { sock.close(); } catch (IOException ioe) {}
_log.error("Client socket closed"); _log.debug("Client socket closed");
} }
} }
} }
/** /**
* Fire up the streaming server. <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile</code><br /> * Fire up the streaming server. <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]</code><br />
* <ul> * <ul>
* <li><b>sinkDir</b>: Directory to store received files in</li> * <li><b>sinkDir</b>: Directory to store received files in</li>
* <li><b>ourDestFile</b>: filename to write our binary destination to</li> * <li><b>ourDestFile</b>: filename to write our binary destination to</li>
* <li><b>numHandlers</b>: how many concurrent connections to handle</li>
* </ul> * </ul>
*/ */
public static void main(String args[]) { public static void main(String args[]) {
StreamSinkServer server = null; StreamSinkServer server = null;
switch (args.length) { switch (args.length) {
case 0: case 0:
server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654); server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
break; break;
case 2: case 2:
server = new StreamSinkServer(args[0], args[1]); server = new StreamSinkServer(args[0], args[1]);
break; break;
case 4: case 4:
case 5:
int handlers = 3;
if (args.length == 5) {
try {
handlers = Integer.parseInt(args[4]);
} catch (NumberFormatException nfe) {}
}
try { try {
int port = Integer.parseInt(args[1]); int port = Integer.parseInt(args[1]);
server = new StreamSinkServer(args[2], args[3], args[0], port); server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
} catch (NumberFormatException nfe) { } catch (NumberFormatException nfe) {
System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile"); System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
} }
break; break;
default: default:
System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile"); System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
} }
if (server != null) if (server != null)
server.runServer(); server.runServer();

View File

@ -68,6 +68,7 @@ public class ConnectionManager {
_context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
} }
Connection getConnectionByInboundId(byte[] id) { Connection getConnectionByInboundId(byte[] id) {
@ -109,7 +110,14 @@ public class ConnectionManager {
byte receiveId[] = new byte[4]; byte receiveId[] = new byte[4];
_context.random().nextBytes(receiveId); _context.random().nextBytes(receiveId);
boolean reject = false; boolean reject = false;
int active = 0;
int total = 0;
synchronized (_connectionLock) { synchronized (_connectionLock) {
total = _connectionByInboundId.size();
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
if ( ((Connection)iter.next()).getIsConnected() )
active++;
}
if (locked_tooManyStreams()) { if (locked_tooManyStreams()) {
reject = true; reject = true;
} else { } else {
@ -127,6 +135,8 @@ public class ConnectionManager {
} }
} }
_context.statManager().addRateData("stream.receiveActive", active, total);
if (reject) { if (reject) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Refusing connection since we have exceeded our max of " _log.warn("Refusing connection since we have exceeded our max of "
@ -227,6 +237,8 @@ public class ConnectionManager {
} }
private boolean locked_tooManyStreams() { private boolean locked_tooManyStreams() {
if (_maxConcurrentStreams <= 0) return false;
if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
int active = 0; int active = 0;
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next(); Connection con = (Connection)iter.next();
@ -238,8 +250,6 @@ public class ConnectionManager {
_log.info("More than 100 connections! " + active _log.info("More than 100 connections! " + active
+ " total: " + _connectionByInboundId.size()); + " total: " + _connectionByInboundId.size());
if (_maxConcurrentStreams <= 0) return false;
if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
return (active >= _maxConcurrentStreams); return (active >= _maxConcurrentStreams);
} }

View File

@ -98,8 +98,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT));
setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2));
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 2)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1));
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1));
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE));

View File

@ -76,9 +76,9 @@
windowtitle="I2P"> windowtitle="I2P">
<sourcepath> <sourcepath>
<pathelement location="core/java/src" /> <pathelement location="core/java/src" />
<pathelement location="core/java/test" /> <!--<pathelement location="core/java/test" />-->
<pathelement location="router/java/src" /> <pathelement location="router/java/src" />
<pathelement location="router/java/test" /> <!--<pathelement location="router/java/test" />-->
<pathelement location="apps/ministreaming/java/src" /> <pathelement location="apps/ministreaming/java/src" />
<pathelement location="apps/streaming/java/src" /> <pathelement location="apps/streaming/java/src" />
<pathelement location="apps/i2ptunnel/java/src" /> <pathelement location="apps/i2ptunnel/java/src" />
@ -246,7 +246,15 @@
<tarfileset dir="pkg-temp" includes="**/*" prefix="i2p" /> <tarfileset dir="pkg-temp" includes="**/*" prefix="i2p" />
</tar> </tar>
</target> </target>
<target name="updater" depends="distclean, build"> <target name="updater" depends="prepupdate">
<zip destfile="i2pupdate.zip" basedir="pkg-temp" />
</target>
<target name="updateTest" depends="prepupdate">
<ant dir="core/java/" target="jarTest" />
<copy file="core/java/build/i2ptest.jar" todir="pkg-temp/lib" />
<zip destfile="i2pupdate.zip" basedir="pkg-temp" />
</target>
<target name="prepupdate" depends="distclean, build">
<delete dir="pkg-temp" /> <delete dir="pkg-temp" />
<copy file="build/i2p.jar" todir="pkg-temp/lib/" /> <copy file="build/i2p.jar" todir="pkg-temp/lib/" />
<copy file="build/i2ptunnel.jar" todir="pkg-temp/lib/" /> <copy file="build/i2ptunnel.jar" todir="pkg-temp/lib/" />
@ -286,7 +294,6 @@
<mkdir dir="pkg-temp/eepsite" /> <mkdir dir="pkg-temp/eepsite" />
<mkdir dir="pkg-temp/eepsite/webapps" /> <mkdir dir="pkg-temp/eepsite/webapps" />
<mkdir dir="pkg-temp/eepsite/cgi-bin" /> <mkdir dir="pkg-temp/eepsite/cgi-bin" />
<zip destfile="i2pupdate.zip" basedir="pkg-temp" />
</target> </target>
<taskdef name="izpack" classpath="${basedir}/installer/lib/izpack/standalone-compiler.jar" classname="com.izforge.izpack.ant.IzPackTask" /> <taskdef name="izpack" classpath="${basedir}/installer/lib/izpack/standalone-compiler.jar" classname="com.izforge.izpack.ant.IzPackTask" />
<target name="installer" depends="preppkg"> <target name="installer" depends="preppkg">

View File

@ -18,6 +18,9 @@
<target name="jar" depends="compile"> <target name="jar" depends="compile">
<jar destfile="./build/i2p.jar" basedir="./build/obj" includes="**/*.class" /> <jar destfile="./build/i2p.jar" basedir="./build/obj" includes="**/*.class" />
</target> </target>
<target name="jarTest" depends="compileTest">
<jar destfile="./build/i2ptest.jar" basedir="./build/obj" includes="**/*.class" />
</target>
<target name="javadoc"> <target name="javadoc">
<mkdir dir="./build" /> <mkdir dir="./build" />
<mkdir dir="./build/javadoc" /> <mkdir dir="./build/javadoc" />

View File

@ -4,6 +4,7 @@ import java.util.Random;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.util.PooledRandomSource; import net.i2p.util.PooledRandomSource;
import net.i2p.util.RandomSource; import net.i2p.util.RandomSource;
import net.i2p.util.Log;
/** /**
* *
@ -11,6 +12,9 @@ import net.i2p.util.RandomSource;
public class DummyPooledRandomSource extends PooledRandomSource { public class DummyPooledRandomSource extends PooledRandomSource {
public DummyPooledRandomSource(I2PAppContext context) { public DummyPooledRandomSource(I2PAppContext context) {
super(context); super(context);
}
protected void initializePool(I2PAppContext context) {
_pool = new RandomSource[POOL_SIZE]; _pool = new RandomSource[POOL_SIZE];
for (int i = 0; i < POOL_SIZE; i++) { for (int i = 0; i < POOL_SIZE; i++) {
_pool[i] = new DummyRandomSource(context); _pool[i] = new DummyRandomSource(context);

View File

@ -58,6 +58,8 @@ public class PersistentSessionKeyManager extends TransientSessionKeyManager {
* *
*/ */
public void saveState(OutputStream out) throws IOException, DataFormatException { public void saveState(OutputStream out) throws IOException, DataFormatException {
if (true) return;
Set tagSets = getInboundTagSets(); Set tagSets = getInboundTagSets();
Set sessions = getOutboundSessions(); Set sessions = getOutboundSessions();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))

View File

@ -10,6 +10,7 @@ package net.i2p.crypto;
*/ */
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -262,11 +263,23 @@ class TransientSessionKeyManager extends SessionKeyManager {
if (old != null) { if (old != null) {
TagSet oldTS = (TagSet)old; TagSet oldTS = (TagSet)old;
if (!oldTS.getAssociatedKey().equals(tagSet.getAssociatedKey())) { if (!oldTS.getAssociatedKey().equals(tagSet.getAssociatedKey())) {
if (_log.shouldLog(Log.ERROR)) { if (_log.shouldLog(Log.WARN)) {
_log.error("Multiple tags matching! tag: " + tag.toString() + " matches for new tagSet: " + tagSet + " and old tagSet: " + old); _log.warn("Multiple tags matching! tag: " + tag.toString() + " matches for new tagSet: " + tagSet + " and old tagSet: " + old);
_log.error("Earlier tag set creation: " + old + ": key=" + oldTS.getAssociatedKey().toBase64(), oldTS.getCreatedBy()); _log.warn("Earlier tag set creation: " + old + ": key=" + oldTS.getAssociatedKey().toBase64(), oldTS.getCreatedBy());
_log.error("Current tag set creation: " + tagSet + ": key=" + tagSet.getAssociatedKey().toBase64(), tagSet.getCreatedBy()); _log.warn("Current tag set creation: " + tagSet + ": key=" + tagSet.getAssociatedKey().toBase64(), tagSet.getCreatedBy());
} }
// drop both, rather than sit around confused
_inboundTagSets.remove(tag);
for (Iterator tsIter = oldTS.dropTags().iterator(); iter.hasNext(); ) {
SessionTag curTag = (SessionTag)tsIter.next();
_inboundTagSets.remove(curTag);
}
for (Iterator tsIter = tagSet.dropTags().iterator(); iter.hasNext(); ) {
SessionTag curTag = (SessionTag)tsIter.next();
_inboundTagSets.remove(curTag);
}
} else { } else {
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
//tagSet.getTags().addAll(oldTS.getTags()); //tagSet.getTags().addAll(oldTS.getTags());
@ -307,7 +320,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
synchronized (_inboundTagSets) { synchronized (_inboundTagSets) {
for (Iterator iter = _inboundTagSets.values().iterator(); iter.hasNext(); ) { for (Iterator iter = _inboundTagSets.values().iterator(); iter.hasNext(); ) {
TagSet set = (TagSet)iter.next(); TagSet set = (TagSet)iter.next();
int size = set.getTags().size(); int size = set.getTagCount();
if (size > 1000) if (size > 1000)
absurd++; absurd++;
if (size > 100) if (size > 100)
@ -322,7 +335,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
} }
for (int i = 0; i < removed.size(); i++) { for (int i = 0; i < removed.size(); i++) {
TagSet cur = (TagSet)removed.get(i); TagSet cur = (TagSet)removed.get(i);
for (Iterator iter = cur.getTags().iterator(); iter.hasNext(); ) { for (Iterator iter = cur.dropTags().iterator(); iter.hasNext(); ) {
SessionTag tag = (SessionTag)iter.next(); SessionTag tag = (SessionTag)iter.next();
_inboundTagSets.remove(tag); _inboundTagSets.remove(tag);
tags++; tags++;
@ -465,7 +478,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
for (Iterator siter = sets.iterator(); siter.hasNext();) { for (Iterator siter = sets.iterator(); siter.hasNext();) {
TagSet ts = (TagSet) siter.next(); TagSet ts = (TagSet) siter.next();
buf.append("<li><b>Received on:</b> ").append(new Date(ts.getDate())).append(" with ") buf.append("<li><b>Received on:</b> ").append(new Date(ts.getDate())).append(" with ")
.append(ts.getTags().size()).append(" tags remaining</li>"); .append(ts.getTagCount()).append(" tags remaining</li>");
} }
buf.append("</ul></td></tr>"); buf.append("</ul></td></tr>");
} }
@ -485,9 +498,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
buf.append("<tr><td><ul>"); buf.append("<tr><td><ul>");
for (Iterator siter = sess.getTagSets().iterator(); siter.hasNext();) { for (Iterator siter = sess.getTagSets().iterator(); siter.hasNext();) {
TagSet ts = (TagSet) siter.next(); TagSet ts = (TagSet) siter.next();
buf.append("<li><b>Sent on:</b> ").append(new Date(ts.getDate())).append(" with ").append( buf.append("<li><b>Sent on:</b> ").append(new Date(ts.getDate())).append(" with ").append(ts.getTagCount())
ts.getTags()
.size())
.append(" tags remaining</li>"); .append(" tags remaining</li>");
} }
buf.append("</ul></td></tr>"); buf.append("</ul></td></tr>");
@ -540,7 +551,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
_tagSets = new ArrayList(); _tagSets = new ArrayList();
for (int i = 0; i < sets.size(); i++) { for (int i = 0; i < sets.size(); i++) {
TagSet set = (TagSet) sets.get(i); TagSet set = (TagSet) sets.get(i);
dropped += set.getTags().size(); dropped += set.getTagCount();
} }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Rekeyed from " + _currentKey + " to " + key _log.info("Rekeyed from " + _currentKey + " to " + key
@ -604,7 +615,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
for (int i = 0; i < _tagSets.size(); i++) { for (int i = 0; i < _tagSets.size(); i++) {
TagSet set = (TagSet) _tagSets.get(i); TagSet set = (TagSet) _tagSets.get(i);
if (set.getDate() + SESSION_TAG_DURATION_MS > now) if (set.getDate() + SESSION_TAG_DURATION_MS > now)
tags += set.getTags().size(); tags += set.getTagCount();
} }
} }
return tags; return tags;
@ -620,7 +631,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
synchronized (_tagSets) { synchronized (_tagSets) {
for (Iterator iter = _tagSets.iterator(); iter.hasNext();) { for (Iterator iter = _tagSets.iterator(); iter.hasNext();) {
TagSet set = (TagSet) iter.next(); TagSet set = (TagSet) iter.next();
if ( (set.getDate() > last) && (set.getTags().size() > 0) ) if ( (set.getDate() > last) && (set.getTagCount() > 0) )
last = set.getDate(); last = set.getDate();
} }
} }
@ -665,11 +676,32 @@ class TransientSessionKeyManager extends SessionKeyManager {
_date = when; _date = when;
} }
/** tags still available */ /**
public Set getTags() { * raw tags still available - you MUST synchronize against the TagSet instance
* if you need to use this set
*/
Set getTags() {
return _sessionTags; return _sessionTags;
} }
public int getTagCount() {
synchronized (TagSet.this) {
if (_sessionTags == null)
return 0;
else
return _sessionTags.size();
}
}
public Set dropTags() {
Set rv = null;
synchronized (TagSet.this) {
rv = _sessionTags;
_sessionTags = Collections.EMPTY_SET;
}
return rv;
}
public SessionKey getAssociatedKey() { public SessionKey getAssociatedKey() {
return _key; return _key;
} }

View File

@ -28,12 +28,15 @@ public class BufferedRandomSource extends RandomSource {
private int _nextBit; private int _nextBit;
private static volatile long _reseeds; private static volatile long _reseeds;
private static final int BUFFER_SIZE = 256*1024; private static final int DEFAULT_BUFFER_SIZE = 256*1024;
public BufferedRandomSource(I2PAppContext context) { public BufferedRandomSource(I2PAppContext context) {
this(context, DEFAULT_BUFFER_SIZE);
}
public BufferedRandomSource(I2PAppContext context, int bufferSize) {
super(context); super(context);
context.statManager().createRateStat("prng.reseedCount", "How many times the prng has been reseeded", "Encryption", new long[] { 60*1000, 10*60*1000, 60*60*1000 } ); context.statManager().createRateStat("prng.reseedCount", "How many times the prng has been reseeded", "Encryption", new long[] { 60*1000, 10*60*1000, 60*60*1000 } );
_buffer = new byte[BUFFER_SIZE]; _buffer = new byte[bufferSize];
refillBuffer(); refillBuffer();
// stagger reseeding // stagger reseeding
_nextByte = ((int)_reseeds-1) * 16 * 1024; _nextByte = ((int)_reseeds-1) * 16 * 1024;
@ -73,7 +76,7 @@ public class BufferedRandomSource extends RandomSource {
_nextBit = 0; _nextBit = 0;
_nextByte++; _nextByte++;
} }
if (_nextByte >= BUFFER_SIZE) if (_nextByte >= _buffer.length)
refillBuffer(); refillBuffer();
rv += (_buffer[_nextByte] << curBit); rv += (_buffer[_nextByte] << curBit);
_nextBit++; _nextBit++;
@ -98,7 +101,7 @@ public class BufferedRandomSource extends RandomSource {
_nextBit = 0; _nextBit = 0;
_nextByte++; _nextByte++;
} }
if (_nextByte >= BUFFER_SIZE) if (_nextByte >= _buffer.length)
refillBuffer(); refillBuffer();
int gobbleBits = 8 - _nextBit; int gobbleBits = 8 - _nextBit;
int want = numBits - curBit; int want = numBits - curBit;
@ -117,10 +120,10 @@ public class BufferedRandomSource extends RandomSource {
public synchronized final void nextBytes(byte buf[]) { public synchronized final void nextBytes(byte buf[]) {
int outOffset = 0; int outOffset = 0;
while (outOffset < buf.length) { while (outOffset < buf.length) {
int availableBytes = BUFFER_SIZE - _nextByte - (_nextBit != 0 ? 1 : 0); int availableBytes = _buffer.length - _nextByte - (_nextBit != 0 ? 1 : 0);
if (availableBytes <= 0) if (availableBytes <= 0)
refillBuffer(); refillBuffer();
int start = BUFFER_SIZE - availableBytes; int start = _buffer.length - availableBytes;
int writeSize = Math.min(buf.length - outOffset, availableBytes); int writeSize = Math.min(buf.length - outOffset, availableBytes);
System.arraycopy(_buffer, start, buf, outOffset, writeSize); System.arraycopy(_buffer, start, buf, outOffset, writeSize);
outOffset += writeSize; outOffset += writeSize;
@ -195,6 +198,10 @@ public class BufferedRandomSource extends RandomSource {
} }
public static void main(String args[]) { public static void main(String args[]) {
for (int i = 0; i < 16; i++)
test();
}
private static void test() {
I2PAppContext ctx = I2PAppContext.getGlobalContext(); I2PAppContext ctx = I2PAppContext.getGlobalContext();
byte data[] = new byte[16*1024]; byte data[] = new byte[16*1024];
for (int i = 0; i < data.length; i += 4) { for (int i = 0; i < data.length; i += 4) {
@ -203,8 +210,7 @@ public class BufferedRandomSource extends RandomSource {
DataHelper.toLong(data, i, 4, l); DataHelper.toLong(data, i, 4, l);
} }
byte compressed[] = DataHelper.compress(data); byte compressed[] = DataHelper.compress(data);
System.out.println("Compressed: " + compressed.length); System.out.println("Data: " + data.length + "/" + compressed.length + ": " + toString(data));
System.out.println("Orig: " + data.length + ": " + toString(data));
} }
private static final String toString(byte data[]) { private static final String toString(byte data[]) {
StringBuffer buf = new StringBuffer(data.length * 9); StringBuffer buf = new StringBuffer(data.length * 9);

View File

@ -21,14 +21,40 @@ public class PooledRandomSource extends RandomSource {
protected volatile int _nextPool; protected volatile int _nextPool;
public static final int POOL_SIZE = 16; public static final int POOL_SIZE = 16;
/**
* How much random data will we precalculate and feed from (as opposed to on demand
* reseeding, etc). If this is not set, a default will be used (4MB), or if it is
* set to 0, no buffer will be used, otherwise the amount specified will be allocated
* across the pooled PRNGs.
*
*/
public static final String PROP_BUFFER_SIZE = "i2p.prng.totalBufferSizeKB";
public PooledRandomSource(I2PAppContext context) { public PooledRandomSource(I2PAppContext context) {
super(context); super(context);
_log = context.logManager().getLog(PooledRandomSource.class); _log = context.logManager().getLog(PooledRandomSource.class);
initializePool(context);
}
protected void initializePool(I2PAppContext context) {
_pool = new RandomSource[POOL_SIZE]; _pool = new RandomSource[POOL_SIZE];
String totalSizeProp = context.getProperty(PROP_BUFFER_SIZE);
int totalSize = -1;
if (totalSizeProp != null) {
try {
totalSize = Integer.parseInt(totalSizeProp);
} catch (NumberFormatException nfe) {
totalSize = -1;
}
}
for (int i = 0; i < POOL_SIZE; i++) { for (int i = 0; i < POOL_SIZE; i++) {
//_pool[i] = new RandomSource(context); if (totalSize < 0)
_pool[i] = new BufferedRandomSource(context); _pool[i] = new BufferedRandomSource(context);
else if (totalSize > 0)
_pool[i] = new BufferedRandomSource(context, (totalSize*1024) / POOL_SIZE);
else
_pool[i] = new RandomSource(context);
_pool[i].nextBoolean(); _pool[i].nextBoolean();
} }
_nextPool = 0; _nextPool = 0;

View File

@ -1,6 +1,20 @@
$Id: history.txt,v 1.207 2005/07/04 15:44:17 jrandom Exp $ $Id: history.txt,v 1.208 2005/07/05 17:08:56 jrandom Exp $
2005-07-05 2005-07-11 jrandom
* Reduced the growth factor on the slow start and congestion avoidance for
the streaming lib.
* Adjusted some of the I2PTunnelServer threading to use a small pool of
handlers, rather than launching off new threads which then immediately
launch off an I2PTunnelRunner instance (which launches 3 more threads..)
* Don't persist session keys / session tags (not worth it, for now)
* Added some detection and handling code for duplicate session tags being
delivered (root cause still not addressed)
* Make the PRNG's buffer size configurable (via the config property
"i2p.prng.totalBufferSizeKB=4096")
* Disable SSU flooding by default (duh)
* Updates to the StreamSink apps for better throttling tests.
2005-07-05 jrandom
* Use a buffered PRNG, pulling the PRNG data off a larger precalculated * Use a buffered PRNG, pulling the PRNG data off a larger precalculated
buffer, rather than the underlying PRNG's (likely small) one, which in buffer, rather than the underlying PRNG's (likely small) one, which in
turn reduces the frequency of recalcing. turn reduces the frequency of recalcing.

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.198 $ $Date: 2005/07/04 15:44:21 $"; public final static String ID = "$Revision: 1.199 $ $Date: 2005/07/05 17:08:59 $";
public final static String VERSION = "0.5.0.7"; public final static String VERSION = "0.5.0.7";
public final static long BUILD = 10; public final static long BUILD = 11;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION); System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -70,6 +70,8 @@ public class SessionKeyPersistenceHelper implements Service {
} }
private void writeState() { private void writeState() {
if (true) return;
Object o = _context.sessionKeyManager(); Object o = _context.sessionKeyManager();
if (!(o instanceof PersistentSessionKeyManager)) { if (!(o instanceof PersistentSessionKeyManager)) {
_log.error("Unable to persist the session key state - manager is " + o.getClass().getName()); _log.error("Unable to persist the session key state - manager is " + o.getClass().getName());

View File

@ -135,7 +135,7 @@ public class ClientConnectionRunner {
_alreadyProcessed.clear(); _alreadyProcessed.clear();
} }
_config = null; _config = null;
_manager = null; //_manager = null;
} }
/** current client's config */ /** current client's config */
@ -271,11 +271,13 @@ public class ClientConnectionRunner {
+ "]"); + "]");
long beforeDistribute = _context.clock().now(); long beforeDistribute = _context.clock().now();
// the following blocks as described above // the following blocks as described above
_manager.distributeMessage(_config.getDestination(), message.getDestination(), message.getPayload(), id); SessionConfig cfg = _config;
if (cfg != null)
_manager.distributeMessage(cfg.getDestination(), dest, payload, id);
long timeToDistribute = _context.clock().now() - beforeDistribute; long timeToDistribute = _context.clock().now() - beforeDistribute;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.warn("Time to distribute in the manager to " _log.warn("Time to distribute in the manager to "
+ message.getDestination().calculateHash().toBase64() + ": " + dest.calculateHash().toBase64() + ": "
+ timeToDistribute); + timeToDistribute);
return id; return id;
} }

View File

@ -103,11 +103,11 @@ public class GarlicMessageReceiver {
clove.getExpiration().getTime()); clove.getExpiration().getTime());
if (invalidReason != null) { if (invalidReason != null) {
String howLongAgo = DataHelper.formatDuration(_context.clock().now()-clove.getExpiration().getTime()); String howLongAgo = DataHelper.formatDuration(_context.clock().now()-clove.getExpiration().getTime());
if (_log.shouldLog(Log.ERROR))
_log.error("Clove is NOT valid: id=" + clove.getCloveId()
+ " expiration " + howLongAgo + " ago: " + invalidReason + ": " + clove);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Clove is NOT valid: id=" + clove.getCloveId() _log.warn("Clove is NOT valid: id=" + clove.getCloveId()
+ " expiration " + howLongAgo + " ago: " + invalidReason + ": " + clove);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Clove is NOT valid: id=" + clove.getCloveId()
+ " expiration " + howLongAgo + " ago", new Exception("Invalid within...")); + " expiration " + howLongAgo + " ago", new Exception("Invalid within..."));
_context.messageHistory().messageProcessingError(clove.getCloveId(), _context.messageHistory().messageProcessingError(clove.getCloveId(),
clove.getData().getClass().getName(), clove.getData().getClass().getName(),

View File

@ -140,6 +140,7 @@ public class TransportManager implements TransportEventListener {
for (int i = 0; i < _transports.size(); i++) { for (int i = 0; i < _transports.size(); i++) {
Transport t = (Transport)_transports.get(i); Transport t = (Transport)_transports.get(i);
if (failedTransports.contains(t.getStyle())) { if (failedTransports.contains(t.getStyle())) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Skipping transport " + t.getStyle() + " as it already failed"); _log.debug("Skipping transport " + t.getStyle() + " as it already failed");
continue; continue;
} }

View File

@ -114,9 +114,9 @@ class UDPFlooder implements Runnable {
private long calcFloodDelay() { private long calcFloodDelay() {
try { try {
return Long.parseLong(_context.getProperty("udp.floodDelay", "30000")); return Long.parseLong(_context.getProperty("udp.floodDelay", "300000"));
} catch (Exception e) { } catch (Exception e) {
return 30*1000; return 5*60*1000;
} }
} }
} }

View File

@ -100,7 +100,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 }; private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 };
/** should we flood all UDP peers with the configured rate? */ /** should we flood all UDP peers with the configured rate? */
private static final boolean SHOULD_FLOOD_PEERS = true; private static final boolean SHOULD_FLOOD_PEERS = false;
private static final int MAX_CONSECUTIVE_FAILED = 5; private static final int MAX_CONSECUTIVE_FAILED = 5;