- Move client-side writes to their own thread
      - Reenable InternalSockets
This commit is contained in:
zzz
2009-12-17 01:05:29 +00:00
parent fa92beae5a
commit b530316850
5 changed files with 123 additions and 32 deletions

View File

@ -0,0 +1,97 @@
package net.i2p.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageImpl;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.util.I2PAppThread;
/**
* Copied from net.i2p.router.client
* We need a single thread that writes so we don't have issues with
* the Piped Streams used in InternalSocket.
*
* @author zzz from net.i2p.router.client.ClientWriterRunner
*/
class ClientWriterRunner implements Runnable {
private OutputStream _out;
private I2PSessionImpl _session;
private BlockingQueue<I2CPMessage> _messagesToWrite;
private static volatile long __Id = 0;
/** starts the thread too */
public ClientWriterRunner(OutputStream out, I2PSessionImpl session) {
_out = out;
_session = session;
_messagesToWrite = new LinkedBlockingQueue();
Thread t = new I2PAppThread(this, "I2CP Client Writer " + (++__Id), true);
t.start();
}
/**
* Add this message to the writer's queue
*
*/
public void addMessage(I2CPMessage msg) {
try {
_messagesToWrite.put(msg);
} catch (InterruptedException ie) {}
}
/**
* No more messages - dont even try to send what we have
*
*/
public void stopWriting() {
_messagesToWrite.clear();
try {
_messagesToWrite.put(new PoisonMessage());
} catch (InterruptedException ie) {}
}
public void run() {
I2CPMessage msg;
while (!_session.isClosed()) {
try {
msg = _messagesToWrite.take();
} catch (InterruptedException ie) {
continue;
}
if (msg.getType() == PoisonMessage.MESSAGE_TYPE)
break;
// only thread, we don't need synchronized
try {
msg.writeMessage(_out);
_out.flush();
} catch (I2CPMessageException ime) {
_session.propogateError("Error writing out the message", ime);
_session.disconnect();
break;
} catch (IOException ioe) {
_session.propogateError("Error writing out the message", ioe);
_session.disconnect();
break;
}
}
_messagesToWrite.clear();
}
/**
* End-of-stream msg used to stop the concurrent queue
* See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
*
*/
private static class PoisonMessage extends I2CPMessageImpl {
public static final int MESSAGE_TYPE = 999999;
public int getType() {
return MESSAGE_TYPE;
}
public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
}
}

View File

@ -73,6 +73,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
protected Socket _socket;
/** reader that always searches for messages */
protected I2CPMessageReader _reader;
/** writer message queue */
protected ClientWriterRunner _writer;
/** where we pipe our messages */
protected /* FIXME final FIXME */OutputStream _out;
@ -277,11 +279,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_out.write(I2PClient.PROTOCOL_BYTE);
_out.flush();
}
_writer = new ClientWriterRunner(_out, this);
InputStream in = _socket.getInputStream();
_reader = new I2CPMessageReader(in, this);
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "before startReading");
_reader.startReading();
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before getDate");
sendMessage(new GetDateMessage());
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After getDate / begin waiting for a response");
@ -543,34 +545,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* @throws I2PSessionException if the message is malformed or there is an error writing it out
*/
void sendMessage(I2CPMessage message) throws I2PSessionException {
if (isClosed()) throw new I2PSessionException("Already closed");
long beforeSync = _context.clock().now();
long inSync = 0;
if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync to write");
try {
synchronized (_out) {
inSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) _log.debug("before writeMessage");
message.writeMessage(_out);
if (_log.shouldLog(Log.DEBUG)) _log.debug("after writeMessage");
_out.flush();
if (_log.shouldLog(Log.DEBUG)) _log.debug("after flush");
}
} catch (I2CPMessageException ime) {
throw new I2PSessionException(getPrefix() + "Error writing out the message", ime);
} catch (IOException ioe) {
throw new I2PSessionException(getPrefix() + "Error writing out the message", ioe);
}
long afterSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Message written out and flushed w/ "
+ (inSync-beforeSync) + "ms to sync and "
+ (afterSync-inSync) + "ms to send");
if (isClosed() || _writer == null)
throw new I2PSessionException("Already closed");
_writer.addMessage(message);
}
/**
* Pass off the error to the listener
* Misspelled, oh well.
*/
void propogateError(String msg, Throwable error) {
if (_log.shouldLog(Log.ERROR))
@ -629,8 +611,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
private void closeSocket() {
if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Closing the socket", new Exception("closeSocket"));
_closed = true;
if (_reader != null) _reader.stopReading();
_reader = null;
if (_reader != null) {
_reader.stopReading();
_reader = null;
}
if (_writer != null) {
_writer.stopWriting();
_writer = null;
}
if (_socket != null) {
try {

View File

@ -79,6 +79,7 @@ class I2PSimpleSession extends I2PSessionImpl2 {
_out.write(I2PClient.PROTOCOL_BYTE);
_out.flush();
}
_writer = new ClientWriterRunner(_out, this);
InputStream in = _socket.getInputStream();
_reader = new I2CPMessageReader(in, this);
_reader.startReading();

View File

@ -21,6 +21,11 @@ import net.i2p.I2PAppContext;
* A simple in-JVM ServerSocket using Piped Streams.
* We use port numbers just like regular sockets.
* Can only be connected by InternalSocket.
*
* Warning - this uses Piped Streams, which don't like multiple writers from threads
* that may vanish. If you do use multipe writers,
* you may get intermittent 'write end dead' or 'pipe broken' IOExceptions on the reader side.
* See http://techtavern.wordpress.com/2008/07/16/whats-this-ioexception-write-end-dead/
*/
public class InternalServerSocket extends ServerSocket {
private static final ConcurrentHashMap<Integer, InternalServerSocket> _sockets = new ConcurrentHashMap(4);

View File

@ -34,12 +34,12 @@ public class InternalSocket extends Socket {
* Convenience method to return either a Socket or an InternalSocket
*/
public static Socket getSocket(String host, int port) throws IOException {
//if (System.getProperty("router.version") != null &&
// (host.equals("127.0.0.1") || host.equals("localhost"))) {
// return new InternalSocket(port);
//} else {
if (System.getProperty("router.version") != null &&
(host.equals("127.0.0.1") || host.equals("localhost"))) {
return new InternalSocket(port);
} else {
return new Socket(host, port);
//}
}
}
@Override