concurrent

This commit is contained in:
zzz
2010-03-18 12:32:01 +00:00
parent 3eef403b04
commit 2e8fd23f2b
3 changed files with 79 additions and 111 deletions

View File

@ -6,6 +6,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
@ -29,7 +30,7 @@ public class Connection {
private long _sendStreamId;
private long _receiveStreamId;
private long _lastSendTime;
private long _lastSendId;
private AtomicLong _lastSendId;
private boolean _resetReceived;
private boolean _resetSent;
private long _resetSentOn;
@ -49,7 +50,7 @@ public class Connection {
private boolean _isInbound;
private boolean _updatedShareOpts;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */
private final Map _outboundPackets;
private final Map<Long, PacketLocal> _outboundPackets;
private PacketQueue _outboundQueue;
private ConnectionPacketHandler _handler;
private ConnectionOptions _options;
@ -102,7 +103,7 @@ public class Connection {
_options = (opts != null ? opts : new ConnectionOptions());
_outputStream.setWriteTimeout((int)_options.getWriteTimeout());
_inputStream.setReadTimeout((int)_options.getReadTimeout());
_lastSendId = -1;
_lastSendId = new AtomicLong(-1);
_nextSendTime = -1;
_ackedPackets = 0;
_createdOn = _context.clock().now();
@ -137,9 +138,7 @@ public class Connection {
}
public long getNextOutboundPacketNum() {
synchronized (this) {
return ++_lastSendId;
}
return _lastSendId.incrementAndGet();
}
void closeReceived() {
@ -175,7 +174,7 @@ public class Connection {
return false;
started = true;
if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ||
(_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) {
(_lastSendId.get() - _highestAckedThrough > _options.getWindowSize()) ) {
if (timeoutMs > 0) {
if (timeLeft <= 0) {
if (_log.shouldLog(Log.INFO))
@ -211,10 +210,10 @@ public class Connection {
void ackImmediately() {
PacketLocal packet = null;
synchronized (_outboundPackets) {
if (_outboundPackets.size() > 0) {
if (!_outboundPackets.isEmpty()) {
// ordered, so pick the lowest to retransmit
Iterator iter = _outboundPackets.values().iterator();
packet = (PacketLocal)iter.next();
Iterator<PacketLocal> iter = _outboundPackets.values().iterator();
packet = iter.next();
//iter.remove();
}
}
@ -403,10 +402,10 @@ public class Connection {
}
}
List acked = null;
List<PacketLocal> acked = null;
synchronized (_outboundPackets) {
for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) {
Long id = (Long)iter.next();
for (Iterator<Long> iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) {
Long id = iter.next();
if (id.longValue() <= ackThrough) {
boolean nacked = false;
if (nacks != null) {
@ -414,7 +413,7 @@ public class Connection {
for (int i = 0; i < nacks.length; i++) {
if (nacks[i] == id.longValue()) {
nacked = true;
PacketLocal nackedPacket = (PacketLocal)_outboundPackets.get(id);
PacketLocal nackedPacket = _outboundPackets.get(id);
nackedPacket.incrementNACKs();
break; // NACKed
}
@ -423,7 +422,7 @@ public class Connection {
if (!nacked) { // aka ACKed
if (acked == null)
acked = new ArrayList(1);
PacketLocal ackedPacket = (PacketLocal)_outboundPackets.get(id);
PacketLocal ackedPacket = _outboundPackets.get(id);
ackedPacket.ackReceived();
acked.add(ackedPacket);
}
@ -433,7 +432,7 @@ public class Connection {
}
if (acked != null) {
for (int i = 0; i < acked.size(); i++) {
PacketLocal p = (PacketLocal)acked.get(i);
PacketLocal p = acked.get(i);
_outboundPackets.remove(new Long(p.getSequenceNum()));
_ackedPackets++;
if (p.getNumSends() > 1) {
@ -443,7 +442,7 @@ public class Connection {
}
}
}
if ( (_outboundPackets.size() <= 0) && (_activeResends != 0) ) {
if ( (_outboundPackets.isEmpty()) && (_activeResends != 0) ) {
if (_log.shouldLog(Log.INFO))
_log.info("All outbound packets acked, clearing " + _activeResends);
_activeResends = 0;
@ -570,8 +569,8 @@ public class Connection {
private void killOutstandingPackets() {
//boolean tagsCancelled = false;
synchronized (_outboundPackets) {
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
PacketLocal pl = (PacketLocal)iter.next();
for (Iterator<PacketLocal> iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
PacketLocal pl = iter.next();
//if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
// tagsCancelled = true;
pl.cancelled();
@ -652,11 +651,11 @@ public class Connection {
/** What was the last packet Id sent to the peer?
* @return The last sent packet ID
*/
public long getLastSendId() { return _lastSendId; }
public long getLastSendId() { return _lastSendId.get(); }
/** Set the packet Id that was sent to a peer.
* @param id The packet ID
*/
public void setLastSendId(long id) { _lastSendId = id; }
public void setLastSendId(long id) { _lastSendId.set(id); }
/**
* Retrieve the current ConnectionOptions.
@ -783,7 +782,7 @@ public class Connection {
if (_ackSinceCongestion) {
_lastCongestionSeenAt = _options.getWindowSize();
_lastCongestionTime = _context.clock().now();
_lastCongestionHighestUnacked = _lastSendId;
_lastCongestionHighestUnacked = _lastSendId.get();
_ackSinceCongestion = false;
}
}
@ -1022,7 +1021,7 @@ public class Connection {
}
if (getCloseReceivedOn() > 0)
buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago");
buf.append(" sent: ").append(1 + _lastSendId);
buf.append(" sent: ").append(1 + _lastSendId.get());
if (_inputStream != null)
buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);

View File

@ -1,10 +1,10 @@
package net.i2p.client.streaming;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
@ -32,14 +32,13 @@ public class ConnectionManager {
private ConnectionPacketHandler _conPacketHandler;
private TCBShare _tcbShare;
/** Inbound stream ID (Long) to Connection map */
private Map _connectionByInboundId;
private ConcurrentHashMap<Long, Connection> _connectionByInboundId;
/** Ping ID (Long) to PingRequest */
private final Map _pendingPings;
private final Map<Long, PingRequest> _pendingPings;
private boolean _allowIncoming;
private int _maxConcurrentStreams;
private ConnectionOptions _defaultOptions;
private volatile int _numWaiting;
private final Object _connectionLock;
private long SoTimeout;
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
@ -48,9 +47,8 @@ public class ConnectionManager {
_maxConcurrentStreams = maxConcurrent;
_defaultOptions = defaultOptions;
_log = _context.logManager().getLog(ConnectionManager.class);
_connectionByInboundId = new HashMap(32);
_pendingPings = new HashMap(4);
_connectionLock = new Object();
_connectionByInboundId = new ConcurrentHashMap(32);
_pendingPings = new ConcurrentHashMap(4);
_messageHandler = new MessageHandler(_context, this);
_packetHandler = new PacketHandler(_context, this);
_connectionHandler = new ConnectionHandler(_context, this);
@ -77,22 +75,17 @@ public class ConnectionManager {
}
Connection getConnectionByInboundId(long id) {
synchronized (_connectionLock) {
return (Connection)_connectionByInboundId.get(new Long(id));
}
return _connectionByInboundId.get(Long.valueOf(id));
}
/**
* not guaranteed to be unique, but in case we receive more than one packet
* on an inbound connection that we havent ack'ed yet...
*/
Connection getConnectionByOutboundId(long id) {
synchronized (_connectionLock) {
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
for (Connection con : _connectionByInboundId.values()) {
if (DataHelper.eq(con.getSendStreamId(), id))
return con;
}
}
return null;
}
@ -135,27 +128,26 @@ public class ConnectionManager {
boolean reject = false;
int active = 0;
int total = 0;
synchronized (_connectionLock) {
total = _connectionByInboundId.size();
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
if ( ((Connection)iter.next()).getIsConnected() )
active++;
}
// just for the stat
//total = _connectionByInboundId.size();
//for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
// if ( ((Connection)iter.next()).getIsConnected() )
// active++;
//}
if (locked_tooManyStreams()) {
reject = true;
} else {
while (true) {
Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
Connection oldCon = _connectionByInboundId.putIfAbsent(Long.valueOf(receiveId), con);
if (oldCon == null) {
break;
} else {
_connectionByInboundId.put(new Long(receiveId), oldCon);
// receiveId already taken, try another
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
}
}
}
}
_context.statManager().addRateData("stream.receiveActive", active, total);
@ -179,9 +171,7 @@ public class ConnectionManager {
try {
con.getPacketHandler().receivePacket(synPacket, con);
} catch (I2PException ie) {
synchronized (_connectionLock) {
_connectionByInboundId.remove(new Long(receiveId));
}
_connectionByInboundId.remove(Long.valueOf(receiveId));
return null;
}
@ -215,8 +205,7 @@ public class ConnectionManager {
_numWaiting--;
return null;
}
boolean reject = false;
synchronized (_connectionLock) {
if (locked_tooManyStreams()) {
// allow a full buffer of pending/waiting streams
if (_numWaiting > _maxConcurrentStreams) {
@ -227,27 +216,30 @@ public class ConnectionManager {
_numWaiting--;
return null;
}
// no remaining streams, lets wait a bit
try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
// got rid of the lock, so just sleep (fixme?)
// try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
} else {
con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
con.setRemotePeer(peer);
while (_connectionByInboundId.containsKey(new Long(receiveId))) {
while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) {
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
}
_connectionByInboundId.put(new Long(receiveId), con);
_connectionByInboundId.put(Long.valueOf(receiveId), con);
break; // stop looping as a psuedo-wait
}
}
}
// ok we're in...
con.setReceiveStreamId(receiveId);
con.eventOccurred();
_log.debug("Connect() conDelay = " + opts.getConnectDelay());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connect() conDelay = " + opts.getConnectDelay());
if (opts.getConnectDelay() <= 0) {
con.waitForConnect();
}
@ -258,12 +250,15 @@ public class ConnectionManager {
return con;
}
/**
* Doesn't need to be locked any more
* @return too many
*/
private boolean locked_tooManyStreams() {
if (_maxConcurrentStreams <= 0) return false;
if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
int active = 0;
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
for (Connection con : _connectionByInboundId.values()) {
if (con.getIsConnected())
active++;
}
@ -293,13 +288,10 @@ public class ConnectionManager {
*
*/
public void disconnectAllHard() {
synchronized (_connectionLock) {
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
con.disconnect(false, false);
}
_connectionByInboundId.clear();
_connectionLock.notifyAll();
for (Iterator<Connection> iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = iter.next();
con.disconnect(false, false);
iter.remove();
}
_tcbShare.stop();
}
@ -310,17 +302,15 @@ public class ConnectionManager {
* @param con Connection to drop.
*/
public void removeConnection(Connection con) {
boolean removed = false;
synchronized (_connectionLock) {
Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
removed = (o == con);
Object o = _connectionByInboundId.remove(Long.valueOf(con.getReceiveStreamId()));
boolean removed = (o == con);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection removed? " + removed + " remaining: "
+ _connectionByInboundId.size() + ": " + con);
if (!removed && _log.shouldLog(Log.DEBUG))
_log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
_connectionLock.notifyAll();
}
if (removed) {
_context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime());
MessageInputStream stream = con.getInputStream();
@ -344,10 +334,8 @@ public class ConnectionManager {
/** return a set of Connection objects
* @return set of Connection objects
*/
public Set listConnections() {
synchronized (_connectionLock) {
public Set<Connection> listConnections() {
return new HashSet(_connectionByInboundId.values());
}
}
/** blocking */
@ -368,7 +356,7 @@ public class ConnectionManager {
}
public boolean ping(Destination peer, long timeoutMs, boolean blocking, PingNotifier notifier) {
Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
Long id = Long.valueOf(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
PacketLocal packet = new PacketLocal(_context, peer);
packet.setSendStreamId(id.longValue());
packet.setFlag(Packet.FLAG_ECHO);
@ -381,9 +369,7 @@ public class ConnectionManager {
PingRequest req = new PingRequest(peer, packet, notifier);
synchronized (_pendingPings) {
_pendingPings.put(id, req);
}
_pendingPings.put(id, req);
_outboundQueue.enqueue(packet);
packet.releasePayload();
@ -393,10 +379,7 @@ public class ConnectionManager {
if (!req.pongReceived())
try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
}
synchronized (_pendingPings) {
_pendingPings.remove(id);
}
_pendingPings.remove(id);
} else {
SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
}
@ -418,13 +401,8 @@ public class ConnectionManager {
}
public void timeReached() {
boolean removed = false;
synchronized (_pendingPings) {
Object o = _pendingPings.remove(_id);
if (o != null)
removed = true;
}
if (removed) {
PingRequest pr = _pendingPings.remove(_id);
if (pr != null) {
if (_notifier != null)
_notifier.pingComplete(false);
if (_log.shouldLog(Log.INFO))
@ -433,7 +411,7 @@ public class ConnectionManager {
}
}
private class PingRequest {
private static class PingRequest {
private boolean _ponged;
private Destination _peer;
private PacketLocal _packet;
@ -445,7 +423,8 @@ public class ConnectionManager {
_notifier = notifier;
}
public void pong() {
_log.debug("Ping successful");
// static, no log
//_log.debug("Ping successful");
//_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
synchronized (ConnectionManager.PingRequest.this) {
_ponged = true;
@ -458,10 +437,7 @@ public class ConnectionManager {
}
void receivePong(long pingId) {
PingRequest req = null;
synchronized (_pendingPings) {
req = (PingRequest)_pendingPings.remove(new Long(pingId));
}
PingRequest req = _pendingPings.remove(Long.valueOf(pingId));
if (req != null)
req.pong();
}

View File

@ -1,13 +1,14 @@
package net.i2p.client.streaming;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionListener;
import net.i2p.util.Log;
import net.i2p.util.ConcurrentHashSet;
/**
* Receive raw information from the I2PSession and turn it into
@ -18,12 +19,12 @@ public class MessageHandler implements I2PSessionListener {
private ConnectionManager _manager;
private I2PAppContext _context;
private Log _log;
private final List _listeners;
private final Set<I2PSocketManager.DisconnectListener> _listeners;
public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) {
_manager = mgr;
_context = ctx;
_listeners = new ArrayList(1);
_listeners = new ConcurrentHashSet(1);
_log = ctx.logManager().getLog(MessageHandler.class);
_context.statManager().createRateStat("stream.packetReceiveFailure", "When do we fail to decrypt or otherwise receive a packet sent to us?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
}
@ -77,14 +78,10 @@ public class MessageHandler implements I2PSessionListener {
_log.warn("I2PSession disconnected");
_manager.disconnectAllHard();
List listeners = null;
synchronized (_listeners) {
listeners = new ArrayList(_listeners);
_listeners.clear();
}
for (int i = 0; i < listeners.size(); i++) {
I2PSocketManager.DisconnectListener lsnr = (I2PSocketManager.DisconnectListener)listeners.get(i);
for (Iterator<I2PSocketManager.DisconnectListener> iter = _listeners.iterator(); iter.hasNext(); ) {
I2PSocketManager.DisconnectListener lsnr = iter.next();
lsnr.sessionDisconnected();
iter.remove();
}
}
@ -104,13 +101,9 @@ public class MessageHandler implements I2PSessionListener {
}
public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
synchronized (_listeners) {
_listeners.add(lsnr);
}
}
public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
synchronized (_listeners) {
_listeners.remove(lsnr);
}
}
}