* UDP:
- Fix bug causing PacketPusher to loop quickly instead of sleeping - Fix udp.sendCycleTime and sendCycleTimeSlow stats - Fix speed values on peers.jsp - Try to fix rare NPE (ticket 298)
This commit is contained in:
@ -19,6 +19,7 @@ import net.i2p.util.Log;
|
|||||||
* initiated the connection with us. In other words, they are Alice and
|
* initiated the connection with us. In other words, they are Alice and
|
||||||
* we are Bob.
|
* we are Bob.
|
||||||
*
|
*
|
||||||
|
* TODO do all these methods need to be synchronized?
|
||||||
*/
|
*/
|
||||||
class InboundEstablishState {
|
class InboundEstablishState {
|
||||||
private final RouterContext _context;
|
private final RouterContext _context;
|
||||||
@ -27,7 +28,8 @@ class InboundEstablishState {
|
|||||||
private byte _receivedX[];
|
private byte _receivedX[];
|
||||||
private byte _bobIP[];
|
private byte _bobIP[];
|
||||||
private final int _bobPort;
|
private final int _bobPort;
|
||||||
private DHSessionKeyBuilder _keyBuilder;
|
// try to fix NPE in getSentY() ?????
|
||||||
|
private volatile DHSessionKeyBuilder _keyBuilder;
|
||||||
// SessionCreated message
|
// SessionCreated message
|
||||||
private byte _sentY[];
|
private byte _sentY[];
|
||||||
private final byte _aliceIP[];
|
private final byte _aliceIP[];
|
||||||
|
@ -33,6 +33,8 @@ class OutboundMessageFragments {
|
|||||||
/** which peer should we build the next packet out of? */
|
/** which peer should we build the next packet out of? */
|
||||||
private int _nextPeer;
|
private int _nextPeer;
|
||||||
private PacketBuilder _builder;
|
private PacketBuilder _builder;
|
||||||
|
private long _lastCycleTime = System.currentTimeMillis();
|
||||||
|
|
||||||
/** if we can handle more messages explicitly, set this to true */
|
/** if we can handle more messages explicitly, set this to true */
|
||||||
// private boolean _allowExcess; // LINT not used??
|
// private boolean _allowExcess; // LINT not used??
|
||||||
// private volatile long _packetsRetransmitted; // LINT not used??
|
// private volatile long _packetsRetransmitted; // LINT not used??
|
||||||
@ -176,13 +178,13 @@ class OutboundMessageFragments {
|
|||||||
if (!_activePeers.contains(peer)) {
|
if (!_activePeers.contains(peer)) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
|
_log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
|
||||||
|
if (_activePeers.isEmpty())
|
||||||
|
_lastCycleTime = System.currentTimeMillis();
|
||||||
_activePeers.add(peer);
|
_activePeers.add(peer);
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
|
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
|
||||||
}
|
}
|
||||||
if (_activePeers.size() == 1)
|
|
||||||
_lastCycleTime = System.currentTimeMillis();
|
|
||||||
_activePeers.notifyAll();
|
_activePeers.notifyAll();
|
||||||
}
|
}
|
||||||
_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
|
_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
|
||||||
@ -227,8 +229,6 @@ class OutboundMessageFragments {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private long _lastCycleTime = System.currentTimeMillis();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch all the packets for a message volley, blocking until there is a
|
* Fetch all the packets for a message volley, blocking until there is a
|
||||||
* message which can be fully transmitted (or the transport is shut down).
|
* message which can be fully transmitted (or the transport is shut down).
|
||||||
@ -248,18 +248,24 @@ class OutboundMessageFragments {
|
|||||||
for (int i = 0; i < _activePeers.size(); i++) {
|
for (int i = 0; i < _activePeers.size(); i++) {
|
||||||
int cur = (i + _nextPeer) % _activePeers.size();
|
int cur = (i + _nextPeer) % _activePeers.size();
|
||||||
if (cur == 0) {
|
if (cur == 0) {
|
||||||
|
// FIXME or delete, these stats aren't much help since they include the sleep time
|
||||||
long ts = System.currentTimeMillis();
|
long ts = System.currentTimeMillis();
|
||||||
long cycleTime = ts - _lastCycleTime;
|
long cycleTime = ts - _lastCycleTime;
|
||||||
|
_lastCycleTime = ts;
|
||||||
_context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size());
|
_context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size());
|
||||||
if (cycleTime > 1000)
|
// make longer than the default sleep time below
|
||||||
|
if (cycleTime > 1100)
|
||||||
_context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size());
|
_context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size());
|
||||||
}
|
}
|
||||||
peer = _activePeers.get(i);
|
peer = _activePeers.get(i);
|
||||||
state = peer.allocateSend();
|
state = peer.allocateSend();
|
||||||
if (state != null) {
|
if (state != null) {
|
||||||
|
// we have something to send and we will be returning it
|
||||||
_nextPeer = i + 1;
|
_nextPeer = i + 1;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
// Update the minimum delay for all peers (getNextDelay() returns 1 for "now")
|
||||||
|
// which will be used if we found nothing to send across all peers
|
||||||
int delay = peer.getNextDelay();
|
int delay = peer.getNextDelay();
|
||||||
if ( (nextSendDelay <= 0) || (delay < nextSendDelay) )
|
if ( (nextSendDelay <= 0) || (delay < nextSendDelay) )
|
||||||
nextSendDelay = delay;
|
nextSendDelay = delay;
|
||||||
@ -274,8 +280,9 @@ class OutboundMessageFragments {
|
|||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("wait for " + nextSendDelay);
|
_log.debug("wait for " + nextSendDelay);
|
||||||
// wait.. or somethin'
|
// wait.. or somethin'
|
||||||
|
// wait a min of 10 and a max of 3000 ms no matter what peer.getNextDelay() says
|
||||||
if (nextSendDelay > 0)
|
if (nextSendDelay > 0)
|
||||||
_activePeers.wait(nextSendDelay);
|
_activePeers.wait(Math.min(Math.max(nextSendDelay, 10), 3000));
|
||||||
else
|
else
|
||||||
_activePeers.wait(1000);
|
_activePeers.wait(1000);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1270,7 +1270,7 @@ class PeerState {
|
|||||||
if (_dead) return -1;
|
if (_dead) return -1;
|
||||||
synchronized (msgs) {
|
synchronized (msgs) {
|
||||||
if (_retransmitter != null) {
|
if (_retransmitter != null) {
|
||||||
rv = (int)(now - _retransmitter.getNextSendTime());
|
rv = (int)(_retransmitter.getNextSendTime() - now);
|
||||||
if (rv <= 0)
|
if (rv <= 0)
|
||||||
return 1;
|
return 1;
|
||||||
else
|
else
|
||||||
|
@ -1986,8 +1986,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
|||||||
buf.append(DataHelper.formatDuration2(idleOut));
|
buf.append(DataHelper.formatDuration2(idleOut));
|
||||||
buf.append("</td>");
|
buf.append("</td>");
|
||||||
|
|
||||||
int recvBps = (idleIn > 2 ? 0 : peer.getReceiveBps());
|
int recvBps = (idleIn > 15*1000 ? 0 : peer.getReceiveBps());
|
||||||
int sendBps = (idleOut > 2 ? 0 : peer.getSendBps());
|
int sendBps = (idleOut > 15*1000 ? 0 : peer.getSendBps());
|
||||||
|
|
||||||
buf.append("<td class=\"cells\" align=\"right\" nowrap>");
|
buf.append("<td class=\"cells\" align=\"right\" nowrap>");
|
||||||
buf.append(formatKBps(recvBps));
|
buf.append(formatKBps(recvBps));
|
||||||
|
Reference in New Issue
Block a user