* NTCP: Use a java.util.concurrent execution queue instead of

SimpleTimer for afterSend() to reduce lock contention
This commit is contained in:
zzz
2009-01-29 21:13:24 +00:00
parent 69e6393442
commit 4aa9c7fdcf
2 changed files with 95 additions and 23 deletions

View File

@ -0,0 +1,89 @@
package net.i2p.router.transport.ntcp;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadFactory;
import net.i2p.I2PAppContext;
import net.i2p.router.OutNetMessage;
import net.i2p.util.Log;
/**
* Previously, NTCP was using SimpleTimer with a delay of 0, which
* was a real abuse.
*
* Here we use the non-scheduled, lockless ThreadPoolExecutor with
* a fixed pool size and an unbounded queue.
*
* The old implementation was having problems with lock contention;
* this should work a lot better - and not clog up the SimpleTimer queue.
*
* @author zzz
*/
public class NTCPSendFinisher {
private static final int THREADS = 4;
private I2PAppContext _context;
private NTCPTransport _transport;
private Log _log;
private int _count;
private ThreadPoolExecutor _executor;
public NTCPSendFinisher(I2PAppContext context, NTCPTransport transport) {
_context = context;
_log = _context.logManager().getLog(NTCPSendFinisher.class);
_transport = transport;
}
public void start() {
_count = 0;
_executor = new CustomThreadPoolExecutor();
}
public void stop() {
_executor.shutdownNow();
}
public void add(OutNetMessage msg) {
_executor.execute(new RunnableEvent(msg));
}
// not really needed for now but in case we want to add some hooks like afterExecute()
private class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor() {
// use unbounded queue, so maximumPoolSize and keepAliveTime have no effect
super(THREADS, THREADS, 1000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(), new CustomThreadFactory());
}
}
private class CustomThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread rv = Executors.defaultThreadFactory().newThread(r);
rv.setName("NTCPSendFinisher " + (++_count) + '/' + THREADS);
rv.setDaemon(true);
return rv;
}
}
/**
* Call afterSend() for the message
*/
private class RunnableEvent implements Runnable {
private OutNetMessage _msg;
public RunnableEvent(OutNetMessage msg) {
_msg = msg;
}
public void run() {
try {
_transport.afterSend(_msg, true, false, _msg.getSendTime());
} catch (Throwable t) {
_log.log(Log.CRIT, " wtf, afterSend borked", t);
}
}
}
}

View File

@ -27,7 +27,6 @@ import net.i2p.router.transport.Transport;
import net.i2p.router.transport.TransportBid;
import net.i2p.router.transport.TransportImpl;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/**
*
@ -50,7 +49,7 @@ public class NTCPTransport extends TransportImpl {
private List _establishing;
private List _sent;
private SendFinisher _finisher;
private NTCPSendFinisher _finisher;
public NTCPTransport(RouterContext ctx) {
super(ctx);
@ -124,7 +123,7 @@ public class NTCPTransport extends TransportImpl {
_conByIdent = new HashMap(64);
_sent = new ArrayList(4);
_finisher = new SendFinisher();
_finisher = new NTCPSendFinisher(ctx, this);
_pumper = new EventPumper(ctx, this);
_reader = new Reader(ctx);
@ -310,27 +309,8 @@ public class NTCPTransport extends TransportImpl {
return countActivePeers() < getMaxConnections() * 4 / 5;
}
/** queue up afterSend call, which can take some time w/ jobs, etc */
void sendComplete(OutNetMessage msg) { _finisher.add(msg); }
/** async afterSend call, which can take some time w/ jobs, etc */
private class SendFinisher implements SimpleTimer.TimedEvent {
public void add(OutNetMessage msg) {
synchronized (_sent) { _sent.add(msg); }
SimpleTimer.getInstance().addEvent(SendFinisher.this, 0);
}
public void timeReached() {
int pending = 0;
OutNetMessage msg = null;
synchronized (_sent) {
pending = _sent.size()-1;
if (pending >= 0)
msg = (OutNetMessage)_sent.remove(0);
}
if (msg != null)
afterSend(msg, true, false, msg.getSendTime());
if (pending > 0)
SimpleTimer.getInstance().addEvent(SendFinisher.this, 0);
}
}
private boolean isEstablished(RouterIdentity peer) {
return isEstablished(peer.calculateHash());
@ -412,6 +392,7 @@ public class NTCPTransport extends TransportImpl {
public RouterAddress startListening() {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
_reader.startReading(NUM_CONCURRENT_READERS);
@ -423,6 +404,7 @@ public class NTCPTransport extends TransportImpl {
public RouterAddress restartListening(RouterAddress addr) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Restarting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
_reader.startReading(NUM_CONCURRENT_READERS);
@ -551,6 +533,7 @@ public class NTCPTransport extends TransportImpl {
_pumper.stopPumping();
_writer.stopWriting();
_reader.stopReading();
_finisher.stop();
Map cons = null;
synchronized (_conLock) {
cons = new HashMap(_conByIdent);