* UDP Transport:

- Replace the unused-since-2006 TimedWeightedPriorityMessageQueue
        with DummyThrottle
      - Don't instantiate and start TWPMQ Cleaner and OutboundRefiller
        threads, part of priority queues unused since 0.6.1.11
      - Don't instantiate and start UDPFlooder, it is for testing only
This commit is contained in:
zzz
2010-02-26 16:54:41 +00:00
parent e3353df8bb
commit 7b70210c9a
5 changed files with 66 additions and 13 deletions

View File

@ -0,0 +1,32 @@
package net.i2p.router.transport.udp;
import net.i2p.data.Hash;
import net.i2p.router.OutNetMessage;
/**
* Since the TimedWeightedPriorityMessageQueue.add()
* was disabled by jrandom in UDPTransport.java
* on 2006-02-19, and the choke/unchoke was disabled at the same time,
* all of TWPMQ is pointless, so just do this for now.
*
* It appears from his comments that it was a lock contention issue,
* so perhaps TWPMQ can be converted to concurrent and re-enabled.
*
* @since 0.7.12
*/
public class DummyThrottle implements OutboundMessageFragments.ActiveThrottle {
public DummyThrottle() {
}
public void choke(Hash peer) {
}
public void unchoke(Hash peer) {
}
public boolean isChoked(Hash peer) {
return false;
}
}

View File

@ -9,6 +9,8 @@ import net.i2p.util.Log;
* Blocking thread to grab new messages off the outbound queue and * Blocking thread to grab new messages off the outbound queue and
* plopping them into our active pool. * plopping them into our active pool.
* *
* WARNING - UNUSED since 0.6.1.11
*
*/ */
public class OutboundRefiller implements Runnable { public class OutboundRefiller implements Runnable {
private RouterContext _context; private RouterContext _context;

View File

@ -16,6 +16,9 @@ import net.i2p.util.Log;
* Weighted priority queue implementation for the outbound messages, coupled * Weighted priority queue implementation for the outbound messages, coupled
* with code to fail messages that expire. * with code to fail messages that expire.
* *
* WARNING - UNUSED since 0.6.1.11
* See comments in DQAT.java and mtn history ca. 2006-02-19
*
*/ */
public class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle { public class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle {
private RouterContext _context; private RouterContext _context;

View File

@ -12,7 +12,8 @@ import net.i2p.util.I2PThread;
// import net.i2p.util.Log; // import net.i2p.util.Log;
/** /**
* * This sends random data to all UDP peers at a specified rate.
* It is for testing only!
*/ */
class UDPFlooder implements Runnable { class UDPFlooder implements Runnable {
private RouterContext _context; private RouterContext _context;

View File

@ -135,12 +135,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** how many relays offered to us will we use at a time? */ /** how many relays offered to us will we use at a time? */
public static final int PUBLIC_RELAY_COUNT = 3; public static final int PUBLIC_RELAY_COUNT = 3;
private static final boolean USE_PRIORITY = false;
/** configure the priority queue with the given split points */ /** configure the priority queue with the given split points */
private static final int PRIORITY_LIMITS[] = new int[] { 100, 200, 300, 400, 500, 1000 }; private static final int PRIORITY_LIMITS[] = new int[] { 100, 200, 300, 400, 500, 1000 };
/** configure the priority queue with the given weighting per priority group */ /** configure the priority queue with the given weighting per priority group */
private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 }; private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 };
/** should we flood all UDP peers with the configured rate? */ /** should we flood all UDP peers with the configured rate? This is for testing only! */
private static final boolean SHOULD_FLOOD_PEERS = false; private static final boolean SHOULD_FLOOD_PEERS = false;
private static final int MAX_CONSECUTIVE_FAILED = 5; private static final int MAX_CONSECUTIVE_FAILED = 5;
@ -170,9 +172,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_dropList = new ArrayList(256); _dropList = new ArrayList(256);
_endpoint = null; _endpoint = null;
TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this); // See comments in DQAT.java
_outboundMessages = mq; if (USE_PRIORITY) {
_activeThrottle = mq; TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this);
_outboundMessages = mq;
_activeThrottle = mq;
} else {
DummyThrottle mq = new DummyThrottle();
_outboundMessages = null;
_activeThrottle = mq;
}
_cachedBid = new SharedBid[BID_VALUES.length]; _cachedBid = new SharedBid[BID_VALUES.length];
for (int i = 0; i < BID_VALUES.length; i++) { for (int i = 0; i < BID_VALUES.length; i++) {
@ -181,7 +190,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this); _inboundFragments = new InboundMessageFragments(_context, _fragments, this);
_flooder = new UDPFlooder(_context, this); if (SHOULD_FLOOD_PEERS)
_flooder = new UDPFlooder(_context, this);
_expireTimeout = EXPIRE_TIMEOUT; _expireTimeout = EXPIRE_TIMEOUT;
_expireEvent = new ExpirePeerEvent(); _expireEvent = new ExpirePeerEvent();
_testEvent = new PeerTestEvent(); _testEvent = new PeerTestEvent();
@ -276,10 +286,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_handler == null) if (_handler == null)
_handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager); _handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager);
if (_refiller == null) // See comments in DQAT.java
if (USE_PRIORITY && _refiller == null)
_refiller = new OutboundRefiller(_context, _fragments, _outboundMessages); _refiller = new OutboundRefiller(_context, _fragments, _outboundMessages);
if (_flooder == null) if (SHOULD_FLOOD_PEERS && _flooder == null)
_flooder = new UDPFlooder(_context, this); _flooder = new UDPFlooder(_context, this);
// Startup the endpoint with the requested port, check the actual port, and // Startup the endpoint with the requested port, check the actual port, and
@ -306,8 +317,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_inboundFragments.startup(); _inboundFragments.startup();
_pusher = new PacketPusher(_context, _fragments, _endpoint.getSender()); _pusher = new PacketPusher(_context, _fragments, _endpoint.getSender());
_pusher.startup(); _pusher.startup();
_refiller.startup(); if (USE_PRIORITY)
_flooder.startup(); _refiller.startup();
if (SHOULD_FLOOD_PEERS)
_flooder.startup();
_expireEvent.setIsAlive(true); _expireEvent.setIsAlive(true);
_testEvent.setIsAlive(true); // this queues it for 3-6 minutes in the future... _testEvent.setIsAlive(true); // this queues it for 3-6 minutes in the future...
SimpleTimer.getInstance().addEvent(_testEvent, 10*1000); // lets requeue it for Real Soon SimpleTimer.getInstance().addEvent(_testEvent, 10*1000); // lets requeue it for Real Soon
@ -1150,10 +1163,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
msg.timestamp("enqueueing for an already established peer"); msg.timestamp("enqueueing for an already established peer");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Add to fragments for " + to.toBase64()); _log.debug("Add to fragments for " + to.toBase64());
if (true) // skip the priority queue and go straight to the active pool
_fragments.add(msg); // See comments in DQAT.java
else if (USE_PRIORITY)
_outboundMessages.add(msg); _outboundMessages.add(msg);
else // skip the priority queue and go straight to the active pool
_fragments.add(msg);
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Establish new connection to " + to.toBase64()); _log.debug("Establish new connection to " + to.toBase64());