2004-12-08 jrandom

* Revised the buffering when reading from the SAM client and writing
      to the stream.  Also added a thread (sigh) so we don't block the
      SAM client from giving us more messages for abnormally long periods
      of time.
    * Display the router version in the logs on startup (oft requested)
    * Fix a race during the closing of a messageOutputStream
This commit is contained in:
jrandom
2004-12-08 17:16:16 +00:00
committed by zzz
parent 4c5f7b9451
commit d88396c1e2
8 changed files with 183 additions and 93 deletions

View File

@ -15,8 +15,10 @@ import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@ -29,6 +31,7 @@ import net.i2p.client.streaming.I2PSocketManagerFactory;
import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.ByteCache;
@ -53,7 +56,10 @@ public class SAMStreamSession {
private I2PSocketManager socketMgr = null;
private Object handlersMapLock = new Object();
/** stream id (Long) to SAMStreamSessionSocketReader */
private HashMap handlersMap = new HashMap();
/** stream id (Long) to StreamSender */
private HashMap sendersMap = new HashMap();
private Object idLock = new Object();
private int lastNegativeId = 0;
@ -61,6 +67,14 @@ public class SAMStreamSession {
// Can we create outgoing connections?
private boolean canCreate = false;
/**
* should we flush every time we get a STREAM SEND, or leave that up to
* the streaming lib to decide?
*/
private boolean forceFlush = false;
public static String PROP_FORCE_FLUSH = "sam.forceFlush";
public static String DEFAULT_FORCE_FLUSH = "false";
/**
* Create a new SAM STREAM session.
*
@ -109,9 +123,6 @@ public class SAMStreamSession {
} catch (NumberFormatException nfe) {
throw new SAMException("Invalid I2CP port specified [" + port + "]");
}
// streams MUST be mode=guaranteed (though i think the socket manager
// enforces this anyway...
allprops.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
_log.debug("Creating I2PSocketManager...");
socketMgr = I2PSocketManagerFactory.createManager(destStream,
@ -122,6 +133,8 @@ public class SAMStreamSession {
throw new SAMException("Error creating I2PSocketManager");
}
forceFlush = Boolean.valueOf(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH)).booleanValue();
boolean canReceive = false;
if (dir.equals("BOTH")) {
canCreate = true;
@ -199,14 +212,14 @@ public class SAMStreamSession {
*
* @param data Bytes to be sent
*
* @return True if the data was sent, false otherwise
* @return True if the data was queued for sending, false otherwise
*/
public boolean sendBytes(int id, InputStream in, int size) throws IOException {
SAMStreamSessionSocketHandler handler = getSocketHandler(id);
StreamSender sender = getSender(id);
if (handler == null) {
if (sender == null) {
if (_log.shouldLog(Log.WARN))
_log.error("Trying to send bytes through inexistent handler " +id);
_log.warn("Trying to send bytes through nonexistent handler " +id);
// even though it failed, we need to read those bytes!
for (int i = 0; i < size; i++) {
int c = in.read();
@ -216,7 +229,8 @@ public class SAMStreamSession {
return false;
}
return handler.sendBytes(in, size);
sender.sendBytes(in, size);
return true;
}
/**
@ -256,13 +270,15 @@ public class SAMStreamSession {
* @return An id associated to the socket handler
*/
private int createSocketHandler(I2PSocket s, int id) {
SAMStreamSessionSocketHandler handler;
SAMStreamSessionSocketReader reader = null;
StreamSender sender = null;
if (id == 0) {
id = createUniqueId();
}
try {
handler = new SAMStreamSessionSocketHandler(s, id);
reader = new SAMStreamSessionSocketReader(s, id);
sender = new StreamSender(s, id);
} catch (IOException e) {
_log.error("IOException when creating SAM STREAM session socket handler", e);
recv.stopStreamReceiving();
@ -270,10 +286,13 @@ public class SAMStreamSession {
}
synchronized (handlersMapLock) {
handlersMap.put(new Integer(id), handler);
handlersMap.put(new Integer(id), reader);
sendersMap.put(new Integer(id), sender);
}
I2PThread t = new I2PThread(handler, "SAMStreamSessionSocketHandler");
I2PThread t = new I2PThread(reader, "SAMReader" + id);
t.start();
t = new I2PThread(sender, "SAMSender" + id);
t.start();
return id;
@ -291,9 +310,14 @@ public class SAMStreamSession {
*
* @param id Handler id
*/
private SAMStreamSessionSocketHandler getSocketHandler(int id) {
private SAMStreamSessionSocketReader getSocketReader(int id) {
synchronized (handlersMapLock) {
return (SAMStreamSessionSocketHandler)handlersMap.get(new Integer(id));
return (SAMStreamSessionSocketReader)handlersMap.get(new Integer(id));
}
}
private StreamSender getSender(int id) {
synchronized (handlersMapLock) {
return (StreamSender)sendersMap.get(new Integer(id));
}
}
@ -314,18 +338,19 @@ public class SAMStreamSession {
* @param id Handler id to be removed
*/
private void removeSocketHandler(int id) {
SAMStreamSessionSocketHandler removed;
SAMStreamSessionSocketReader reader = null;
StreamSender sender = null;
synchronized (handlersMapLock) {
removed = (SAMStreamSessionSocketHandler)handlersMap.remove(new Integer(id));
reader = (SAMStreamSessionSocketReader)handlersMap.remove(new Integer(id));
sender = (StreamSender)sendersMap.remove(new Integer(id));
}
if (removed == null) {
// ignore - likely the socket handler was already removed by removeAllSocketHandlers
} else {
removed.stopRunning();
_log.debug("Removed SAM STREAM session socket handler " + id);
}
if (reader != null)
reader.stopRunning();
if (sender != null)
sender.stopRunning();
_log.debug("Removed SAM STREAM session socket handler " + id);
}
/**
@ -344,9 +369,11 @@ public class SAMStreamSession {
while (iter.hasNext()) {
id = (Integer)iter.next();
((SAMStreamSessionSocketHandler)handlersMap.get(id)).stopRunning();
((SAMStreamSessionSocketReader)handlersMap.get(id)).stopRunning();
((StreamSender)sendersMap.get(id)).stopRunning();
}
handlersMap.clear();
sendersMap.clear();
}
}
@ -440,10 +467,9 @@ public class SAMStreamSession {
*
* @author human
*/
public class SAMStreamSessionSocketHandler implements Runnable {
public class SAMStreamSessionSocketReader implements Runnable {
private I2PSocket i2pSocket = null;
private OutputStream i2pSocketOS = null;
private Object runningLock = new Object();
private boolean stillRunning = true;
@ -451,72 +477,20 @@ public class SAMStreamSession {
private int id;
/**
* Create a new SAM STREAM session socket handler
* Create a new SAM STREAM session socket reader
*
* @param s Socket to be handled
* @param id Unique id assigned to the handler
*/
public SAMStreamSessionSocketHandler(I2PSocket s, int id) throws IOException {
public SAMStreamSessionSocketReader(I2PSocket s, int id) throws IOException {
_log.debug("Instantiating new SAM STREAM session socket handler");
i2pSocket = s;
i2pSocketOS = s.getOutputStream();
this.id = id;
}
/**
* Send bytes through the SAM STREAM session socket handler
*
* @param data Data to be sent
*
* @return True if data has been sent without errors, false otherwise
*/
public boolean sendBytes(InputStream in, int size) throws IOException {
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Handler " + id + ": sending " + size
+ " bytes");
}
ByteCache cache = ByteCache.getInstance(1024, 4);
ByteArray ba = cache.acquire();
int remaining = size;
try {
byte buf[] = ba.getData();
while (remaining > 0) {
int read = in.read(buf, 0, remaining > buf.length ? buf.length : remaining);
if (read == -1) {
throw new IOException("Insufficient data from the SAM client (" + remaining + "/" + size + ")");
} else if (read > 0) {
remaining -= read;
try {
i2pSocketOS.write(buf, 0, read);
} catch (IOException ioe) {
// ok, the stream failed, but the SAM client didn't
if (_log.shouldLog(Log.WARN))
_log.warn("Stream failed", ioe);
removeSocketHandler(id);
// emtpy the remaining payload so we can continue
for (int i = remaining; i > 0; i--) {
int c = in.read();
if (c == -1)
throw new IOException("Stream closed, but the SAM client didn't send enough anyway ("
+ i + " remaining)");
}
return false;
}
}
}
} finally {
cache.release(ba);
}
return true;
}
/**
* Stop a SAM STREAM session socket handler
* Stop a SAM STREAM session socket reader
*
*/
public void stopRunning() {
@ -575,4 +549,99 @@ public class SAMStreamSession {
_log.debug("Shutting down SAM STREAM session socket handler " +id);
}
}
/**
* Lets us push data through the stream without blocking, (even after exceeding
* the I2PSocket's buffer)
*/
private class StreamSender implements Runnable {
private List _data;
private int _id;
private ByteCache _cache;
private OutputStream _out = null;
private boolean _stillRunning;
public StreamSender(I2PSocket s, int id) throws IOException {
_data = new ArrayList(1);
_id = id;
_cache = ByteCache.getInstance(4, 32*1024);
_out = s.getOutputStream();
_stillRunning = true;
}
/**
* Send bytes through the SAM STREAM session socket sender
*
* @param data Data to be sent
*
* @throws IOException if the client didnt provide enough data
*/
public void sendBytes(InputStream in, int size) throws IOException {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handler " + _id + ": sending " + size + " bytes");
ByteArray ba = _cache.acquire();
int read = DataHelper.read(in, ba.getData(), 0, size);
if (read != size)
throw new IOException("Insufficient data from the SAM client (" + read + "/" + size + ")");
ba.setValid(read);
synchronized (_data) {
_data.add(ba);
_data.notifyAll();
}
}
/**
* Stop a SAM STREAM session socket sender
*
*/
public void stopRunning() {
_log.debug("stopRunning() invoked on socket sender " + _id);
_stillRunning = false;
synchronized (_data) {
_data.clear();
_data.notifyAll();
}
}
public void run() {
ByteArray data = null;
while (_stillRunning) {
data = null;
try {
synchronized (_data) {
if (_data.size() > 0)
data = (ByteArray)_data.remove(0);
else
_data.wait(5000);
}
if (data != null) {
try {
_out.write(data.getData(), 0, data.getValid());
if (forceFlush) {
// i dont like doing this, but it clears the buffer issues
_out.flush();
}
} catch (IOException ioe) {
// ok, the stream failed, but the SAM client didn't
if (_log.shouldLog(Log.WARN))
_log.warn("Stream failed", ioe);
removeSocketHandler(_id);
stopRunning();
} finally {
_cache.release(data);
}
}
} catch (InterruptedException ie) {}
}
synchronized (_data) {
_data.clear();
}
}
}
}

View File

@ -394,15 +394,12 @@ public class Connection {
if (_socket != null)
_socket.destroy();
_socket = null;
_inputStream = null;
if (_outputStream != null)
_outputStream.destroy();
_outputStream = null;
_outboundQueue = null;
if (_receiver != null)
_receiver.destroy();
if (_activityTimer != null)
SimpleTimer.getInstance().addEvent(_activityTimer, 1);
SimpleTimer.getInstance().removeEvent(_activityTimer);
_activityTimer = null;
if (!_disconnectScheduled) {

View File

@ -155,12 +155,13 @@ public class MessageOutputStream extends OutputStream {
}
public void timeReached() {
_enqueued = false;
DataReceiver rec = _dataReceiver;
long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now());
if (_log.shouldLog(Log.DEBUG))
_log.debug("flusher time reached: left = " + timeLeft);
if (timeLeft > 0)
enqueue();
else if (_dataReceiver.writeInProcess())
else if ( (rec != null) && (rec.writeInProcess()) )
enqueue(); // don't passive flush if there is a write being done (unacked outbound)
else
doFlush();

View File

@ -18,6 +18,7 @@ import java.io.Serializable;
*/
public class ByteArray implements Serializable, Comparable {
private byte[] _data;
private int _valid;
public ByteArray() {
this(null);
@ -25,6 +26,7 @@ public class ByteArray implements Serializable, Comparable {
public ByteArray(byte[] data) {
_data = data;
_valid = 0;
}
public final byte[] getData() {
@ -34,6 +36,14 @@ public class ByteArray implements Serializable, Comparable {
public void setData(byte[] data) {
_data = data;
}
/**
* how many of the bytes in the array are 'valid'?
* this property does not necessarily have meaning for all byte
* arrays.
*/
public final int getValid() { return _valid; }
public final void setValid(int valid) { _valid = valid; }
public final boolean equals(Object o) {
if (o == null) return false;

View File

@ -658,12 +658,14 @@ public class DataHelper {
}
public static int read(InputStream in, byte target[]) throws IOException {
int cur = 0;
while (cur < target.length) {
int numRead = in.read(target, cur, target.length - cur);
return read(in, target, 0, target.length);
}
public static int read(InputStream in, byte target[], int offset, int length) throws IOException {
int cur = offset;
while (cur < length) {
int numRead = in.read(target, cur, length - cur);
if (numRead == -1) {
if (cur == 0) return -1; // throw new EOFException("EOF Encountered during reading");
if (cur == offset) return -1; // throw new EOFException("EOF Encountered during reading");
return cur;
}
cur += numRead;
@ -671,6 +673,7 @@ public class DataHelper {
return cur;
}
/**
* Read a newline delimited line from the stream, returning the line (without
* the newline), or null if EOF reached before the newline was found

View File

@ -1,4 +1,12 @@
$Id: history.txt,v 1.99 2004/12/06 00:03:58 jrandom Exp $
$Id: history.txt,v 1.100 2004/12/06 20:09:20 jrandom Exp $
2004-12-08 jrandom
* Revised the buffering when reading from the SAM client and writing
to the stream. Also added a thread (sigh) so we don't block the
SAM client from giving us more messages for abnormally long periods
of time.
* Display the router version in the logs on startup (oft requested)
* Fix a race during the closing of a messageOutputStream
2004-12-06 jrandom
* Don't do a 'passive flush' while there are already outbound messages

View File

@ -792,6 +792,8 @@ public class Router {
}
public static void main(String args[]) {
System.out.println("Starting I2P " + RouterVersion.VERSION + "-" + RouterVersion.BUILD);
System.out.println(RouterVersion.ID);
installUpdates();
verifyWrapperConfig();
Router r = new Router();

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.104 $ $Date: 2004/12/06 00:03:57 $";
public final static String ID = "$Revision: 1.105 $ $Date: 2004/12/06 20:09:20 $";
public final static String VERSION = "0.4.2.2";
public final static long BUILD = 8;
public final static long BUILD = 9;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);