Added Simple true/false storage class to the utilities

Added socketSoTimeout
CHANGED RetransmissionTimer is now public
FIXED SimpleTimer has a way to be stopped, and reap it's children
CLEANUP A few javadoc additions, where I could figgure out bits
CLEANUP all code that needed to catch the timeout exception for socketSoTimeout
This commit is contained in:
sponge
2008-09-27 22:59:22 +00:00
parent b0313bd6bf
commit 61749aaaa9
10 changed files with 154 additions and 17 deletions

View File

@ -12,6 +12,7 @@ import java.net.ConnectException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Properties; import java.util.Properties;
@ -219,6 +220,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Error accepting", ce); _log.error("Error accepting", ce);
// not killing the server.. // not killing the server..
} catch(SocketTimeoutException ste) {
// ignored, we never set the timeout
} }
} }
} }

View File

@ -2,6 +2,7 @@ package net.i2p.client.streaming;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException;
import net.i2p.I2PException; import net.i2p.I2PException;
/** /**
@ -24,8 +25,21 @@ public interface I2PServerSocket {
* @throws I2PException if there is a problem with reading a new socket * @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc) * from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
* @throws SocketTimeoutException
*/ */
public I2PSocket accept() throws I2PException, ConnectException; public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
/**
* Set Sock Option accept timeout
* @param x
*/
public void setSoTimeout(long x);
/**
* Get Sock Option accept timeout
* @return timeout
*/
public long getSoTimeout();
/** /**
* Access the manager which is coordinating the server socket * Access the manager which is coordinating the server socket

View File

@ -30,6 +30,21 @@ class I2PServerSocketImpl implements I2PServerSocket {
/** lock on this when adding a new socket to the pending list, and wait on it accordingly */ /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
private Object socketAddedLock = new Object(); private Object socketAddedLock = new Object();
/**
* Set Sock Option accept timeout stub, does nothing
* @param x
*/
public void setSoTimeout(long x) {
}
/**
* Get Sock Option accept timeout stub, does nothing
* @return timeout
*/
public long getSoTimeout() {
return -1;
}
public I2PServerSocketImpl(I2PSocketManager mgr) { public I2PServerSocketImpl(I2PSocketManager mgr) {
this.mgr = mgr; this.mgr = mgr;
} }

View File

@ -5,6 +5,7 @@ import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Properties; import java.util.Properties;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
@ -107,6 +108,8 @@ public class StreamSinkServer {
} catch (ConnectException ce) { } catch (ConnectException ce) {
_log.error("Connection already dropped", ce); _log.error("Connection already dropped", ce);
return; return;
} catch(SocketTimeoutException ste) {
// ignored
} }
} }
} }

View File

@ -39,6 +39,7 @@ public class ConnectionManager {
private ConnectionOptions _defaultOptions; private ConnectionOptions _defaultOptions;
private volatile int _numWaiting; private volatile int _numWaiting;
private Object _connectionLock; private Object _connectionLock;
private long SoTimeout;
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
_context = context; _context = context;
@ -58,6 +59,9 @@ public class ConnectionManager {
_maxConcurrentStreams = maxConcurrent; _maxConcurrentStreams = maxConcurrent;
_defaultOptions = defaultOptions; _defaultOptions = defaultOptions;
_numWaiting = 0; _numWaiting = 0;
/** Socket timeout for accept() */
SoTimeout = -1;
_context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
@ -90,6 +94,22 @@ public class ConnectionManager {
return null; return null;
} }
/**
* Set the socket accept() timeout.
* @param x
*/
public void MsetSoTimeout(long x) {
SoTimeout = x;
}
/**
* Get the socket accept() timeout.
* @return
*/
public long MgetSoTimeout() {
return SoTimeout;
}
public void setAllowIncomingConnections(boolean allow) { public void setAllowIncomingConnections(boolean allow) {
_connectionHandler.setActive(allow); _connectionHandler.setActive(allow);
} }

View File

@ -1,5 +1,8 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.SocketTimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.i2p.I2PException; import net.i2p.I2PException;
/** /**
@ -13,11 +16,35 @@ public class I2PServerSocketFull implements I2PServerSocket {
_socketManager = mgr; _socketManager = mgr;
} }
public I2PSocket accept() throws I2PException { /**
*
* @return
* @throws net.i2p.I2PException
* @throws SocketTimeoutException
*/
public I2PSocket accept() throws I2PException, SocketTimeoutException {
return _socketManager.receiveSocket(); return _socketManager.receiveSocket();
} }
public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); } public long getSoTimeout() {
return _socketManager.getConnectionManager().MgetSoTimeout();
}
public I2PSocketManager getManager() { return _socketManager; } public void setSoTimeout(long x) {
_socketManager.getConnectionManager().MsetSoTimeout(x);
}
/**
* Close the connection.
*/
public void close() {
_socketManager.getConnectionManager().setAllowIncomingConnections(false);
}
/**
*
* @return _socketManager
*/
public I2PSocketManager getManager() {
return _socketManager;
}
} }

