* Streaming:

- Detect and drop dup SYNs rather than create
        a duplicate connection - will hopefully fix
        "Received a syn with the wrong IDs"
      - Send reset for a SYN ACK with the wrong IDs
      - Don't send a reset to a null dest
      - Logging tweaks
      - Cleanups
This commit is contained in:
zzz
2010-04-07 23:18:58 +00:00
parent 2a92be5946
commit e254c5f31a
7 changed files with 79 additions and 27 deletions

View File

@ -385,7 +385,7 @@ public class Connection {
* Process the acks and nacks received in a packet
* @return List of packets acked or null
*/
List ackPackets(long ackThrough, long nacks[]) {
List<PacketLocal> ackPackets(long ackThrough, long nacks[]) {
if (ackThrough < _highestAckedThrough) {
// dupack which won't tell us anything
} else {

View File

@ -2,7 +2,6 @@ package net.i2p.client.streaming;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
/**
@ -187,7 +186,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
packet.setOptionalFrom(con.getSession().getMyDestination());
packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
}
if (DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) {
if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) {
packet.setFlag(Packet.FLAG_NO_ACK);
}

View File

@ -4,6 +4,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
@ -73,8 +74,8 @@ public class ConnectionHandler {
sendReset(packet);
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout);
if (_log.shouldLog(Log.INFO))
_log.info("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout);
// also check if expiration of the head is long past for overload detection with peek() ?
boolean success = _synQueue.offer(packet); // fail immediately if full
if (success) {
@ -145,10 +146,29 @@ public class ConnectionHandler {
if (syn.getOptionalDelay() == PoisonPacket.POISON_MAX_DELAY_REQUEST)
return null;
// deal with forged / invalid syn packets
// deal with forged / invalid syn packets in _manager.receiveConnection()
// Handle both SYN and non-SYN packets in the queue
if (syn.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
// We are single-threaded here, so this is
// a good place to check for dup SYNs and drop them
Destination from = syn.getOptionalFrom();
if (from == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping SYN packet with no FROM: " + syn);
// drop it
continue;
}
Connection oldcon = _manager.getConnectionByOutboundId(syn.getReceiveStreamId());
if (oldcon != null) {
// His ID not guaranteed to be unique to us, but probably is...
// only drop it on a destination match too
if (from.equals(oldcon.getRemotePeer())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping dup SYN: " + syn);
continue;
}
}
Connection con = _manager.receiveConnection(syn);
if (con != null)
return con;

View File

@ -9,7 +9,6 @@ import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.SessionKey;
import net.i2p.util.Log;
@ -83,7 +82,7 @@ public class ConnectionManager {
*/
Connection getConnectionByOutboundId(long id) {
for (Connection con : _connectionByInboundId.values()) {
if (DataHelper.eq(con.getSendStreamId(), id))
if (con.getSendStreamId() == id)
return con;
}
return null;
@ -169,6 +168,7 @@ public class ConnectionManager {
con.setReceiveStreamId(receiveId);
try {
// This validates the packet, and sets the con's SendStreamID and RemotePeer
con.getPacketHandler().receivePacket(synPacket, con);
} catch (I2PException ie) {
_connectionByInboundId.remove(Long.valueOf(receiveId));

View File

@ -359,7 +359,11 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
if (_rtt > 60*1000)
_rtt = 60*1000;
}
public int getRTO() { return _rto; }
/** for debugging @since 0.7.13 */
int getRTTDev() { return _rttDev; }
/**
* If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have

View File

@ -4,7 +4,6 @@ import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
@ -239,17 +238,17 @@ public class ConnectionPacketHandler {
boolean firstAck = isNew && con.getHighestAckedThrough() < 0;
int numResends = 0;
List acked = null;
List<PacketLocal> acked = null;
// if we don't know the streamIds for both sides of the connection, there's no way we
// could actually be acking data (this fixes the buggered up ack of packet 0 problem).
// this is called after packet verification, which places the stream IDs as necessary if
// the SYN verifies (so if we're acking w/out stream IDs, no SYN has been received yet)
if ( (packet != null) && (packet.getSendStreamId() > 0) && (packet.getReceiveStreamId() > 0) &&
(con != null) && (con.getSendStreamId() > 0) && (con.getReceiveStreamId() > 0) &&
(!DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(con.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) )
(packet.getSendStreamId() != Packet.STREAM_ID_UNKNOWN) &&
(packet.getReceiveStreamId() != Packet.STREAM_ID_UNKNOWN) &&
(con.getSendStreamId() != Packet.STREAM_ID_UNKNOWN) &&
(con.getReceiveStreamId() != Packet.STREAM_ID_UNKNOWN) )
acked = con.ackPackets(ackThrough, nacks);
else
return false;
@ -261,7 +260,7 @@ public class ConnectionPacketHandler {
// and the highest rtt lets us set our resend delay properly
int highestRTT = -1;
for (int i = 0; i < acked.size(); i++) {
PacketLocal p = (PacketLocal)acked.get(i);
PacketLocal p = acked.get(i);
if (p.getAckTime() > highestRTT) {
//if (p.getNumSends() <= 1)
highestRTT = p.getAckTime();
@ -282,7 +281,15 @@ public class ConnectionPacketHandler {
_log.debug("Packet acked after " + p.getAckTime() + "ms: " + p);
}
if (highestRTT > 0) {
int oldrtt = con.getOptions().getRTT();
int oldrto = con.getOptions().getRTO();
int olddev = con.getOptions().getRTTDev();
con.getOptions().updateRTT(highestRTT);
if (_log.shouldLog(Log.INFO))
_log.info("acked: " + acked.size() + " highestRTT: " + highestRTT +
" RTT: " + oldrtt + " -> " + con.getOptions().getRTT() +
" RTO: " + oldrto + " -> " + con.getOptions().getRTO() +
" Dev: " + olddev + " -> " + con.getOptions().getRTTDev());
if (firstAck) {
if (con.isInbound())
_context.statManager().addRateData("stream.con.initialRTT.in", highestRTT, 0);
@ -357,7 +364,7 @@ public class ConnectionPacketHandler {
// integers, so lets use a random distribution instead
int shouldIncrement = _context.random().nextInt(con.getOptions().getCongestionAvoidanceGrowthRateFactor()*newWindowSize);
if (shouldIncrement < acked)
newWindowSize += 1;
newWindowSize++;
if (_log.shouldLog(Log.DEBUG))
_log.debug("cong. avoid acks = " + acked + " for " + con);
}
@ -387,6 +394,11 @@ public class ConnectionPacketHandler {
/**
* Make sure this packet is ok and that we can continue processing its data.
*
* SIDE EFFECT:
* Sets the SendStreamId and RemotePeer for the con,
* using the packet's ReceiveStreamId and OptionalFrom,
* If this is a SYN packet and the con's SendStreamId is not set.
*
* @return true if the packet is ok for this connection, false if we shouldn't
* continue processing.
@ -415,7 +427,7 @@ public class ConnectionPacketHandler {
}
}
} else {
if (!DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) {
if (con.getSendStreamId() != packet.getReceiveStreamId()) {
if (_log.shouldLog(Log.ERROR))
_log.error("Packet received with the wrong reply stream id: "
+ con + " / " + packet);
@ -431,7 +443,7 @@ public class ConnectionPacketHandler {
* Make sure this RST packet is valid, and if it is, act on it.
*/
private void verifyReset(Packet packet, Connection con) {
if (DataHelper.eq(con.getReceiveStreamId(), packet.getSendStreamId())) {
if (con.getReceiveStreamId() == packet.getSendStreamId()) {
boolean ok = packet.verifySignature(_context, packet.getOptionalFrom(), null);
if (!ok) {
if (_log.shouldLog(Log.ERROR))

View File

@ -7,7 +7,7 @@ import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Log;
/**
@ -19,15 +19,15 @@ public class PacketHandler {
private ConnectionManager _manager;
private I2PAppContext _context;
private Log _log;
private int _lastDelay;
private int _dropped;
//private int _lastDelay;
//private int _dropped;
public PacketHandler(I2PAppContext ctx, ConnectionManager mgr) {
_manager = mgr;
_context = ctx;
_dropped = 0;
//_dropped = 0;
_log = ctx.logManager().getLog(PacketHandler.class);
_lastDelay = _context.random().nextInt(30*1000);
//_lastDelay = _context.random().nextInt(30*1000);
}
/** what is the point of this ? */
@ -167,7 +167,7 @@ public class PacketHandler {
}
} else {
if ( (con.getSendStreamId() <= 0) ||
(DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) ||
(con.getSendStreamId() == packet.getReceiveStreamId()) ||
(packet.getSequenceNum() <= ConnectionOptions.MIN_WINDOW_SIZE) ) { // its in flight from the first batch
long oldId = con.getSendStreamId();
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
@ -179,6 +179,7 @@ public class PacketHandler {
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("Received a syn with the wrong IDs, con=" + con + " packet=" + packet);
sendReset(packet);
packet.releasePayload();
return;
}
@ -199,16 +200,18 @@ public class PacketHandler {
} else {
if (!con.getResetSent()) {
// someone is sending us a packet on the wrong stream
// It isn't a SYN so it isn't likely to have a FROM to send a reset back to
if (_log.shouldLog(Log.ERROR)) {
Set cons = _manager.listConnections();
StringBuilder buf = new StringBuilder(512);
buf.append("Received a packet on the wrong stream: ");
buf.append(packet);
buf.append(" connection: ");
buf.append("\nthis connection:\n");
buf.append(con);
buf.append("\nall connections:");
for (Iterator iter = cons.iterator(); iter.hasNext();) {
Connection cur = (Connection)iter.next();
buf.append(" ").append(cur);
buf.append('\n').append(cur);
}
_log.error(buf.toString(), new Exception("Wrong stream"));
}
@ -219,8 +222,22 @@ public class PacketHandler {
}
}
/**
* This sends a reset back to the place this packet came from.
* If the packet has no 'optional from' or valid signature, this does nothing.
* This is not associated with a connection, so no con stats are updated.
*/
private void sendReset(Packet packet) {
PacketLocal reply = new PacketLocal(_context, packet.getOptionalFrom());
Destination from = packet.getOptionalFrom();
if (from == null)
return;
boolean ok = packet.verifySignature(_context, from, null);
if (!ok) {
if (_log.shouldLog(Log.WARN))
_log.warn("Can't send reset after recv spoofed packet: " + packet);
return;
}
PacketLocal reply = new PacketLocal(_context, from);
reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(packet.getReceiveStreamId());