diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index ce9ca8ed8..06a9a5211 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -15,8 +15,10 @@ import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.ConnectException; import java.net.NoRouteToHostException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Properties; import java.util.Set; @@ -29,6 +31,7 @@ import net.i2p.client.streaming.I2PSocketManagerFactory; import net.i2p.client.streaming.I2PSocketOptions; import net.i2p.data.Base64; import net.i2p.data.ByteArray; +import net.i2p.data.DataHelper; import net.i2p.data.DataFormatException; import net.i2p.data.Destination; import net.i2p.util.ByteCache; @@ -53,7 +56,10 @@ public class SAMStreamSession { private I2PSocketManager socketMgr = null; private Object handlersMapLock = new Object(); + /** stream id (Long) to SAMStreamSessionSocketReader */ private HashMap handlersMap = new HashMap(); + /** stream id (Long) to StreamSender */ + private HashMap sendersMap = new HashMap(); private Object idLock = new Object(); private int lastNegativeId = 0; @@ -61,6 +67,14 @@ public class SAMStreamSession { // Can we create outgoing connections? private boolean canCreate = false; + /** + * should we flush every time we get a STREAM SEND, or leave that up to + * the streaming lib to decide? + */ + private boolean forceFlush = false; + public static String PROP_FORCE_FLUSH = "sam.forceFlush"; + public static String DEFAULT_FORCE_FLUSH = "false"; + /** * Create a new SAM STREAM session. * @@ -109,9 +123,6 @@ public class SAMStreamSession { } catch (NumberFormatException nfe) { throw new SAMException("Invalid I2CP port specified [" + port + "]"); } - // streams MUST be mode=guaranteed (though i think the socket manager - // enforces this anyway... - allprops.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); _log.debug("Creating I2PSocketManager..."); socketMgr = I2PSocketManagerFactory.createManager(destStream, @@ -122,6 +133,8 @@ public class SAMStreamSession { throw new SAMException("Error creating I2PSocketManager"); } + forceFlush = Boolean.valueOf(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH)).booleanValue(); + boolean canReceive = false; if (dir.equals("BOTH")) { canCreate = true; @@ -199,14 +212,14 @@ public class SAMStreamSession { * * @param data Bytes to be sent * - * @return True if the data was sent, false otherwise + * @return True if the data was queued for sending, false otherwise */ public boolean sendBytes(int id, InputStream in, int size) throws IOException { - SAMStreamSessionSocketHandler handler = getSocketHandler(id); + StreamSender sender = getSender(id); - if (handler == null) { + if (sender == null) { if (_log.shouldLog(Log.WARN)) - _log.error("Trying to send bytes through inexistent handler " +id); + _log.warn("Trying to send bytes through nonexistent handler " +id); // even though it failed, we need to read those bytes! for (int i = 0; i < size; i++) { int c = in.read(); @@ -216,7 +229,8 @@ public class SAMStreamSession { return false; } - return handler.sendBytes(in, size); + sender.sendBytes(in, size); + return true; } /** @@ -256,13 +270,15 @@ public class SAMStreamSession { * @return An id associated to the socket handler */ private int createSocketHandler(I2PSocket s, int id) { - SAMStreamSessionSocketHandler handler; + SAMStreamSessionSocketReader reader = null; + StreamSender sender = null; if (id == 0) { id = createUniqueId(); } try { - handler = new SAMStreamSessionSocketHandler(s, id); + reader = new SAMStreamSessionSocketReader(s, id); + sender = new StreamSender(s, id); } catch (IOException e) { _log.error("IOException when creating SAM STREAM session socket handler", e); recv.stopStreamReceiving(); @@ -270,10 +286,13 @@ public class SAMStreamSession { } synchronized (handlersMapLock) { - handlersMap.put(new Integer(id), handler); + handlersMap.put(new Integer(id), reader); + sendersMap.put(new Integer(id), sender); } - I2PThread t = new I2PThread(handler, "SAMStreamSessionSocketHandler"); + I2PThread t = new I2PThread(reader, "SAMReader" + id); + t.start(); + t = new I2PThread(sender, "SAMSender" + id); t.start(); return id; @@ -291,9 +310,14 @@ public class SAMStreamSession { * * @param id Handler id */ - private SAMStreamSessionSocketHandler getSocketHandler(int id) { + private SAMStreamSessionSocketReader getSocketReader(int id) { synchronized (handlersMapLock) { - return (SAMStreamSessionSocketHandler)handlersMap.get(new Integer(id)); + return (SAMStreamSessionSocketReader)handlersMap.get(new Integer(id)); + } + } + private StreamSender getSender(int id) { + synchronized (handlersMapLock) { + return (StreamSender)sendersMap.get(new Integer(id)); } } @@ -314,18 +338,19 @@ public class SAMStreamSession { * @param id Handler id to be removed */ private void removeSocketHandler(int id) { - SAMStreamSessionSocketHandler removed; + SAMStreamSessionSocketReader reader = null; + StreamSender sender = null; synchronized (handlersMapLock) { - removed = (SAMStreamSessionSocketHandler)handlersMap.remove(new Integer(id)); + reader = (SAMStreamSessionSocketReader)handlersMap.remove(new Integer(id)); + sender = (StreamSender)sendersMap.remove(new Integer(id)); } - if (removed == null) { - // ignore - likely the socket handler was already removed by removeAllSocketHandlers - } else { - removed.stopRunning(); - _log.debug("Removed SAM STREAM session socket handler " + id); - } + if (reader != null) + reader.stopRunning(); + if (sender != null) + sender.stopRunning(); + _log.debug("Removed SAM STREAM session socket handler " + id); } /** @@ -344,9 +369,11 @@ public class SAMStreamSession { while (iter.hasNext()) { id = (Integer)iter.next(); - ((SAMStreamSessionSocketHandler)handlersMap.get(id)).stopRunning(); + ((SAMStreamSessionSocketReader)handlersMap.get(id)).stopRunning(); + ((StreamSender)sendersMap.get(id)).stopRunning(); } handlersMap.clear(); + sendersMap.clear(); } } @@ -440,10 +467,9 @@ public class SAMStreamSession { * * @author human */ - public class SAMStreamSessionSocketHandler implements Runnable { + public class SAMStreamSessionSocketReader implements Runnable { private I2PSocket i2pSocket = null; - private OutputStream i2pSocketOS = null; private Object runningLock = new Object(); private boolean stillRunning = true; @@ -451,72 +477,20 @@ public class SAMStreamSession { private int id; /** - * Create a new SAM STREAM session socket handler + * Create a new SAM STREAM session socket reader * * @param s Socket to be handled * @param id Unique id assigned to the handler */ - public SAMStreamSessionSocketHandler(I2PSocket s, int id) throws IOException { + public SAMStreamSessionSocketReader(I2PSocket s, int id) throws IOException { _log.debug("Instantiating new SAM STREAM session socket handler"); i2pSocket = s; - i2pSocketOS = s.getOutputStream(); this.id = id; } /** - * Send bytes through the SAM STREAM session socket handler - * - * @param data Data to be sent - * - * @return True if data has been sent without errors, false otherwise - */ - public boolean sendBytes(InputStream in, int size) throws IOException { - if (_log.shouldLog(Log.DEBUG)) { - _log.debug("Handler " + id + ": sending " + size - + " bytes"); - } - ByteCache cache = ByteCache.getInstance(1024, 4); - ByteArray ba = cache.acquire(); - int remaining = size; - try { - byte buf[] = ba.getData(); - while (remaining > 0) { - int read = in.read(buf, 0, remaining > buf.length ? buf.length : remaining); - if (read == -1) { - throw new IOException("Insufficient data from the SAM client (" + remaining + "/" + size + ")"); - } else if (read > 0) { - remaining -= read; - try { - i2pSocketOS.write(buf, 0, read); - } catch (IOException ioe) { - // ok, the stream failed, but the SAM client didn't - if (_log.shouldLog(Log.WARN)) - _log.warn("Stream failed", ioe); - - removeSocketHandler(id); - - // emtpy the remaining payload so we can continue - for (int i = remaining; i > 0; i--) { - int c = in.read(); - if (c == -1) - throw new IOException("Stream closed, but the SAM client didn't send enough anyway (" - + i + " remaining)"); - } - - return false; - } - } - } - } finally { - cache.release(ba); - } - - return true; - } - - /** - * Stop a SAM STREAM session socket handler + * Stop a SAM STREAM session socket reader * */ public void stopRunning() { @@ -575,4 +549,99 @@ public class SAMStreamSession { _log.debug("Shutting down SAM STREAM session socket handler " +id); } } + + + /** + * Lets us push data through the stream without blocking, (even after exceeding + * the I2PSocket's buffer) + */ + private class StreamSender implements Runnable { + private List _data; + private int _id; + private ByteCache _cache; + private OutputStream _out = null; + private boolean _stillRunning; + + public StreamSender(I2PSocket s, int id) throws IOException { + _data = new ArrayList(1); + _id = id; + _cache = ByteCache.getInstance(4, 32*1024); + _out = s.getOutputStream(); + _stillRunning = true; + } + + /** + * Send bytes through the SAM STREAM session socket sender + * + * @param data Data to be sent + * + * @throws IOException if the client didnt provide enough data + */ + public void sendBytes(InputStream in, int size) throws IOException { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handler " + _id + ": sending " + size + " bytes"); + + ByteArray ba = _cache.acquire(); + int read = DataHelper.read(in, ba.getData(), 0, size); + if (read != size) + throw new IOException("Insufficient data from the SAM client (" + read + "/" + size + ")"); + + ba.setValid(read); + synchronized (_data) { + _data.add(ba); + _data.notifyAll(); + } + } + + /** + * Stop a SAM STREAM session socket sender + * + */ + public void stopRunning() { + _log.debug("stopRunning() invoked on socket sender " + _id); + _stillRunning = false; + synchronized (_data) { + _data.clear(); + _data.notifyAll(); + } + } + + public void run() { + ByteArray data = null; + while (_stillRunning) { + data = null; + try { + synchronized (_data) { + if (_data.size() > 0) + data = (ByteArray)_data.remove(0); + else + _data.wait(5000); + } + + if (data != null) { + try { + _out.write(data.getData(), 0, data.getValid()); + if (forceFlush) { + // i dont like doing this, but it clears the buffer issues + _out.flush(); + } + } catch (IOException ioe) { + // ok, the stream failed, but the SAM client didn't + if (_log.shouldLog(Log.WARN)) + _log.warn("Stream failed", ioe); + + removeSocketHandler(_id); + stopRunning(); + + } finally { + _cache.release(data); + } + } + } catch (InterruptedException ie) {} + } + synchronized (_data) { + _data.clear(); + } + } + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 850b530ae..b94b74779 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -394,15 +394,12 @@ public class Connection { if (_socket != null) _socket.destroy(); _socket = null; - _inputStream = null; if (_outputStream != null) _outputStream.destroy(); - _outputStream = null; - _outboundQueue = null; if (_receiver != null) _receiver.destroy(); if (_activityTimer != null) - SimpleTimer.getInstance().addEvent(_activityTimer, 1); + SimpleTimer.getInstance().removeEvent(_activityTimer); _activityTimer = null; if (!_disconnectScheduled) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 825ce93a1..2e8a8778d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -155,12 +155,13 @@ public class MessageOutputStream extends OutputStream { } public void timeReached() { _enqueued = false; + DataReceiver rec = _dataReceiver; long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now()); if (_log.shouldLog(Log.DEBUG)) _log.debug("flusher time reached: left = " + timeLeft); if (timeLeft > 0) enqueue(); - else if (_dataReceiver.writeInProcess()) + else if ( (rec != null) && (rec.writeInProcess()) ) enqueue(); // don't passive flush if there is a write being done (unacked outbound) else doFlush(); diff --git a/core/java/src/net/i2p/data/ByteArray.java b/core/java/src/net/i2p/data/ByteArray.java index a99c7130f..01491dd53 100644 --- a/core/java/src/net/i2p/data/ByteArray.java +++ b/core/java/src/net/i2p/data/ByteArray.java @@ -18,6 +18,7 @@ import java.io.Serializable; */ public class ByteArray implements Serializable, Comparable { private byte[] _data; + private int _valid; public ByteArray() { this(null); @@ -25,6 +26,7 @@ public class ByteArray implements Serializable, Comparable { public ByteArray(byte[] data) { _data = data; + _valid = 0; } public final byte[] getData() { @@ -34,6 +36,14 @@ public class ByteArray implements Serializable, Comparable { public void setData(byte[] data) { _data = data; } + + /** + * how many of the bytes in the array are 'valid'? + * this property does not necessarily have meaning for all byte + * arrays. + */ + public final int getValid() { return _valid; } + public final void setValid(int valid) { _valid = valid; } public final boolean equals(Object o) { if (o == null) return false; diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java index da3be47f2..77527f510 100644 --- a/core/java/src/net/i2p/data/DataHelper.java +++ b/core/java/src/net/i2p/data/DataHelper.java @@ -658,12 +658,14 @@ public class DataHelper { } public static int read(InputStream in, byte target[]) throws IOException { - int cur = 0; - while (cur < target.length) { - int numRead = in.read(target, cur, target.length - cur); + return read(in, target, 0, target.length); + } + public static int read(InputStream in, byte target[], int offset, int length) throws IOException { + int cur = offset; + while (cur < length) { + int numRead = in.read(target, cur, length - cur); if (numRead == -1) { - if (cur == 0) return -1; // throw new EOFException("EOF Encountered during reading"); - + if (cur == offset) return -1; // throw new EOFException("EOF Encountered during reading"); return cur; } cur += numRead; @@ -671,6 +673,7 @@ public class DataHelper { return cur; } + /** * Read a newline delimited line from the stream, returning the line (without * the newline), or null if EOF reached before the newline was found diff --git a/history.txt b/history.txt index 25bf981e4..89865a685 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,12 @@ -$Id: history.txt,v 1.99 2004/12/06 00:03:58 jrandom Exp $ +$Id: history.txt,v 1.100 2004/12/06 20:09:20 jrandom Exp $ + +2004-12-08 jrandom + * Revised the buffering when reading from the SAM client and writing + to the stream. Also added a thread (sigh) so we don't block the + SAM client from giving us more messages for abnormally long periods + of time. + * Display the router version in the logs on startup (oft requested) + * Fix a race during the closing of a messageOutputStream 2004-12-06 jrandom * Don't do a 'passive flush' while there are already outbound messages diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index aeef673d0..40aeb0fea 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -792,6 +792,8 @@ public class Router { } public static void main(String args[]) { + System.out.println("Starting I2P " + RouterVersion.VERSION + "-" + RouterVersion.BUILD); + System.out.println(RouterVersion.ID); installUpdates(); verifyWrapperConfig(); Router r = new Router(); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 1c719e509..1266fde31 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.104 $ $Date: 2004/12/06 00:03:57 $"; + public final static String ID = "$Revision: 1.105 $ $Date: 2004/12/06 20:09:20 $"; public final static String VERSION = "0.4.2.2"; - public final static long BUILD = 8; + public final static long BUILD = 9; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID);