* moved the inbound partial messages to the PeerState itself, reducing lock contention in the InboundMessageFragments and transparently dropping failed messages when we drop old peer states

This commit is contained in:
jrandom
2005-07-07 22:27:44 +00:00
committed by zzz
parent 76e8631e31
commit ad47bf5da3
5 changed files with 129 additions and 108 deletions

View File

@ -91,7 +91,7 @@ public class ACKSender implements Runnable {
if (peer != null) {
long lastSend = peer.getLastACKSend();
long wanted = peer.getWantedACKSendSince();
List ackBitfields = peer.retrieveACKBitfields(_transport.getPartialACKSource());
List ackBitfields = peer.retrieveACKBitfields();
if (wanted < 0)
_log.error("wtf, why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields);

View File

@ -20,16 +20,10 @@ import net.i2p.util.Log;
* up in the router we have full blown replay detection, its nice to have a
* basic line of defense here).
*
* TODO: add in some sensible code to drop expired fragments from peers we
* don't hear from again (either a periodic culling for expired peers, or
* a scheduled event)
*
*/
public class InboundMessageFragments implements UDPTransport.PartialACKSource {
public class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
private RouterContext _context;
private Log _log;
/** Map of peer (Hash) to a Map of messageId (Long) to InboundMessageState objects */
private Map _inboundMessages;
/** list of message IDs recently received, so we can ignore in flight dups */
private DecayingBloomFilter _recentlyCompletedMessages;
private OutboundMessageFragments _outbound;
@ -44,7 +38,7 @@ public class InboundMessageFragments implements UDPTransport.PartialACKSource {
public InboundMessageFragments(RouterContext ctx, OutboundMessageFragments outbound, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(InboundMessageFragments.class);
_inboundMessages = new HashMap(64);
//_inboundMessages = new HashMap(64);
_outbound = outbound;
_transport = transport;
_ackSender = new ACKSender(_context, _transport);
@ -73,9 +67,6 @@ public class InboundMessageFragments implements UDPTransport.PartialACKSource {
_recentlyCompletedMessages = null;
_ackSender.shutdown();
_messageReceiver.shutdown();
synchronized (_inboundMessages) {
_inboundMessages.clear();
}
}
public boolean isAlive() { return _alive; }
@ -103,72 +94,76 @@ public class InboundMessageFragments implements UDPTransport.PartialACKSource {
private void receiveMessages(PeerState from, UDPPacketReader.DataReader data) {
int fragments = data.readFragmentCount();
if (fragments <= 0) return;
synchronized (_inboundMessages) { // XXX: CHOKE POINT (to what extent?)
Map messages = (Map)_inboundMessages.get(from.getRemotePeer());
if (messages == null) {
messages = new HashMap(fragments);
_inboundMessages.put(from.getRemotePeer(), messages);
Hash fromPeer = from.getRemotePeer();
Map messages = from.getInboundMessages();
for (int i = 0; i < fragments; i++) {
Long messageId = new Long(data.readMessageId(i));
if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
from.messageFullyReceived(messageId, -1);
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.WARN))
_log.warn("Message received is a dup: " + messageId + " dups: "
+ _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
+ _recentlyCompletedMessages.getInsertedCount());
continue;
}
for (int i = 0; i < fragments; i++) {
Long messageId = new Long(data.readMessageId(i));
if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
from.messageFullyReceived(messageId, -1);
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.WARN))
_log.warn("Message received is a dup: " + messageId + " dups: "
+ _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
+ _recentlyCompletedMessages.getInsertedCount());
continue;
}
int size = data.readMessageFragmentSize(i);
InboundMessageState state = null;
boolean messageComplete = false;
boolean messageExpired = false;
boolean fragmentOK = false;
int size = data.readMessageFragmentSize(i);
InboundMessageState state = null;
boolean messageComplete = false;
boolean messageExpired = false;
boolean fragmentOK = false;
boolean partialACK = false;
// perhaps compact the synchronized block further by synchronizing on the
// particular state once its found?
synchronized (messages) {
state = (InboundMessageState)messages.get(messageId);
if (state == null) {
state = new InboundMessageState(_context, messageId.longValue(), from.getRemotePeer());
state = new InboundMessageState(_context, messageId.longValue(), fromPeer);
messages.put(messageId, state);
}
fragmentOK = state.receiveFragment(data, i);
if (state.isComplete()) {
messageComplete = true;
messages.remove(messageId);
if (messages.size() <= 0)
_inboundMessages.remove(from.getRemotePeer());
_recentlyCompletedMessages.add(messageId.longValue());
} else if (state.isExpired()) {
messageExpired = true;
messages.remove(messageId);
} else {
partialACK = true;
}
if (messageComplete) {
_recentlyCompletedMessages.add(messageId.longValue());
_messageReceiver.receiveMessage(state);
from.messageFullyReceived(messageId, state.getCompleteSize());
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.INFO))
_log.info("Message received completely! " + state);
_context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
} else if (state.isExpired()) {
messageExpired = true;
messages.remove(messageId);
if (messages.size() <= 0)
_inboundMessages.remove(from.getRemotePeer());
} else if (messageExpired) {
state.releaseResources();
if (_log.shouldLog(Log.WARN))
_log.warn("Message expired while only being partially read: " + state);
state.releaseResources();
} else {
} else if (partialACK) {
// not expired but not yet complete... lets queue up a partial ACK
if (_log.shouldLog(Log.DEBUG))
_log.debug("Queueing up a partial ACK for peer: " + from + " for " + state);
from.messagePartiallyReceived();
_ackSender.ackPeer(from);
}
if (!fragmentOK)
break;
}
@ -209,23 +204,4 @@ public class InboundMessageFragments implements UDPTransport.PartialACKSource {
else
from.dataReceived();
}
public void fetchPartialACKs(Hash fromPeer, List ackBitfields) {
synchronized (_inboundMessages) {
Map messages = (Map)_inboundMessages.get(fromPeer);
if (messages == null)
return;
for (Iterator iter = messages.values().iterator(); iter.hasNext(); ) {
InboundMessageState state = (InboundMessageState)iter.next();
if (state.isExpired()) {
iter.remove();
} else {
if (!state.isComplete())
ackBitfields.add(state.createACKBitfield());
}
}
if (messages.size() <= 0)
_inboundMessages.remove(fromPeer);
}
}
}

View File

@ -1,7 +1,10 @@
package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.net.InetAddress;
import java.net.UnknownHostException;
@ -144,6 +147,9 @@ public class PeerState {
private long _packetsReceivedDuplicate;
private long _packetsReceived;
/** Message (Long) to InboundMessageState for active message */
private Map _inboundMessages;
private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
@ -202,6 +208,7 @@ public class PeerState {
_retransmissionPeriodStart = 0;
_packetsReceived = 0;
_packetsReceivedDuplicate = 0;
_inboundMessages = new HashMap(8);
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
@ -456,6 +463,12 @@ public class PeerState {
_wantACKSendSince = _context.clock().now();
}
/**
* Fetch the internal id (Long) to InboundMessageState for incomplete inbound messages.
* Access to this map must be synchronized explicitly!
*/
public Map getInboundMessages() { return _inboundMessages; }
/**
* either they told us to back off, or we had to resend to get
* the data through.
@ -489,7 +502,7 @@ public class PeerState {
* will be unchanged if there are ACKs remaining.
*
*/
public List retrieveACKBitfields(UDPTransport.PartialACKSource partialACKSource) {
public List retrieveACKBitfields() {
List rv = new ArrayList(16);
int bytesRemaining = countMaxACKData();
synchronized (_currentACKs) {
@ -502,12 +515,12 @@ public class PeerState {
}
int partialIncluded = 0;
if ( (bytesRemaining > 4) && (partialACKSource != null) ) {
if (bytesRemaining > 4) {
// ok, there's room to *try* to fit in some partial ACKs, so
// we should try to find some packets to partially ACK
// (preferably the ones which have the most received fragments)
List partial = new ArrayList();
partialACKSource.fetchPartialACKs(_remotePeer, partial);
fetchPartialACKs(partial);
// we may not be able to use them all, but lets try...
for (int i = 0; (bytesRemaining > 4) && (i < partial.size()); i++) {
ACKBitfield bitfield = (ACKBitfield)partial.get(i);
@ -528,6 +541,34 @@ public class PeerState {
return rv;
}
private void fetchPartialACKs(List rv) {
InboundMessageState states[] = null;
int curState = 0;
synchronized (_inboundMessages) {
int numMessages = _inboundMessages.size();
if (numMessages <= 0)
return;
for (Iterator iter = _inboundMessages.values().iterator(); iter.hasNext(); ) {
InboundMessageState state = (InboundMessageState)iter.next();
if (state.isExpired()) {
iter.remove();
} else {
if (!state.isComplete()) {
if (states == null)
states = new InboundMessageState[numMessages];
states[curState++] = state;
}
}
}
}
if (states != null) {
for (int i = curState-1; i >= 0; i--) {
if (states[i] != null)
rv.add(states[i].createACKBitfield());
}
}
}
/** represent a full ACK of a message */
private class FullACKBitfield implements ACKBitfield {
private long _msgId;

View File

@ -59,6 +59,7 @@ class UDPFlooder implements Runnable {
}
public void run() {
long nextSend = _context.clock().now();
while (_alive) {
try {
synchronized (_peers) {
@ -67,33 +68,47 @@ class UDPFlooder implements Runnable {
}
} catch (InterruptedException ie) {}
// peers always grows, so this is fairly safe
for (int i = 0; i < _peers.size(); i++) {
PeerState peer = (PeerState)_peers.get(i);
DataMessage m = new DataMessage(_context);
byte data[] = _floodData; // new byte[4096];
//_context.random().nextBytes(data);
m.setData(data);
m.setMessageExpiration(_context.clock().now() + 10*1000);
m.setUniqueId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
if (true) {
OutNetMessage msg = new OutNetMessage(_context);
msg.setMessage(m);
msg.setExpiration(m.getMessageExpiration());
msg.setPriority(500);
RouterInfo to = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer());
if (to == null)
continue;
msg.setTarget(to);
_context.statManager().getStatLog().addData(peer.getRemotePeer().toBase64().substring(0,6), "udp.floodDataSent", 1, 0);
long now = _context.clock().now();
if (now >= nextSend) {
// peers always grows, so this is fairly safe
for (int i = 0; i < _peers.size(); i++) {
PeerState peer = (PeerState)_peers.get(i);
DataMessage m = new DataMessage(_context);
byte data[] = _floodData; // new byte[4096];
//_context.random().nextBytes(data);
m.setData(data);
m.setMessageExpiration(_context.clock().now() + 10*1000);
m.setUniqueId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
if (true) {
OutNetMessage msg = new OutNetMessage(_context);
msg.setMessage(m);
msg.setExpiration(m.getMessageExpiration());
msg.setPriority(500);
RouterInfo to = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer());
if (to == null)
continue;
msg.setTarget(to);
_context.statManager().getStatLog().addData(peer.getRemotePeer().toBase64().substring(0,6), "udp.floodDataSent", 1, 0);
_transport.send(msg);
} else {
_transport.send(m, peer);
_transport.send(msg);
} else {
_transport.send(m, peer);
}
}
nextSend = now + calcFloodDelay();
}
long delay = nextSend - now;
if (delay > 0) {
if (delay > 10*1000) {
long fd = calcFloodDelay();
if (delay > fd) {
nextSend = now + fd;
delay = fd;
}
}
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
}
long floodDelay = calcFloodDelay();
try { Thread.sleep(floodDelay); } catch (InterruptedException ie) {}
}
}

View File

@ -666,17 +666,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
out.write("</table>\n");
}
public PartialACKSource getPartialACKSource() { return _inboundFragments; }
/** help us grab partial ACKs */
public interface PartialACKSource {
/**
* build partial ACKs of messages received from the peer and store
* them in the ackBitfields
*/
public void fetchPartialACKs(Hash fromPeer, List ackBitfields);
}
private static final DecimalFormat _fmt = new DecimalFormat("#,##0.00");
private static final String formatKBps(int bps) {