SSU: Remove ActiveThrottle code, disabled 19 years ago

This commit is contained in:
zzz
2025-05-02 08:57:47 -04:00
parent abbda43e84
commit 97332e7472
6 changed files with 5 additions and 462 deletions

View File

@ -1,31 +0,0 @@
package net.i2p.router.transport.udp;
import net.i2p.data.Hash;
/**
* Since the TimedWeightedPriorityMessageQueue.add()
* was disabled by jrandom in UDPTransport.java
* on 2006-02-19, and the choke/unchoke was disabled at the same time,
* all of TWPMQ is pointless, so just do this for now.
*
* It appears from his comments that it was a lock contention issue,
* so perhaps TWPMQ can be converted to concurrent and re-enabled.
*
* @since 0.7.12
*/
class DummyThrottle implements OutboundMessageFragments.ActiveThrottle {
public DummyThrottle() {
}
public void choke(Hash peer) {
}
public void unchoke(Hash peer) {
}
public boolean isChoked(Hash peer) {
return false;
}
}

View File

@ -1,20 +0,0 @@
package net.i2p.router.transport.udp;
import net.i2p.router.OutNetMessage;
/**
* Base queue for messages not yet packetized
*/
interface MessageQueue {
/**
* Get the next message, blocking until one is found or the expiration
* reached.
*
* @param blockUntil expiration, or -1 if indefinite
*/
public OutNetMessage getNext(long blockUntil);
/**
* Add on a new message to the queue
*/
public void add(OutNetMessage message);
}

View File

@ -35,7 +35,6 @@ class OutboundMessageFragments {
private final RouterContext _context;
private final Log _log;
private final UDPTransport _transport;
// private ActiveThrottle _throttle; // LINT not used ??
/**
* Peers we are actively sending messages to.
@ -64,11 +63,10 @@ class OutboundMessageFragments {
static final int MAX_VOLLEYS = 10;
private static final int MAX_WAIT = 1000;
public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
public OutboundMessageFragments(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(OutboundMessageFragments.class);
_transport = transport;
// _throttle = throttle;
_activePeers = new ConcurrentHashSet<PeerState>(256);
_builder2 = transport.getBuilder2();
_alive = true;
@ -108,42 +106,6 @@ class OutboundMessageFragments {
_activePeers.remove(peer);
}
/**
* Block until we allow more messages to be admitted to the active
* pool. This is called by the {@link OutboundRefiller}
*
* @return true if more messages are allowed
*/
public boolean waitForMoreAllowed() {
// test without choking.
// perhaps this should check the lifetime of the first activeMessage?
if (true) return true;
/*
long start = _context.clock().now();
int numActive = 0;
int maxActive = Math.max(_transport.countActivePeers(), MAX_ACTIVE);
while (_alive) {
finishMessages();
try {
synchronized (_activeMessages) {
numActive = _activeMessages.size();
if (!_alive)
return false;
else if (numActive < maxActive)
return true;
else if (_allowExcess)
return true;
else
_activeMessages.wait(1000);
}
_context.statManager().addRateData("udp.activeDelay", numActive, _context.clock().now() - start);
} catch (InterruptedException ie) {}
}
*/
return false;
}
/**
* Add a new message to the active pool
*
@ -512,10 +474,4 @@ class OutboundMessageFragments {
}
****/
/** throttle */
public interface ActiveThrottle {
public void choke(Hash peer);
public void unchoke(Hash peer);
public boolean isChoked(Hash peer);
}
}

View File

@ -1,63 +0,0 @@
package net.i2p.router.transport.udp;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Blocking thread to grab new messages off the outbound queue and
* plopping them into our active pool.
*
* WARNING - UNUSED since 0.6.1.11
*
*/
class OutboundRefiller implements Runnable {
private RouterContext _context;
private Log _log;
private OutboundMessageFragments _fragments;
private MessageQueue _messages;
private boolean _alive;
// private Object _refillLock;
public OutboundRefiller(RouterContext ctx, OutboundMessageFragments fragments, MessageQueue messages) {
_context = ctx;
_log = ctx.logManager().getLog(OutboundRefiller.class);
_fragments = fragments;
_messages = messages;
// _refillLock = this;
_context.statManager().createRateStat("udp.timeToActive", "Message lifetime until it reaches the outbound fragment queue", "udp", UDPTransport.RATES);
}
public void startup() {
_alive = true;
I2PThread t = new I2PThread(this, "UDP outbound refiller", true);
t.start();
}
public void shutdown() { _alive = false; }
public void run() {
while (_alive) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Check the fragments to see if we can add more...");
boolean wantMore = _fragments.waitForMoreAllowed();
if (wantMore) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Want more fragments...");
OutNetMessage msg = _messages.getNext(-1);
if (msg != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("New message found to fragments: " + msg);
_context.statManager().addRateData("udp.timeToActive", msg.getLifetime(), msg.getLifetime());
_fragments.add(msg);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No message found to fragment");
}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("No more fragments allowed, looping");
}
}
}
}