View File

@ -1,6 +1,7 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.NoRouteToHostException; import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Properties; import java.util.Properties;
@ -44,6 +45,14 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_context = null; _context = null;
_session = null; _session = null;
} }
/**
*
* @param context
* @param session
* @param opts
* @param name
*/
public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
this(); this();
init(context, session, opts, name); init(context, session, opts, name);
@ -54,6 +63,11 @@ public class I2PSocketManagerFull implements I2PSocketManager {
/** /**
* *
*
* @param context
* @param session
* @param opts
* @param name
*/ */
public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
_context = context; _context = context;
@ -96,24 +110,38 @@ public class I2PSocketManagerFull implements I2PSocketManager {
return _connectionManager; return _connectionManager;
} }
public I2PSocket receiveSocket() throws I2PException { /**
*
* @return
* @throws net.i2p.I2PException
* @throws java.net.SocketTimeoutException
*/
public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException {
verifySession(); verifySession();
Connection con = _connectionManager.getConnectionHandler().accept(-1); Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout());
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.debug("receiveSocket() called: " + con); _log.debug("receiveSocket() called: " + con);
}
if (con != null) { if (con != null) {
I2PSocketFull sock = new I2PSocketFull(con); I2PSocketFull sock = new I2PSocketFull(con);
con.setSocket(sock); con.setSocket(sock);
return sock; return sock;
} else { } else {
if(_connectionManager.MgetSoTimeout() == -1) {
return null; return null;
} }
throw new SocketTimeoutException("I2PSocket timed out");
}
} }
/** /**
* Ping the specified peer, returning true if they replied to the ping within * Ping the specified peer, returning true if they replied to the ping within
* the timeout specified, false otherwise. This call blocks. * the timeout specified, false otherwise. This call blocks.
* *
*
* @param peer
* @param timeoutMs
* @return
*/ */
public boolean ping(Destination peer, long timeoutMs) { public boolean ping(Destination peer, long timeoutMs) {
return _connectionManager.ping(peer, timeoutMs); return _connectionManager.ping(peer, timeoutMs);

View File

@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer;
/** /**
* *
*/ */
class RetransmissionTimer extends SimpleTimer { public class RetransmissionTimer extends SimpleTimer {
private static final RetransmissionTimer _instance = new RetransmissionTimer(); private static final RetransmissionTimer _instance = new RetransmissionTimer();
public static final SimpleTimer getInstance() { return _instance; } public static final SimpleTimer getInstance() { return _instance; }
protected RetransmissionTimer() { super("StreamingTimer"); } protected RetransmissionTimer() { super("StreamingTimer"); }

View File

@ -8,12 +8,16 @@ class Executor implements Runnable {
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
private List _readyEvents; private List _readyEvents;
public Executor(I2PAppContext ctx, Log log, List events) { private SimpleStore runn;
public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) {
_context = ctx; _context = ctx;
_readyEvents = events; _readyEvents = events;
runn = x;
} }
public void run() { public void run() {
while (true) { while(runn.getAnswer()) {
SimpleTimer.TimedEvent evt = null; SimpleTimer.TimedEvent evt = null;
synchronized (_readyEvents) { synchronized (_readyEvents) {
if (_readyEvents.size() <= 0) if (_readyEvents.size() <= 0)

View File

@ -25,9 +25,11 @@ public class SimpleTimer {
/** event (TimedEvent) to event time (Long) mapping */ /** event (TimedEvent) to event time (Long) mapping */
private Map _eventTimes; private Map _eventTimes;
private List _readyEvents; private List _readyEvents;
private SimpleStore runn;
protected SimpleTimer() { this("SimpleTimer"); } protected SimpleTimer() { this("SimpleTimer"); }
protected SimpleTimer(String name) { protected SimpleTimer(String name) {
runn = new SimpleStore(true);
_context = I2PAppContext.getGlobalContext(); _context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(SimpleTimer.class); _log = _context.logManager().getLog(SimpleTimer.class);
_events = new TreeMap(); _events = new TreeMap();
@ -38,13 +40,28 @@ public class SimpleTimer {
runner.setDaemon(true); runner.setDaemon(true);
runner.start(); runner.start();
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents)); I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn));
executor.setName(name + "Executor " + i); executor.setName(name + "Executor " + i);
executor.setDaemon(true); executor.setDaemon(true);
executor.start(); executor.start();
} }
} }
/**
* Removes the SimpleTimer.
*/
public void removeSimpleTimer() {
synchronized(_events) {
runn.setAnswer(false);
_events.notifyAll();
}
}
/**
*
* @param event
* @param timeoutMs
*/
public void reschedule(TimedEvent event, long timeoutMs) { public void reschedule(TimedEvent event, long timeoutMs) {
addEvent(event, timeoutMs, false); addEvent(event, timeoutMs, false);
} }
@ -55,9 +72,13 @@ public class SimpleTimer {
* for the earlier of the two timeouts, which may be before this stated * for the earlier of the two timeouts, which may be before this stated
* timeout. If this is not the desired behavior, call removeEvent first. * timeout. If this is not the desired behavior, call removeEvent first.
* *
* @param event
* @param timeoutMs
*/ */
public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); }
/** /**
* @param event
* @param timeoutMs
* @param useEarliestTime if its already scheduled, use the earlier of the * @param useEarliestTime if its already scheduled, use the earlier of the
* two timeouts, else use the later * two timeouts, else use the later
*/ */
@ -143,12 +164,12 @@ public class SimpleTimer {
private long _occurredTime; private long _occurredTime;
private long _occurredEventCount; private long _occurredEventCount;
private TimedEvent _recentEvents[] = new TimedEvent[5]; // not used
// private TimedEvent _recentEvents[] = new TimedEvent[5];
private class SimpleTimerRunner implements Runnable { private class SimpleTimerRunner implements Runnable {
public void run() { public void run() {
List eventsToFire = new ArrayList(1); List eventsToFire = new ArrayList(1);
while (true) { while(runn.getAnswer()) {
try { try {
synchronized (_events) { synchronized (_events) {
//if (_events.size() <= 0) //if (_events.size() <= 0)
@ -158,8 +179,10 @@ public class SimpleTimer {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long nextEventDelay = -1; long nextEventDelay = -1;
Object nextEvent = null; Object nextEvent = null;
while (true) { while(runn.getAnswer()) {
if (_events.size() <= 0) break; if(_events.size() <= 0) {
break;
}
Long when = (Long)_events.firstKey(); Long when = (Long)_events.firstKey();
if (when.longValue() <= now) { if (when.longValue() <= now) {
TimedEvent evt = (TimedEvent)_events.remove(when); TimedEvent evt = (TimedEvent)_events.remove(when);