2005-09-28 jrandom

* Fix for at least some (all?) of the wrong stream errors in the streaming
      lib
This commit is contained in:
jrandom
2005-09-28 09:17:54 +00:00
committed by zzz
parent ef7d1ba964
commit 900420719e
7 changed files with 100 additions and 57 deletions

View File

@ -256,10 +256,14 @@ public class Connection {
remaining = 0; remaining = 0;
if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) { if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) {
packet.setOptionalDelay(0); packet.setOptionalDelay(0);
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
} else { } else {
int delay = _options.getRTO() / 2; int delay = _options.getRTO() / 2;
packet.setOptionalDelay(delay); packet.setOptionalDelay(delay);
_log.debug("Requesting ack delay of " + delay + "ms for packet " + packet); if (delay > 0)
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting ack delay of " + delay + "ms for packet " + packet);
} }
packet.setFlag(Packet.FLAG_DELAY_REQUESTED); packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
@ -516,17 +520,30 @@ public class Connection {
synchronized (_connectLock) { _connectLock.notifyAll(); } synchronized (_connectLock) { _connectLock.notifyAll(); }
} }
private boolean _remotePeerSet = false;
/** who are we talking with */ /** who are we talking with */
public Destination getRemotePeer() { return _remotePeer; } public Destination getRemotePeer() { return _remotePeer; }
public void setRemotePeer(Destination peer) { _remotePeer = peer; } public void setRemotePeer(Destination peer) {
if (_remotePeerSet) throw new RuntimeException("Remote peer already set [" + _remotePeer + ", " + peer + "]");
_remotePeerSet = true;
_remotePeer = peer;
}
private boolean _sendStreamIdSet = false;
/** what stream do we send data to the peer on? */ /** what stream do we send data to the peer on? */
public long getSendStreamId() { return _sendStreamId; } public long getSendStreamId() { return _sendStreamId; }
public void setSendStreamId(long id) { _sendStreamId = id; } public void setSendStreamId(long id) {
if (_sendStreamIdSet) throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]");
_sendStreamIdSet = true;
_sendStreamId = id;
}
private boolean _receiveStreamIdSet = false;
/** stream the peer sends data to us on. (may be null) */ /** stream the peer sends data to us on. (may be null) */
public long getReceiveStreamId() { return _receiveStreamId; } public long getReceiveStreamId() { return _receiveStreamId; }
public void setReceiveStreamId(long id) { public void setReceiveStreamId(long id) {
if (_receiveStreamIdSet) throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]");
_receiveStreamIdSet = true;
_receiveStreamId = id; _receiveStreamId = id;
synchronized (_connectLock) { _connectLock.notifyAll(); } synchronized (_connectLock) { _connectLock.notifyAll(); }
} }
@ -909,11 +926,14 @@ public class Connection {
} }
// revamp various fields, in case we need to ack more, etc // revamp various fields, in case we need to ack more, etc
_inputStream.updateAcks(_packet); _inputStream.updateAcks(_packet);
_packet.setOptionalDelay(getOptions().getChoke()); int choke = getOptions().getChoke();
_packet.setOptionalDelay(choke);
if (choke > 0)
_packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
_packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); _packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
_packet.setResendDelay(getOptions().getResendDelay()); _packet.setResendDelay(getOptions().getResendDelay());
_packet.setReceiveStreamId(_receiveStreamId); //_packet.setReceiveStreamId(_receiveStreamId);
_packet.setSendStreamId(_sendStreamId); //_packet.setSendStreamId(_sendStreamId);
int newWindowSize = getOptions().getWindowSize(); int newWindowSize = getOptions().getWindowSize();

View File

@ -143,15 +143,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
data.setValid(size); data.setValid(size);
data.setOffset(0); data.setOffset(0);
packet.setPayload(data); packet.setPayload(data);
if ( (ackOnly && !forceIncrement) && (!isFirst) ) if ( (ackOnly && !forceIncrement) && (!isFirst) )
packet.setSequenceNum(0); packet.setSequenceNum(0);
else else
packet.setSequenceNum(con.getNextOutboundPacketNum()); packet.setSequenceNum(con.getNextOutboundPacketNum());
packet.setSendStreamId(con.getSendStreamId()); packet.setSendStreamId(con.getSendStreamId());
packet.setReceiveStreamId(con.getReceiveStreamId()); packet.setReceiveStreamId(con.getReceiveStreamId());
con.getInputStream().updateAcks(packet); con.getInputStream().updateAcks(packet);
packet.setOptionalDelay(con.getOptions().getChoke()); int choke = con.getOptions().getChoke();
packet.setOptionalDelay(choke);
if (choke > 0)
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
packet.setResendDelay(con.getOptions().getResendDelay()); packet.setResendDelay(con.getOptions().getResendDelay());
if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE)