View File

@ -1,266 +0,0 @@
package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import net.i2p.data.Hash;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Weighted priority queue implementation for the outbound messages, coupled
* with code to fail messages that expire.
*
* WARNING - UNUSED since 0.6.1.11
* See comments in DummyThrottle.java and mtn history ca. 2006-02-19
*
*/
class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle {
private RouterContext _context;
private Log _log;
/** FIFO queue of messages in a particular priority */
private List<OutNetMessage> _queue[];
/** all messages in the indexed queue are at or below the given priority. */
private int _priorityLimits[];
/** weighting for each queue */
private int _weighting[];
/** how many bytes are enqueued */
private long _bytesQueued[];
/** how many messages have been pushed out in this pass */
private int _messagesFlushed[];
/** how many bytes total have been pulled off the given queue */
private long _bytesTransferred[];
/** lock to notify message enqueue/removal (and block for getNext()) */
private final Object _nextLock;
/** have we shut down or are we still alive? */
private boolean _alive;
/** which queue should we pull out of next */
private int _nextQueue;
/** true if a message is enqueued while the getNext() call is in progress */
private volatile boolean _addedSincePassBegan;
private Expirer _expirer;
private FailedListener _listener;
/** set of peers (Hash) whose congestion window is exceeded in the active queue */
private Set<Hash> _chokedPeers;
/**
* Build up a new queue
*
* @param priorityLimits ordered breakpoint for the different message
* priorities, with the lowest limit first.
* @param weighting how much to prefer a given priority grouping.
* specifically, this means how many messages in this queue
* should be pulled off in a row before moving on to the next.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public TimedWeightedPriorityMessageQueue(RouterContext ctx, int[] priorityLimits, int[] weighting, FailedListener lsnr) {
_context = ctx;
_log = ctx.logManager().getLog(TimedWeightedPriorityMessageQueue.class);
_queue = new List[weighting.length];
_priorityLimits = new int[weighting.length];
_weighting = new int[weighting.length];
_bytesQueued = new long[weighting.length];
_bytesTransferred = new long[weighting.length];
_messagesFlushed = new int[weighting.length];
for (int i = 0; i < weighting.length; i++) {
_queue[i] = new ArrayList<OutNetMessage>(8);
_weighting[i] = weighting[i];
_priorityLimits[i] = priorityLimits[i];
_messagesFlushed[i] = 0;
_bytesQueued[i] = 0;
_bytesTransferred[i] = 0;
}
_alive = true;
_nextLock = this;
_chokedPeers = Collections.synchronizedSet(new HashSet<Hash>(16));
_listener = lsnr;
_context.statManager().createRateStat("udp.timeToEntrance", "Message lifetime until it reaches the UDP system", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.messageQueueSize", "How many messages are on the current class queue at removal", "udp", UDPTransport.RATES);
_expirer = new Expirer();
I2PThread t = new I2PThread(_expirer, "UDP outbound expirer");
t.setDaemon(true);
t.start();
}
public void add(OutNetMessage message) {
if (message == null) return;
_context.statManager().addRateData("udp.timeToEntrance", message.getLifetime(), message.getLifetime());
int queue = pickQueue(message);
long size = message.getMessageSize();
synchronized (_queue[queue]) {
_queue[queue].add(message);
_bytesQueued[queue] += size;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Added a " + size + " byte message to queue " + queue);
synchronized (_nextLock) {
_addedSincePassBegan = true;
_nextLock.notifyAll();
}
message.timestamp("added to queue " + queue);
}
/**
* Grab the next message out of the next queue. This only advances
* the _nextQueue var after pushing _weighting[currentQueue] messages
* or the queue is empty. This call blocks until either a message
* becomes available or the queue is shut down.
*
* @param blockUntil expiration, or -1 if indefinite
* @return message dequeued, or null if the queue was shut down
*/
public OutNetMessage getNext(long blockUntil) {
while (_alive) {
_addedSincePassBegan = false;
for (int i = 0; i < _queue.length; i++) {
int currentQueue = (_nextQueue + i) % _queue.length;
synchronized (_queue[currentQueue]) {
for (int j = 0; j < _queue[currentQueue].size(); j++) {
OutNetMessage msg = _queue[currentQueue].get(j);
Hash to = msg.getTarget().getIdentity().getHash();
if (_chokedPeers.contains(to))
continue;
// not choked, lets push it to active
_queue[currentQueue].remove(j);
long size = msg.getMessageSize();
_bytesQueued[currentQueue] -= size;
_bytesTransferred[currentQueue] += size;
_messagesFlushed[currentQueue]++;
if (_messagesFlushed[currentQueue] >= _weighting[currentQueue]) {
_messagesFlushed[currentQueue] = 0;
_nextQueue = (currentQueue + 1) % _queue.length;
}
int sz = _queue[currentQueue].size();
_context.statManager().addRateData("udp.messageQueueSize", sz, currentQueue);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Pulling a message off queue " + currentQueue + " with "
+ sz + " remaining");
msg.timestamp("made active with remaining queue size " + sz);
return msg;
}
// nothing waiting, or only choked peers
_messagesFlushed[currentQueue] = 0;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothing available on queue " + currentQueue);
}
}
long remaining = blockUntil - _context.clock().now();
if ( (blockUntil > 0) && (remaining < 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nonblocking, or block time has expired");
return null;
}
try {
synchronized (_nextLock) {
if (!_addedSincePassBegan && _alive) {
// nothing added since we begun iterating through,
// so we can safely wait for the full period. otoh,
// even if this is true, we might be able to safely
// wait, but it doesn't hurt to loop again.
if (_log.shouldLog(Log.DEBUG))
_log.debug("Wait for activity (up to " + remaining + "ms)");
if (blockUntil < 0)
_nextLock.wait();
else
_nextLock.wait(remaining);
}
}
} catch (InterruptedException ie) {}
}
return null;
}
public void shutdown() {
_alive = false;
synchronized (_nextLock) {
_nextLock.notifyAll();
}
}
public void choke(Hash peer) {
if (true) return;
_chokedPeers.add(peer);
synchronized (_nextLock) {
_nextLock.notifyAll();
}
}
public void unchoke(Hash peer) {
if (true) return;
_chokedPeers.remove(peer);
synchronized (_nextLock) {
_nextLock.notifyAll();
}
}
public boolean isChoked(Hash peer) {
return _chokedPeers.contains(peer);
}
private int pickQueue(OutNetMessage message) {
int target = message.getPriority();
for (int i = 0; i < _priorityLimits.length; i++) {
if (_priorityLimits[i] <= target) {
if (i == 0)
return 0;
else
return i - 1;
}
}
return _priorityLimits.length-1;
}
public interface FailedListener {
public void failed(OutNetMessage msg, String reason);
}
/**
* Drop expired messages off the queues
*/
private class Expirer implements Runnable {
public void run() {
List<OutNetMessage> removed = new ArrayList<OutNetMessage>(1);
while (_alive) {
long now = _context.clock().now();
for (int i = 0; i < _queue.length; i++) {
synchronized (_queue[i]) {
for (int j = 0; j < _queue[i].size(); j++) {
OutNetMessage m = _queue[i].get(j);
if (m.getExpiration() < now) {
_bytesQueued[i] -= m.getMessageSize();
removed.add(m);
_queue[i].remove(j);
j--;
continue;
}
}
}
}
for (int i = 0; i < removed.size(); i++) {
OutNetMessage m = removed.get(i);
m.timestamp("expirer killed it");
_listener.failed(m, "expired before getting on the active pool");
}
removed.clear();
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
}
}
}
}

