merge of '4002ce96746459cd6ab6f91f16795bdbe3165644'

and 'db4aaff4718328041f29e6166333139f845406cd'
This commit is contained in:
dev
2010-06-03 23:13:35 +00:00

View File

@ -13,9 +13,6 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
@ -35,10 +32,10 @@ public class EventPumper implements Runnable {
private volatile boolean _alive;
private Selector _selector;
private final LinkedBlockingQueue<ByteBuffer> _bufCache;
private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue();
private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue();
private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue();
private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue();
private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue<NTCPConnection>();
private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue<NTCPConnection>();
private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue<ServerSocketChannel>();
private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue<NTCPConnection>();
private NTCPTransport _transport;
private long _expireIdleWriteTime;
@ -61,7 +58,7 @@ public class EventPumper implements Runnable {
_log = ctx.logManager().getLog(getClass());
_transport = transport;
_alive = false;
_bufCache = new LinkedBlockingQueue(MAX_CACHE_SIZE);
_bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);
_expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
}
@ -107,10 +104,9 @@ public class EventPumper implements Runnable {
public void run() {
long lastFailsafeIteration = System.currentTimeMillis();
List bufList = new ArrayList(16);
while (_alive && _selector.isOpen()) {
try {
runDelayedEvents(bufList);
runDelayedEvents();
int count = 0;
try {
//if (_log.shouldLog(Log.DEBUG))
@ -125,7 +121,7 @@ public class EventPumper implements Runnable {
if (_log.shouldLog(Log.DEBUG))
_log.debug("select returned " + count);
Set selected = null;
Set<SelectionKey> selected = null;
try {
selected = _selector.selectedKeys();
} catch (ClosedSelectorException cse) {
@ -142,7 +138,7 @@ public class EventPumper implements Runnable {
// properly marked as such, etc
lastFailsafeIteration = System.currentTimeMillis();
try {
Set all = _selector.keys();
Set<SelectionKey> all = _selector.keys();
int failsafeWrites = 0;
int failsafeCloses = 0;
@ -153,9 +149,8 @@ public class EventPumper implements Runnable {
_expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME);
else
_expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME);
for (Iterator iter = all.iterator(); iter.hasNext(); ) {
for (SelectionKey key : all) {
try {
SelectionKey key = (SelectionKey)iter.next();
Object att = key.attachment();
if (!(att instanceof NTCPConnection))
continue; // to the next con
@ -225,9 +220,8 @@ public class EventPumper implements Runnable {
if (_selector.isOpen()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Closing down the event pumper with selection keys remaining");
Set keys = _selector.keys();
for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
SelectionKey key = (SelectionKey)iter.next();
Set<SelectionKey> keys = _selector.keys();
for (SelectionKey key : keys) {
try {
Object att = key.attachment();
if (att instanceof ServerSocketChannel) {
@ -257,10 +251,9 @@ public class EventPumper implements Runnable {
_wantsWrite.clear();
}
private void processKeys(Set selected) {
for (Iterator iter = selected.iterator(); iter.hasNext(); ) {
private void processKeys(Set<SelectionKey> selected) {
for (SelectionKey key : selected) {
try {
SelectionKey key = (SelectionKey)iter.next();
int ops = key.readyOps();
boolean accept = (ops & SelectionKey.OP_ACCEPT) != 0;
boolean connect = (ops & SelectionKey.OP_CONNECT) != 0;
@ -346,8 +339,7 @@ public class EventPumper implements Runnable {
private static int __liveBufs = 0;
private static int __consecutiveExtra;
ByteBuffer acquireBuf() {
//if (false) return ByteBuffer.allocate(BUF_SIZE);
ByteBuffer rv = (ByteBuffer)_bufCache.poll();
ByteBuffer rv = _bufCache.poll();
if (rv == null) {
rv = ByteBuffer.allocate(BUF_SIZE);
NUM_BUFS = ++__liveBufs;
@ -470,6 +462,7 @@ public class EventPumper implements Runnable {
buf.flip();
buf.get(data);
releaseBuf(buf);
buf=null;
ByteBuffer rbuf = ByteBuffer.wrap(data);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
if (req.getPendingInboundRequested() > 0) {
@ -567,7 +560,7 @@ public class EventPumper implements Runnable {
+ " after " + (after-before));
}
private void runDelayedEvents(List buf) {
private void runDelayedEvents() {
NTCPConnection con;
while ((con = _wantsRead.poll()) != null) {
SelectionKey key = con.getKey();