View File

@ -10,6 +10,7 @@ import net.i2p.data.Destination;
import net.i2p.data.Signature; import net.i2p.data.Signature;
import net.i2p.data.SigningPrivateKey; import net.i2p.data.SigningPrivateKey;
import net.i2p.util.ByteCache; import net.i2p.util.ByteCache;
import net.i2p.util.Log;
/** /**
* Contain a single packet transferred as part of a streaming connection. * Contain a single packet transferred as part of a streaming connection.
@ -64,7 +65,6 @@ public class Packet {
private Destination _optionFrom; private Destination _optionFrom;
private int _optionDelay; private int _optionDelay;
private int _optionMaxSize; private int _optionMaxSize;
private ByteCache _cache;
/** /**
* The receiveStreamId will be set to this when the packet doesn't know * The receiveStreamId will be set to this when the packet doesn't know
@ -146,22 +146,28 @@ public class Packet {
public static final int DEFAULT_MAX_SIZE = 32*1024; public static final int DEFAULT_MAX_SIZE = 32*1024;
private static final int MAX_DELAY_REQUEST = 65535; private static final int MAX_DELAY_REQUEST = 65535;
public Packet() { public Packet() { }
_cache = ByteCache.getInstance(128, MAX_PAYLOAD_SIZE);
private boolean _sendStreamIdSet = false;
/** what stream do we send data to the peer on? */
public long getSendStreamId() { return _sendStreamId; }
public void setSendStreamId(long id) {
if (_sendStreamIdSet) throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]");
_sendStreamIdSet = true;
_sendStreamId = id;
} }
/** what stream is this packet a part of? */ private boolean _receiveStreamIdSet = false;
public long getSendStreamId() { return _sendStreamId; }
public void setSendStreamId(long id) { _sendStreamId = id; }
/** /**
* Stream that replies should be sent on. if the * stream the replies should be sent on. this should be 0 if the
* connection is still being built, this should be * connection is still being built.
* null.
*
*/ */
public long getReceiveStreamId() { return _receiveStreamId; } public long getReceiveStreamId() { return _receiveStreamId; }
public void setReceiveStreamId(long id) { _receiveStreamId = id; } public void setReceiveStreamId(long id) {
if (_receiveStreamIdSet) throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]");
_receiveStreamIdSet = true;
_receiveStreamId = id;
}
/** 0-indexed sequence number for this Packet in the sendStream */ /** 0-indexed sequence number for this Packet in the sendStream */
public long getSequenceNum() { return _sequenceNum; } public long getSequenceNum() { return _sequenceNum; }
@ -208,8 +214,6 @@ public class Packet {
/** get the actual payload of the message. may be null */ /** get the actual payload of the message. may be null */
public ByteArray getPayload() { return _payload; } public ByteArray getPayload() { return _payload; }
public void setPayload(ByteArray payload) { public void setPayload(ByteArray payload) {
//if ( (_payload != null) && (_payload != payload) )
// _cache.release(_payload);
_payload = payload; _payload = payload;
if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) ) if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) )
throw new IllegalArgumentException("Too large payload: " + payload.getValid()); throw new IllegalArgumentException("Too large payload: " + payload.getValid());
@ -218,15 +222,11 @@ public class Packet {
return (_payload == null ? 0 : _payload.getValid()); return (_payload == null ? 0 : _payload.getValid());
} }
public void releasePayload() { public void releasePayload() {
//if (_payload != null)
// _cache.release(_payload);
_payload = null; _payload = null;
} }
public ByteArray acquirePayload() { public ByteArray acquirePayload() {
ByteArray old = _payload; ByteArray old = _payload;
_payload = new ByteArray(new byte[Packet.MAX_PAYLOAD_SIZE]); //_cache.acquire(); _payload = new ByteArray(new byte[Packet.MAX_PAYLOAD_SIZE]);
//if (old != null)
// _cache.release(old);
return _payload; return _payload;
} }
@ -239,6 +239,7 @@ public class Packet {
else else
_flags &= ~flag; _flags &= ~flag;
} }
public void setFlags(int flags) { _flags = flags; }
/** the signature on the packet (only included if the flag for it is set) */ /** the signature on the packet (only included if the flag for it is set) */
public Signature getOptionalSignature() { return _optionSignature; } public Signature getOptionalSignature() { return _optionSignature; }
@ -262,7 +263,6 @@ public class Packet {
*/ */
public int getOptionalDelay() { return _optionDelay; } public int getOptionalDelay() { return _optionDelay; }
public void setOptionalDelay(int delayMs) { public void setOptionalDelay(int delayMs) {
setFlag(FLAG_DELAY_REQUESTED, delayMs > 0);
if (delayMs > MAX_DELAY_REQUEST) if (delayMs > MAX_DELAY_REQUEST)
_optionDelay = MAX_DELAY_REQUEST; _optionDelay = MAX_DELAY_REQUEST;
else if (delayMs < 0) else if (delayMs < 0)
@ -418,30 +418,31 @@ public class Packet {
if (length < 22) // min header size if (length < 22) // min header size
throw new IllegalArgumentException("Too small: len=" + buffer.length); throw new IllegalArgumentException("Too small: len=" + buffer.length);
int cur = offset; int cur = offset;
_sendStreamId = DataHelper.fromLong(buffer, cur, 4); setSendStreamId(DataHelper.fromLong(buffer, cur, 4));
cur += 4; cur += 4;
_receiveStreamId = DataHelper.fromLong(buffer, cur, 4); setReceiveStreamId(DataHelper.fromLong(buffer, cur, 4));
cur += 4; cur += 4;
_sequenceNum = DataHelper.fromLong(buffer, cur, 4); setSequenceNum(DataHelper.fromLong(buffer, cur, 4));
cur += 4; cur += 4;
_ackThrough = DataHelper.fromLong(buffer, cur, 4); setAckThrough(DataHelper.fromLong(buffer, cur, 4));
cur += 4; cur += 4;
int numNacks = (int)DataHelper.fromLong(buffer, cur, 1); int numNacks = (int)DataHelper.fromLong(buffer, cur, 1);
cur++; cur++;
if (length < 22 + numNacks*4) if (length < 22 + numNacks*4)
throw new IllegalArgumentException("Too small with " + numNacks + " nacks: " + length); throw new IllegalArgumentException("Too small with " + numNacks + " nacks: " + length);
if (numNacks > 0) { if (numNacks > 0) {
_nacks = new long[numNacks]; long nacks[] = new long[numNacks];
for (int i = 0; i < numNacks; i++) { for (int i = 0; i < numNacks; i++) {
_nacks[i] = DataHelper.fromLong(buffer, cur, 4); nacks[i] = DataHelper.fromLong(buffer, cur, 4);
cur += 4; cur += 4;
} }
setNacks(nacks);
} else { } else {
_nacks = null; setNacks(null);
} }
_resendDelay = (int)DataHelper.fromLong(buffer, cur, 1); setResendDelay((int)DataHelper.fromLong(buffer, cur, 1));
cur++; cur++;
_flags = (int)DataHelper.fromLong(buffer, cur, 2); setFlags((int)DataHelper.fromLong(buffer, cur, 2));
cur += 2; cur += 2;
int optionSize = (int)DataHelper.fromLong(buffer, cur, 2); int optionSize = (int)DataHelper.fromLong(buffer, cur, 2);
@ -457,33 +458,36 @@ public class Packet {
throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin); throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin);
// skip ahead to the payload // skip ahead to the payload
_payload = new ByteArray(new byte[payloadSize]); //_cache.acquire(); //_payload = new ByteArray(new byte[payloadSize]);
System.arraycopy(buffer, payloadBegin, _payload.getData(), 0, payloadSize); _payload = new ByteArray(buffer, payloadBegin, payloadSize);
_payload.setValid(payloadSize); //System.arraycopy(buffer, payloadBegin, _payload.getData(), 0, payloadSize);
_payload.setOffset(0); //_payload.setValid(payloadSize);
//_payload.setOffset(0);
// ok now lets go back and deal with the options // ok now lets go back and deal with the options
if (isFlagSet(FLAG_DELAY_REQUESTED)) { if (isFlagSet(FLAG_DELAY_REQUESTED)) {
_optionDelay = (int)DataHelper.fromLong(buffer, cur, 2); setOptionalDelay((int)DataHelper.fromLong(buffer, cur, 2));
cur += 2; cur += 2;
} }
if (isFlagSet(FLAG_FROM_INCLUDED)) { if (isFlagSet(FLAG_FROM_INCLUDED)) {
_optionFrom = new Destination(); Destination optionFrom = new Destination();
try { try {
cur += _optionFrom.readBytes(buffer, cur); cur += optionFrom.readBytes(buffer, cur);
setOptionalFrom(optionFrom);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
throw new IllegalArgumentException("Bad from field: " + dfe.getMessage()); throw new IllegalArgumentException("Bad from field: " + dfe.getMessage());
} }
} }
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) { if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) {
_optionMaxSize = (int)DataHelper.fromLong(buffer, cur, 2); setOptionalMaxSize((int)DataHelper.fromLong(buffer, cur, 2));
cur += 2; cur += 2;
} }
if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) { if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) {
_optionSignature = new Signature(); Signature optionSignature = new Signature();
byte buf[] = new byte[Signature.SIGNATURE_BYTES]; byte buf[] = new byte[Signature.SIGNATURE_BYTES];
System.arraycopy(buffer, cur, buf, 0, Signature.SIGNATURE_BYTES); System.arraycopy(buffer, cur, buf, 0, Signature.SIGNATURE_BYTES);
_optionSignature.setData(buf); optionSignature.setData(buf);
setOptionalSignature(optionSignature);
cur += Signature.SIGNATURE_BYTES; cur += Signature.SIGNATURE_BYTES;
} }
} }
@ -509,7 +513,12 @@ public class Packet {
} }
boolean ok = ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey()); boolean ok = ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey());
if (!ok) { if (!ok) {
ctx.logManager().getLog(Packet.class).error("Signature failed on " + toString(), new Exception("moo")); Log l = ctx.logManager().getLog(Packet.class);
l.error("Signature failed on " + toString(), new Exception("moo"));
if (false) {
l.error(Base64.encode(buffer, 0, size));
l.error("Signature: " + Base64.encode(_optionSignature.getData()));
}
} }
return ok; return ok;
} }
@ -524,6 +533,12 @@ public class Packet {
setFlag(FLAG_SIGNATURE_INCLUDED); setFlag(FLAG_SIGNATURE_INCLUDED);
int size = writePacket(buffer, offset, false); int size = writePacket(buffer, offset, false);
_optionSignature = ctx.dsa().sign(buffer, offset, size, key); _optionSignature = ctx.dsa().sign(buffer, offset, size, key);
if (false) {
Log l = ctx.logManager().getLog(Packet.class);
l.error("Signing: " + toString());
l.error(Base64.encode(buffer, 0, size));
l.error("Signature: " + Base64.encode(_optionSignature.getData()));
}
// jump into the signed data and inject the signature where we // jump into the signed data and inject the signature where we
// previously placed a bunch of zeroes // previously placed a bunch of zeroes
int signatureOffset = offset int signatureOffset = offset

View File

@ -169,8 +169,8 @@ public class PacketHandler {
try { try {
con.getPacketHandler().receivePacket(packet, con); con.getPacketHandler().receivePacket(packet, con);
} catch (I2PException ie) { } catch (I2PException ie) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.ERROR))
_log.warn("Received forged packet for " + con + ": " + packet, ie); _log.error("Received forged packet for " + con + "/" + oldId + ": " + packet, ie);
con.setSendStreamId(oldId); con.setSendStreamId(oldId);
} }
} else if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { } else if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {

View File

@ -5,7 +5,6 @@ import java.util.Set;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.util.ByteCache;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
@ -27,7 +26,6 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
private long _ackOn; private long _ackOn;
private long _cancelledOn; private long _cancelledOn;
private SimpleTimer.TimedEvent _resendEvent; private SimpleTimer.TimedEvent _resendEvent;
private ByteCache _cache = ByteCache.getInstance(128, MAX_PAYLOAD_SIZE);
public PacketLocal(I2PAppContext ctx, Destination to) { public PacketLocal(I2PAppContext ctx, Destination to) {
this(ctx, to, null); this(ctx, to, null);
@ -71,8 +69,11 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public void prepare() { public void prepare() {
if (_connection != null) if (_connection != null)
_connection.getInputStream().updateAcks(this); _connection.getInputStream().updateAcks(this);
if (_numSends > 0) // so we can debug to differentiate resends if (_numSends > 0) {
// so we can debug to differentiate resends
setOptionalDelay(_numSends * 1000); setOptionalDelay(_numSends * 1000);
setFlag(FLAG_DELAY_REQUESTED);
}
} }
public long getCreatedOn() { return _createdOn; } public long getCreatedOn() { return _createdOn; }

View File

@ -1,4 +1,8 @@
$Id: history.txt,v 1.267 2005/09/27 02:17:41 jrandom Exp $ $Id: history.txt,v 1.268 2005/09/27 17:42:49 jrandom Exp $
2005-09-28 jrandom
* Fix for at least some (all?) of the wrong stream errors in the streaming
lib
2005-09-27 jrandom 2005-09-27 jrandom
* Properly suggest filenames for attachments in Syndie (thanks all!) * Properly suggest filenames for attachments in Syndie (thanks all!)

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.247 $ $Date: 2005/09/27 02:17:40 $"; public final static String ID = "$Revision: 1.248 $ $Date: 2005/09/27 17:42:49 $";
public final static String VERSION = "0.6.0.6"; public final static String VERSION = "0.6.0.6";
public final static long BUILD = 7; public final static long BUILD = 8;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);