2005-09-30 jrandom

* Killed three more streaming lib bugs, one of which caused excess packets
      to be transmitted (dupacking dupacks), one that was the root of many of
      the old hung streams (shrinking highest received), and another that was
      releasing data too soon.
This commit is contained in:
jrandom
2005-09-30 23:12:57 +00:00
committed by zzz
parent 934a269753
commit 9dfa87ba47
6 changed files with 39 additions and 19 deletions

View File

@ -311,16 +311,20 @@ public class Connection {
}
List ackPackets(long ackThrough, long nacks[]) {
if (nacks == null) {
_highestAckedThrough = ackThrough;
if (ackThrough < _highestAckedThrough) {
// dupack which won't tell us anything
} else {
long lowest = -1;
for (int i = 0; i < nacks.length; i++) {
if ( (lowest < 0) || (nacks[i] < lowest) )
lowest = nacks[i];
if (nacks == null) {
_highestAckedThrough = ackThrough;
} else {
long lowest = -1;
for (int i = 0; i < nacks.length; i++) {
if ( (lowest < 0) || (nacks[i] < lowest) )
lowest = nacks[i];
}
if (lowest - 1 > _highestAckedThrough)
_highestAckedThrough = lowest - 1;
}
if (lowest - 1 > _highestAckedThrough)
_highestAckedThrough = lowest - 1;
}
List acked = null;

View File

@ -100,10 +100,12 @@ public class ConnectionPacketHandler {
(packet.getReceiveStreamId() <= 0) ) )
allowAck = false;
if (allowAck)
if (allowAck) {
isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
else
isNew = con.getInputStream().messageReceived(con.getInputStream().getHighestReadyBockId(), null);
} else {
con.getInputStream().notifyActivity();
isNew = false;
}
if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
@ -166,10 +168,16 @@ public class ConnectionPacketHandler {
}
con.eventOccurred();
if (fastAck) {
if (con.getLastSendTime() + 2000 < _context.clock().now()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Fast ack for dup " + packet);
con.ackImmediately();
if (!isNew) {
// if we're congested (fastAck) but this is also a new packet,
// we've already scheduled an ack above, so there is no need to schedule
// a fast ack (we can wait a few ms)
} else {
if (con.getLastSendTime() + 2000 < _context.clock().now()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Fast ack for dup " + packet);
con.ackImmediately();
}
}
}

View File

@ -193,6 +193,8 @@ public class MessageInputStream extends InputStream {
}
}
public void notifyActivity() { synchronized (_dataLock) { _dataLock.notifyAll(); } }
/**
* A new message has arrived - toss it on the appropriate queue (moving
* previously pending messages to the ready queue if it fills the gap, etc).

View File

@ -222,7 +222,7 @@ public class Packet {
return (_payload == null ? 0 : _payload.getValid());
}
public void releasePayload() {
_payload = null;
//_payload = null;
}
public ByteArray acquirePayload() {
ByteArray old = _payload;