From b4c495531a30fddc8a06e8931bcc2beaa6af40f3 Mon Sep 17 00:00:00 2001 From: jrandom Date: Tue, 21 Feb 2006 13:31:18 +0000 Subject: [PATCH] 2006-02-21 jrandom * Throttle the outbound SSU establishment queue, so it doesn't fill up the heap when backlogged (and so that the messages queued up on it don't sit there forever) * Further SSU memory cleanup * Clean up the address regeneration code so it knows when to rebuild the local info more precisely. --- .../client/streaming/MessageOutputStream.java | 2 - .../i2p/client/streaming/PacketHandler.java | 3 +- core/java/src/net/i2p/crypto/AESEngine.java | 1 - .../src/net/i2p/crypto/CryptixAESEngine.java | 1 - core/java/src/net/i2p/util/SimpleTimer.java | 4 +- history.txt | 6 +- .../src/net/i2p/router/RouterVersion.java | 4 +- .../src/net/i2p/router/StatisticsManager.java | 1 + .../udp/InboundMessageFragments.java | 1 + .../transport/udp/IntroductionManager.java | 46 +++- .../udp/OutboundMessageFragments.java | 5 + .../router/transport/udp/PacketHandler.java | 10 +- .../i2p/router/transport/udp/PeerState.java | 44 +++- .../router/transport/udp/UDPTransport.java | 235 +++++++++++------- 14 files changed, 242 insertions(+), 121 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 16b0a0734..34c826ab2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -372,8 +372,6 @@ public class MessageOutputStream extends OutputStream { /** * called whenever the engine wants to push more data to the * peer - * - * @return true if the data was flushed */ void flushAvailable(DataReceiver target) throws IOException { flushAvailable(target, true); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 668145c0b..c7c0f0836 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -102,7 +102,8 @@ public class PacketHandler { Connection con = (sendId > 0 ? _manager.getConnectionByInboundId(sendId) : null); if (con != null) { receiveKnownCon(con, packet); - displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO()); + if (_log.shouldLog(Log.INFO)) + displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO()); } else { receiveUnknownCon(packet, sendId); displayPacket(packet, "UNKN", null); diff --git a/core/java/src/net/i2p/crypto/AESEngine.java b/core/java/src/net/i2p/crypto/AESEngine.java index 1d79fe80c..a67281b23 100644 --- a/core/java/src/net/i2p/crypto/AESEngine.java +++ b/core/java/src/net/i2p/crypto/AESEngine.java @@ -153,7 +153,6 @@ public class AESEngine { /** decrypt the data with the session key provided * @param payload encrypted data * @param sessionKey private session key - * @return unencrypted data */ public void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) { System.arraycopy(payload, inIndex, rv, outIndex, rv.length - outIndex); diff --git a/core/java/src/net/i2p/crypto/CryptixAESEngine.java b/core/java/src/net/i2p/crypto/CryptixAESEngine.java index a334cb2e2..626869c31 100644 --- a/core/java/src/net/i2p/crypto/CryptixAESEngine.java +++ b/core/java/src/net/i2p/crypto/CryptixAESEngine.java @@ -139,7 +139,6 @@ public class CryptixAESEngine extends AESEngine { /** decrypt the data with the session key provided * @param payload encrypted data * @param sessionKey private session key - * @return unencrypted data */ public final void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) { if ( (payload == null) || (rv == null) ) diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index a5db2561d..9cde50032 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -58,8 +58,8 @@ public class SimpleTimer { */ public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } /** - * @param useEarliestEventTime if its already scheduled, use the earlier of the - * two timeouts, else use the later + * @param useEarliestTime if its already scheduled, use the earlier of the + * two timeouts, else use the later */ public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) { int totalEvents = 0; diff --git a/history.txt b/history.txt index 22a2023a2..09114c0a3 100644 --- a/history.txt +++ b/history.txt @@ -1,10 +1,12 @@ -$Id: history.txt,v 1.411 2006/02/20 13:12:47 jrandom Exp $ +$Id: history.txt,v 1.412 2006/02/20 21:02:49 jrandom Exp $ -2006-02-20 jrandom +2006-02-21 jrandom * Throttle the outbound SSU establishment queue, so it doesn't fill up the heap when backlogged (and so that the messages queued up on it don't sit there forever) * Further SSU memory cleanup + * Clean up the address regeneration code so it knows when to rebuild the + local info more precisely. 2006-02-20 jrandom * Properly enable TCP this time (oops) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 221ba136f..e92215ef0 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.353 $ $Date: 2006/02/20 13:12:48 $"; + public final static String ID = "$Revision: 1.354 $ $Date: 2006/02/20 21:02:49 $"; public final static String VERSION = "0.6.1.10"; - public final static long BUILD = 9; + public final static long BUILD = 10; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index 1f3eaffd2..1bf237366 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -211,6 +211,7 @@ public class StatisticsManager implements Service { Rate curRate = rate.getRate(periods[i]); if (curRate == null) continue; + if (curRate.getLifetimeEventCount() <= 0) continue; stats.setProperty("stat_" + rateName + '.' + getPeriod(curRate), renderRate(curRate, fudgeQuantity)); } } diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index ecdce78a4..d362acf01 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -174,6 +174,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource if (!fragmentOK) break; } + from.expireInboundMessages(); return fragments; } diff --git a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java index add0d9c3a..3696b2e5f 100644 --- a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java +++ b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java @@ -2,7 +2,10 @@ package net.i2p.router.transport.udp; import java.util.*; +import net.i2p.data.Base64; import net.i2p.data.SessionKey; +import net.i2p.data.RouterInfo; +import net.i2p.data.RouterAddress; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -68,18 +71,47 @@ public class IntroductionManager { return (PeerState)_outbound.get(new Long(id)); } - public void pickInbound(List rv, int howMany) { + /** + * Grab a bunch of peers who are willing to be introducers for us that + * are locally known (duh) and have published their own SSU address (duh^2). + * The picked peers have their info tacked on to the ssuOptions parameter for + * use in the SSU RouterAddress. + * + */ + public int pickInbound(Properties ssuOptions, int howMany) { + List peers = null; int start = _context.random().nextInt(Integer.MAX_VALUE); synchronized (_inbound) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Picking inbound out of " + _inbound); - if (_inbound.size() <= 0) return; - start = start % _inbound.size(); - for (int i = 0; i < _inbound.size() && rv.size() < howMany; i++) { - PeerState cur = (PeerState)_inbound.get((start + i) % _inbound.size()); - rv.add(cur); - } + if (_inbound.size() <= 0) return 0; + peers = new ArrayList(_inbound); } + int sz = peers.size(); + start = start % sz; + int found = 0; + for (int i = 0; i < sz && found < howMany; i++) { + PeerState cur = (PeerState)peers.get((start + i) % sz); + RouterInfo ri = _context.netDb().lookupRouterInfoLocally(cur.getRemotePeer()); + if (ri == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Picked peer has no local routerInfo: " + cur); + continue; + } + RouterAddress ra = ri.getTargetAddress(UDPTransport.STYLE); + if (ra == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Picked peer has no SSU address: " + ri); + continue; + } + UDPAddress ura = new UDPAddress(ra); + ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, cur.getRemoteHostId().toHostString()); + ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(cur.getRemotePort())); + ssuOptions.setProperty(UDPAddress.PROP_INTRO_KEY_PREFIX + found, Base64.encode(ura.getIntroKey())); + ssuOptions.setProperty(UDPAddress.PROP_INTRO_TAG_PREFIX + found, String.valueOf(cur.getTheyRelayToUsAs())); + found++; + } + return found; } public void receiveRelayIntro(RemoteHostId bob, UDPPacketReader reader) { diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 2e80276be..c7c07b35f 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -142,6 +142,11 @@ public class OutboundMessageFragments { boolean ok = state.initialize(msg, msgBody); if (ok) { PeerState peer = _transport.getPeerState(target.getIdentity().calculateHash()); + if (peer == null) { + _transport.failed(msg, "Peer disconnected quickly"); + state.releaseResources(); + return; + } int active = peer.add(state); synchronized (_activePeers) { if (!_activePeers.contains(peer)) { diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 73fe48d71..90766d300 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -61,7 +61,8 @@ public class PacketHandler { _context.statManager().createRateStat("udp.droppedInvalidInboundEstablish", "How old the packet we dropped due to invalidity (inbound establishment, bad key) was", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.droppedInvalidSkew", "How skewed the packet we dropped due to invalidity (valid except bad skew) was", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.packetDequeueTime", "How long it takes the UDPReader to pull a packet off the inbound packet queue (when its slow)", "udp", new long[] { 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (when its slow)", "udp", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (period is dequeue time)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.packetVerifyTimeSlow", "How long it takes the PacketHandler to verify a data packet after dequeueing when its slow (period is dequeue time)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); } public void startup() { @@ -140,8 +141,11 @@ public class PacketHandler { timeToVerify = beforeRecv - packet.getTimeSinceReceived(); if (timeToDequeue > 50) _context.statManager().addRateData("udp.packetDequeueTime", timeToDequeue, timeToDequeue); - if (timeToVerify > 50) - _context.statManager().addRateData("udp.packetVerifyTime", timeToVerify, timeToVerify); + if (timeToVerify > 0) { + _context.statManager().addRateData("udp.packetVerifyTime", timeToVerify, timeToDequeue); + if (timeToVerify > 100) + _context.statManager().addRateData("udp.packetVerifyTimeSlow", timeToVerify, timeToDequeue); + } // back to the cache with thee! packet.release(); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 9a33e5b62..00c41c5d2 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -606,7 +606,7 @@ public class PeerState { int remaining = _inboundMessages.size(); for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) { InboundMessageState state = (InboundMessageState)iter.next(); - if (state.isExpired()) { + if (state.isExpired() || _dead) { iter.remove(); } else { if (state.isComplete()) { @@ -977,6 +977,10 @@ public class PeerState { public int add(OutboundMessageState state) { + if (_dead) { + _transport.failed(state, false); + return 0; + } state.setPeer(this); if (_log.shouldLog(Log.DEBUG)) _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); @@ -989,20 +993,23 @@ public class PeerState { } /** drop all outbound messages */ public void dropOutbound() { - if (_dead) return; + //if (_dead) return; _dead = true; List msgs = _outboundMessages; //_outboundMessages = null; _retransmitter = null; if (msgs != null) { + int sz = 0; List tempList = null; synchronized (msgs) { - tempList = new ArrayList(msgs); - msgs.clear(); + sz = msgs.size(); + if (sz > 0) { + tempList = new ArrayList(msgs); + msgs.clear(); + } } - int sz = tempList.size(); for (int i = 0; i < sz; i++) - _transport.failed((OutboundMessageState)tempList.get(i)); + _transport.failed((OutboundMessageState)tempList.get(i), false); } } @@ -1025,7 +1032,10 @@ public class PeerState { public int finishMessages() { int rv = 0; List msgs = _outboundMessages; - if (_dead) return 0; + if (_dead) { + dropOutbound(); + return 0; + } List succeeded = null; List failed = null; synchronized (msgs) { @@ -1404,14 +1414,18 @@ public class PeerState { tmp.addAll(oldPeer._currentACKs); oldPeer._currentACKs.clear(); } - synchronized (_currentACKs) { _currentACKs.addAll(tmp); } + if (!_dead) { + synchronized (_currentACKs) { _currentACKs.addAll(tmp); } + } tmp.clear(); synchronized (oldPeer._currentACKsResend) { tmp.addAll(oldPeer._currentACKsResend); oldPeer._currentACKsResend.clear(); } - synchronized (_currentACKsResend) { _currentACKsResend.addAll(tmp); } + if (!_dead) { + synchronized (_currentACKsResend) { _currentACKsResend.addAll(tmp); } + } tmp.clear(); Map msgs = new HashMap(); @@ -1419,7 +1433,9 @@ public class PeerState { msgs.putAll(oldPeer._inboundMessages); oldPeer._inboundMessages.clear(); } - synchronized (_inboundMessages) { _inboundMessages.putAll(msgs); } + if (!_dead) { + synchronized (_inboundMessages) { _inboundMessages.putAll(msgs); } + } msgs.clear(); OutboundMessageState retransmitter = null; @@ -1429,9 +1445,11 @@ public class PeerState { retransmitter = oldPeer._retransmitter; oldPeer._retransmitter = null; } - synchronized (_outboundMessages) { - _outboundMessages.addAll(tmp); - _retransmitter = retransmitter; + if (!_dead) { + synchronized (_outboundMessages) { + _outboundMessages.addAll(tmp); + _retransmitter = retransmitter; + } } tmp.clear(); } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index a59ecb107..cdae00d2e 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -59,6 +59,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private long _introducersSelectedOn; private long _lastInboundReceivedOn; + /** do we need to rebuild our external router address asap? */ + private boolean _needsRebuild; + /** summary info to distribute */ private RouterAddress _externalAddress; /** port number on which we can be reached, or -1 */ @@ -140,6 +143,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _introManager = new IntroductionManager(_context, this); _introducersSelectedOn = -1; _lastInboundReceivedOn = -1; + _needsRebuild = true; _context.statManager().createRateStat("udp.alreadyConnected", "What is the lifetime of a reestablished session", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); @@ -302,7 +306,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_log.shouldLog(Log.INFO)) _log.info("External address received: " + RemoteHostId.toString(ourIP) + ":" + ourPort + " from " + from.toBase64() + ", isValid? " + isValid + ", explicitSpecified? " + explicitSpecified - + ", receivedInboundRecent? " + inboundRecent); + + ", receivedInboundRecent? " + inboundRecent + " status " + _reachabilityStatus); if (explicitSpecified) return; @@ -319,8 +323,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return; } else if (inboundRecent) { // use OS clock since its an ordering thing, not a time thing - if (_log.shouldLog(Log.WARN)) - _log.warn("Ignoring IP address suggestion, since we have received an inbound con recently"); + if (_log.shouldLog(Log.INFO)) + _log.info("Ignoring IP address suggestion, since we have received an inbound con recently"); } else { synchronized (this) { if ( (_externalListenHost == null) || @@ -328,6 +332,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if ( (_reachabilityStatus == CommSystemFacade.STATUS_UNKNOWN) || (_context.clock().now() - _reachabilityStatusLastUpdated > 2*TEST_FREQUENCY) ) { // they told us something different and our tests are either old or failing + if (_log.shouldLog(Log.INFO)) + _log.info("Trying to change our external address..."); try { _externalListenHost = InetAddress.getByAddress(ourIP); if (!fixedPort) @@ -337,14 +343,20 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority updated = true; } catch (UnknownHostException uhe) { _externalListenHost = null; + if (_log.shouldLog(Log.INFO)) + _log.info("Error trying to change our external address", uhe); } } else { // they told us something different, but our tests are recent and positive, // so lets test again fireTest = true; + if (_log.shouldLog(Log.INFO)) + _log.info("Different address, but we're fine.."); } } else { // matched what we expect + if (_log.shouldLog(Log.INFO)) + _log.info("Same address as the current one"); } } } @@ -524,8 +536,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (oldEstablishedOn > 0) _context.statManager().addRateData("udp.alreadyConnected", oldEstablishedOn, 0); - // if we need introducers, try to shift 'em around every 10 minutes - if (introducersRequired() && (_introducersSelectedOn < _context.clock().now() - 10*60*1000)) + if (needsRebuild()) rebuildExternalAddress(); if (getReachabilityStatus() != CommSystemFacade.STATUS_OK) { @@ -536,8 +547,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } public RouterAddress getCurrentAddress() { - // if we need introducers, try to shift 'em around every 10 minutes - if (introducersRequired() && (_introducersSelectedOn < _context.clock().now() - 10*60*1000)) + if (needsRebuild()) rebuildExternalAddress(false); return super.getCurrentAddress(); } @@ -578,8 +588,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Received another message: " + inMsg.getClass().getName()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received another message: " + inMsg.getClass().getName()); } PeerState peer = getPeerState(remoteIdentHash); super.messageReceived(inMsg, remoteIdent, remoteIdentHash, msToReceive, bytesReceived); @@ -636,12 +646,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } peer.dropOutbound(); + peer.expireInboundMessages(); _introManager.remove(peer); _fragments.dropPeer(peer); - // a bit overzealous - perhaps we should only rebuild the external if the peer being dropped - // is one of our introducers? dropping it only if we are considered 'not reachable' is a start - if (introducersRequired()) - rebuildExternalAddress(); + + PeerState altByIdent = null; + PeerState altByHost = null; if (peer.getRemotePeer() != null) { dropPeerCapacities(peer); @@ -655,14 +665,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _context.statManager().addRateData("udp.droppedPeerInactive", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime()); } synchronized (_peersByIdent) { - _peersByIdent.remove(peer.getRemotePeer()); + altByIdent = (PeerState)_peersByIdent.remove(peer.getRemotePeer()); } } RemoteHostId remoteId = peer.getRemoteHostId(); if (remoteId != null) { synchronized (_peersByRemoteHost) { - _peersByRemoteHost.remove(remoteId); + altByHost = (PeerState)_peersByRemoteHost.remove(remoteId); } } @@ -672,6 +682,64 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (SHOULD_FLOOD_PEERS) _flooder.removePeer(peer); _expireEvent.remove(peer); + + if (needsRebuild()) + rebuildExternalAddress(); + + // deal with races to make sure we drop the peers fully + if ( (altByIdent != null) && (peer != altByIdent) ) dropPeer(altByIdent, shouldShitlist); + if ( (altByHost != null) && (peer != altByHost) ) dropPeer(altByHost, shouldShitlist); + } + + private boolean needsRebuild() { + if (_needsRebuild) return true; // simple enough + if (_context.router().isHidden()) return false; + if (introducersRequired()) { + RouterAddress addr = _externalAddress; + UDPAddress ua = new UDPAddress(addr); + int valid = 0; + Hash peerHash = new Hash(); + for (int i = 0; i < ua.getIntroducerCount(); i++) { + // warning: this is only valid as long as we use the ident hash as their key. + peerHash.setData(ua.getIntroducerKey(i)); + PeerState peer = getPeerState(peerHash); + if (peer != null) + valid++; + } + if (valid >= PUBLIC_RELAY_COUNT) { + // try to shift 'em around every 10 minutes or so + if (_introducersSelectedOn < _context.clock().now() - 10*60*1000) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Our introducers are valid, but thy havent changed in a while, so lets rechoose"); + return true; + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Our introducers are valid and haven't changed in a while"); + return false; + } + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Our introducers are not valid (" +valid + ")"); + return true; + } + } else { + boolean rv = (_externalListenHost == null) || (_externalListenPort <= 0); + if (_log.shouldLog(Log.INFO)) { + if (rv) { + _log.info("Need to initialize our direct SSU info"); + } else { + RouterAddress addr = _externalAddress; + UDPAddress ua = new UDPAddress(addr); + if ( (ua.getPort() <= 0) || (ua.getHost() == null) ) { + _log.info("Our direct SSU info is initialized, but not used in our address yet"); + rv = true; + } else { + _log.info("Our direct SSU info is initialized"); + } + } + } + return rv; + } } /** @@ -785,20 +853,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (ok) _fragments.add(state); } - - //public OutNetMessage getNextMessage() { return getNextMessage(-1); } - /** - * Get the next message, blocking until one is found or the expiration - * reached. - * - * @param blockUntil expiration, or -1 if indefinite - */ - /* - public OutNetMessage getNextMessage(long blockUntil) { - return _outboundMessages.getNext(blockUntil); - } - */ - // we don't need the following, since we have our own queueing protected void outboundMessageReady() { throw new UnsupportedOperationException("Not used for UDP"); } @@ -848,68 +902,58 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } Properties options = new Properties(); + boolean directIncluded = false; + if ( allowDirectUDP() && (_externalListenPort > 0) && (_externalListenHost != null) && (isValid(_externalListenHost.getAddress())) ) { + options.setProperty(UDPAddress.PROP_PORT, String.valueOf(_externalListenPort)); + options.setProperty(UDPAddress.PROP_HOST, _externalListenHost.getHostAddress()); + directIncluded = true; + } + boolean introducersRequired = introducersRequired(); - if (introducersRequired) { - List peers = new ArrayList(PUBLIC_RELAY_COUNT); - int found = 0; - _introManager.pickInbound(peers, PUBLIC_RELAY_COUNT); - if (_log.shouldLog(Log.INFO)) - _log.info("Introducers required, picked peers: " + peers); - for (int i = 0; i < peers.size(); i++) { - PeerState peer = (PeerState)peers.get(i); - RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer()); - if (ri == null) { - if (_log.shouldLog(Log.INFO)) - _log.info("Picked peer has no local routerInfo: " + peer); - continue; - } - RouterAddress ra = ri.getTargetAddress(STYLE); - if (ra == null) { - if (_log.shouldLog(Log.INFO)) - _log.info("Picked peer has no SSU address: " + ri); - continue; - } - UDPAddress ura = new UDPAddress(ra); - options.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + i, peer.getRemoteHostId().toHostString()); - options.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + i, String.valueOf(peer.getRemotePort())); - options.setProperty(UDPAddress.PROP_INTRO_KEY_PREFIX + i, Base64.encode(ura.getIntroKey())); - options.setProperty(UDPAddress.PROP_INTRO_TAG_PREFIX + i, String.valueOf(peer.getTheyRelayToUsAs())); - found++; - } + boolean introducersIncluded = false; + if (introducersRequired || !directIncluded) { + int found = _introManager.pickInbound(options, PUBLIC_RELAY_COUNT); if (found > 0) { if (_log.shouldLog(Log.INFO)) _log.info("Picked peers: " + found); _introducersSelectedOn = _context.clock().now(); + introducersIncluded = true; } - } - if ( allowDirectUDP() && (_externalListenPort > 0) && (_externalListenHost != null) && (isValid(_externalListenHost.getAddress())) ) { - options.setProperty(UDPAddress.PROP_PORT, String.valueOf(_externalListenPort)); - options.setProperty(UDPAddress.PROP_HOST, _externalListenHost.getHostAddress()); - // if we have explicit external addresses, they had better be reachable - if (introducersRequired) - options.setProperty(UDPAddress.PROP_CAPACITY, ""+UDPAddress.CAPACITY_TESTING); - else - options.setProperty(UDPAddress.PROP_CAPACITY, ""+UDPAddress.CAPACITY_TESTING + UDPAddress.CAPACITY_INTRODUCER); } - options.setProperty(UDPAddress.PROP_INTRO_KEY, _introKey.toBase64()); - RouterAddress addr = new RouterAddress(); - addr.setCost(5); - addr.setExpiration(null); - addr.setTransportStyle(STYLE); - addr.setOptions(options); - - boolean wantsRebuild = false; - if ( (_externalAddress == null) || !(_externalAddress.equals(addr)) ) - wantsRebuild = true; - - RouterAddress oldAddress = _externalAddress; - _externalAddress = addr; - if (_log.shouldLog(Log.INFO)) - _log.info("Address rebuilt: " + addr); - replaceAddress(addr, oldAddress); - if (allowRebuildRouterInfo) - _context.router().rebuildRouterInfo(); + // if we have explicit external addresses, they had better be reachable + if (introducersRequired) + options.setProperty(UDPAddress.PROP_CAPACITY, ""+UDPAddress.CAPACITY_TESTING); + else + options.setProperty(UDPAddress.PROP_CAPACITY, ""+UDPAddress.CAPACITY_TESTING + UDPAddress.CAPACITY_INTRODUCER); + + if (directIncluded || introducersIncluded) { + options.setProperty(UDPAddress.PROP_INTRO_KEY, _introKey.toBase64()); + + RouterAddress addr = new RouterAddress(); + addr.setCost(5); + addr.setExpiration(null); + addr.setTransportStyle(STYLE); + addr.setOptions(options); + + boolean wantsRebuild = false; + if ( (_externalAddress == null) || !(_externalAddress.equals(addr)) ) + wantsRebuild = true; + + RouterAddress oldAddress = _externalAddress; + _externalAddress = addr; + if (_log.shouldLog(Log.INFO)) + _log.info("Address rebuilt: " + addr); + replaceAddress(addr, oldAddress); + if (allowRebuildRouterInfo && wantsRebuild) + _context.router().rebuildRouterInfo(); + _needsRebuild = false; + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Wanted to rebuild my SSU address, but couldn't specify either the direct or indirect info (needs introducers? " + + introducersRequired + ")", new Exception("source")); + _needsRebuild = true; + } } protected void replaceAddress(RouterAddress address, RouterAddress oldAddress) { @@ -940,13 +984,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority public boolean introducersRequired() { String forceIntroducers = _context.getProperty(PROP_FORCE_INTRODUCERS); - if ( (forceIntroducers != null) && (Boolean.valueOf(forceIntroducers).booleanValue()) ) + if ( (forceIntroducers != null) && (Boolean.valueOf(forceIntroducers).booleanValue()) ) { + if (_log.shouldLog(Log.INFO)) + _log.info("Force introducers specified"); return true; - switch (getReachabilityStatus()) { + } + short status = getReachabilityStatus(); + switch (status) { case CommSystemFacade.STATUS_REJECT_UNSOLICITED: case CommSystemFacade.STATUS_DIFFERENT: + if (_log.shouldLog(Log.INFO)) + _log.info("Require introducers, because our status is " + status); return true; default: + if (!allowDirectUDP()) { + if (_log.shouldLog(Log.INFO)) + _log.info("Require introducers, because we do not allow direct UDP connections"); + return true; + } return false; } } @@ -966,11 +1021,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final int DROP_INACTIVITY_TIME = 60*1000; - public void failed(OutboundMessageState msg) { + public void failed(OutboundMessageState msg) { failed(msg, true); } + void failed(OutboundMessageState msg, boolean allowPeerFailure) { if (msg == null) return; int consecutive = 0; OutNetMessage m = msg.getMessage(); - if ( (msg.getPeer() != null) && + if ( allowPeerFailure && (msg.getPeer() != null) && ( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) || (msg.isExpired())) ) { //long recvDelay = _context.clock().now() - msg.getPeer().getLastReceiveTime(); @@ -1456,6 +1512,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final long STATUS_GRACE_PERIOD = 5*60*1000; void setReachabilityStatus(short status) { + short old = _reachabilityStatus; long now = _context.clock().now(); switch (status) { case CommSystemFacade.STATUS_OK: @@ -1485,6 +1542,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority //} break; } + if ( (status != old) && (status != CommSystemFacade.STATUS_UNKNOWN) ) { + if (needsRebuild()) + rebuildExternalAddress(); + } } private static final String PROP_REACHABILITY_STATUS_OVERRIDE = "i2np.udp.status"; public short getReachabilityStatus() {