View File

@ -68,7 +68,7 @@ import net.i2p.util.VersionComparator;
/**
* The SSU transport
*/
public class UDPTransport extends TransportImpl implements TimedWeightedPriorityMessageQueue.FailedListener {
public class UDPTransport extends TransportImpl {
private final Log _log;
private final List<UDPEndpoint> _endpoints;
private final Object _addDropLock = new Object();
@ -80,10 +80,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private final Map<Long, PeerStateDestroyed> _recentlyClosedConnIDs;
private PacketHandler _handler;
private EstablishmentManager _establisher;
private final MessageQueue _outboundMessages;
private final OutboundMessageFragments _fragments;
private final OutboundMessageFragments.ActiveThrottle _activeThrottle;
private OutboundRefiller _refiller;
private volatile PacketPusher _pusher;
private final InboundMessageFragments _inboundFragments;
//private UDPFlooder _flooder;
@ -225,8 +222,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** how many relays offered to us will we use at a time? */
public static final int PUBLIC_RELAY_COUNT = 3;
private static final boolean USE_PRIORITY = false;
/** configure the priority queue with the given split points */
private static final int PRIORITY_LIMITS[] = new int[] { 100, 200, 300, 400, 500, 1000 };
/** configure the priority queue with the given weighting per priority group */
@ -378,24 +373,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_dropList = new ConcurrentHashSet<RemoteHostId>(2);
_endpoints = new CopyOnWriteArrayList<UDPEndpoint>();
// See comments in DummyThrottle.java
if (USE_PRIORITY) {
TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this);
_outboundMessages = mq;
_activeThrottle = mq;
} else {
DummyThrottle mq = new DummyThrottle();
_outboundMessages = null;
_activeThrottle = mq;
}
_cachedBid = new SharedBid[BID_VALUES.length];
for (int i = 0; i < BID_VALUES.length; i++) {
_cachedBid[i] = new SharedBid(BID_VALUES[i]);
}
_packetBuilder2 = new PacketBuilder2(_context, this);
_fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_fragments = new OutboundMessageFragments(_context, this);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this);
//if (SHOULD_FLOOD_PEERS)
// _flooder = new UDPFlooder(_context, this);
@ -546,8 +530,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
if (_establisher != null)
_establisher.shutdown();
if (_refiller != null)
_refiller.shutdown();
_inboundFragments.shutdown();
//if (_flooder != null)
// _flooder.shutdown();
@ -693,10 +675,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_handler = new PacketHandler(_context, this, _establisher,
_inboundFragments, _testManager, _introManager);
// See comments in DummyThrottle.java
if (USE_PRIORITY && _refiller == null)
_refiller = new OutboundRefiller(_context, _fragments, _outboundMessages);
//if (SHOULD_FLOOD_PEERS && _flooder == null)
// _flooder = new UDPFlooder(_context, this);
@ -738,8 +716,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_inboundFragments.startup();
_pusher = new PacketPusher(_context, _fragments, _endpoints);
_pusher.startup();
if (USE_PRIORITY)
_refiller.startup();
//if (SHOULD_FLOOD_PEERS)
// _flooder.startup();
_expireEvent.setIsAlive(true);
@ -866,8 +842,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
//if (_flooder != null)
// _flooder.shutdown();
if (_refiller != null)
_refiller.shutdown();
if (_handler != null)
_handler.shutdown();
if (_pusher != null)
@ -1942,7 +1916,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
+ " byHostsz = " + _peersByRemoteHost.size());
}
_activeThrottle.unchoke(peer.getRemotePeer());
markReachable(peer.getRemotePeer(), peer.isInbound());
//if (SHOULD_FLOOD_PEERS)
@ -2190,9 +2163,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
+ " byIDsz = " + _peersByIdent.size()
+ " byHostsz = " + _peersByRemoteHost.size());
// unchoke 'em, but just because we'll never talk again...
_activeThrottle.unchoke(peer.getRemotePeer());
//if (SHOULD_FLOOD_PEERS)
// _flooder.removePeer(peer);
@ -2641,11 +2611,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Add to fragments for " + to);
// See comments in DummyThrottle.java
if (USE_PRIORITY)
_outboundMessages.add(msg);
else // skip the priority queue and go straight to the active pool
_fragments.add(msg);
// skip the priority queue and go straight to the active pool
_fragments.add(msg);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Establish new connection to " + to);