    * DataHelper: Fix broken byte[] compareTo() used by XORComparator,
      was not doing unsigned comparisons!
    * Floodfill rework part 2 of N:
      Store closest to the key, subject to last failed
      lookup and store stats.
    * FloodfillPeerSelector: Use standard XORComparator
      instead of messing with BigInteger
    * FloodfillVerifyStoreJob: Set correct timeout for
      requeued store job
    * KNDF: Rework getPeerTimeout() to use 1 day averages,
      and lower the min, max, and multiplication factor.
    * Publish jobs: Lengthen timeout to 90s (was 30s for
      routerinfos and 60s for leasesets)
    * StoreJob: Limit max peer timeout to 15s for direct stores
Author: zzz
Date:   2009-11-10 18:24:15 +00:00
Parent: 42cbd6c12b
Commit: aa74962263

7 changed files with 144 additions and 47 deletions

DataHelper.java

@@ -722,6 +722,7 @@ public class DataHelper {
         return true;
     }
 
+    /** treat bytes as unsigned */
     public final static int compareTo(byte lhs[], byte rhs[]) {
         if ((rhs == null) && (lhs == null)) return 0;
         if (lhs == null) return -1;
@@ -729,9 +730,9 @@ public class DataHelper {
         if (rhs.length < lhs.length) return 1;
         if (rhs.length > lhs.length) return -1;
         for (int i = 0; i < rhs.length; i++) {
-            if (rhs[i] > lhs[i])
+            if ((rhs[i] & 0xff) > (lhs[i] & 0xff))
                 return -1;
-            else if (rhs[i] < lhs[i]) return 1;
+            else if ((rhs[i] & 0xff) < (lhs[i] & 0xff)) return 1;
         }
         return 0;
     }
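
Illustration (not part of the commit): Java bytes are signed, so without the
0xff mask any byte with the high bit set compares as negative, while hash
ordering must be unsigned. The class name below is made up for the demo.

    public class UnsignedCompareDemo {
        public static void main(String[] args) {
            byte high = (byte) 0x80;  // 128 unsigned, but -128 as a signed Java byte
            byte low  = (byte) 0x7f;  // 127 either way
            // Old, broken behavior: signed comparison says -128 < 127
            System.out.println(high > low);                    // false
            // Fixed behavior: masking yields unsigned values, 128 > 127
            System.out.println((high & 0xff) > (low & 0xff));  // true
        }
    }

Before the fix, whenever the first differing byte pair straddled 0x80 the two
arrays sorted in the wrong order, so XORComparator could rank a distant peer
as "closest" to a key.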

FloodfillNetworkDatabaseFacade.java

@@ -67,7 +67,11 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
         _context.inNetMessagePool().registerHandlerJobBuilder(DatabaseStoreMessage.MESSAGE_TYPE, new FloodfillDatabaseStoreMessageHandler(_context, this));
     }
 
-    private static final long PUBLISH_TIMEOUT = 30*1000;
+    /**
+     * This maybe could be shorter than RepublishLeaseSetJob.REPUBLISH_LEASESET_TIMEOUT,
+     * because we are sending direct, but unresponsive floodfills may take a while due to timeouts.
+     */
+    static final long PUBLISH_TIMEOUT = 90*1000;
 
     /**
      * @throws IllegalArgumentException if the local router info is invalid

FloodfillPeerSelector.java

@@ -8,13 +8,12 @@ package net.i2p.router.networkdb.kademlia;
  *
  */
 
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.TreeMap;
+import java.util.TreeSet;
 
 import net.i2p.data.Hash;
 import net.i2p.data.RouterInfo;
@@ -75,8 +74,12 @@ class FloodfillPeerSelector extends PeerSelector {
      * List is not sorted and not shuffled.
      */
     public List<Hash> selectFloodfillParticipants(KBucketSet kbuckets) {
+        return selectFloodfillParticipants(null, kbuckets);
+    }
+
+    public List<Hash> selectFloodfillParticipants(Set<Hash> toIgnore, KBucketSet kbuckets) {
         if (kbuckets == null) return new ArrayList();
-        FloodfillSelectionCollector matches = new FloodfillSelectionCollector(null, null, 0);
+        FloodfillSelectionCollector matches = new FloodfillSelectionCollector(null, toIgnore, 0);
         kbuckets.getAll(matches);
         return matches.getFloodfillParticipants();
     }
@@ -86,26 +89,77 @@ class FloodfillPeerSelector extends PeerSelector {
      * @param key the routing key
      * @param maxNumRouters max to return
      * Sorted by closest to the key if > maxNumRouters, otherwise not
+     * The list is in 3 groups - sorted by routing key within each group.
+     * Group 1: No store or lookup failure in last 15 minutes
+     * Group 2: No store or lookup failure in last 3 minutes
+     * Group 3: All others
      */
     public List<Hash> selectFloodfillParticipants(Hash key, int maxNumRouters, KBucketSet kbuckets) {
-        List<Hash> ffs = selectFloodfillParticipants(kbuckets);
-        if (ffs.size() <= maxNumRouters)
-            return ffs; // unsorted
-        TreeMap<BigInteger, Hash> sorted = new TreeMap();
-        for (int i = 0; i < ffs.size(); i++) {
-            Hash h = ffs.get(i);
-            BigInteger diff = getDistance(key, h);
-            sorted.put(diff, h);
-        }
-        List<Hash> rv = new ArrayList(maxNumRouters);
-        for (int i = 0; i < maxNumRouters; i++) {
-            rv.add(sorted.remove(sorted.firstKey()));
+        return selectFloodfillParticipants(key, maxNumRouters, null, kbuckets);
+    }
+
+    /** 1.5 * PublishLocalRouterInfoJob.PUBLISH_DELAY */
+    private static final int NO_FAIL_STORE_OK = 30*60*1000;
+    private static final int NO_FAIL_STORE_GOOD = NO_FAIL_STORE_OK * 2;
+    private static final int NO_FAIL_LOOKUP_OK = 5*60*1000;
+    private static final int NO_FAIL_LOOKUP_GOOD = NO_FAIL_LOOKUP_OK * 3;
+
+    public List<Hash> selectFloodfillParticipants(Hash key, int howMany, Set<Hash> toIgnore, KBucketSet kbuckets) {
+        List<Hash> ffs = selectFloodfillParticipants(toIgnore, kbuckets);
+        TreeSet<Hash> sorted = new TreeSet(new XORComparator(key));
+        sorted.addAll(ffs);
+        List<Hash> rv = new ArrayList(howMany);
+        List<Hash> okff = new ArrayList(howMany);
+        List<Hash> badff = new ArrayList(howMany);
+        int found = 0;
+        long now = _context.clock().now();
+        // split sorted list into 3 sorted lists
+        for (int i = 0; found < howMany && i < ffs.size(); i++) {
+            Hash entry = sorted.first();
+            sorted.remove(entry);
+            if (entry == null)
+                break; // shouldn't happen
+            RouterInfo info = _context.netDb().lookupRouterInfoLocally(entry);
+            if (info != null && now - info.getPublished() > 3*60*60*1000) {
+                badff.add(entry);
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Skipping, published a while ago: " + entry);
+            } else {
+                PeerProfile prof = _context.profileOrganizer().getProfile(entry);
+                if (prof != null && prof.getDBHistory() != null
+                    && now - prof.getDBHistory().getLastStoreFailed() > NO_FAIL_STORE_GOOD
+                    && now - prof.getDBHistory().getLastLookupFailed() > NO_FAIL_LOOKUP_GOOD) {
+                    // good
+                    rv.add(entry);
+                    found++;
+                } else if (prof != null && prof.getDBHistory() != null
+                    && now - prof.getDBHistory().getLastStoreFailed() > NO_FAIL_STORE_OK
+                    && now - prof.getDBHistory().getLastLookupFailed() > NO_FAIL_LOOKUP_OK) {
+                    okff.add(entry);
+                } else {
+                    badff.add(entry);
+                }
+            }
+        }
+        // Put the ok floodfills after the good floodfills
+        for (int i = 0; found < howMany && i < okff.size(); i++) {
+            rv.add(okff.get(i));
+            found++;
+        }
+        // Put the "bad" floodfills after the ok floodfills
+        for (int i = 0; found < howMany && i < badff.size(); i++) {
+            rv.add(badff.get(i));
+            found++;
         }
         return rv;
     }
 
     private class FloodfillSelectionCollector implements SelectionCollector {
-        private TreeMap<BigInteger, Hash> _sorted;
+        private TreeSet<Hash> _sorted;
         private List<Hash> _floodfillMatches;
         private Hash _key;
         private Set<Hash> _toIgnore;
@@ -113,7 +167,7 @@ class FloodfillPeerSelector extends PeerSelector {
         private int _wanted;
         public FloodfillSelectionCollector(Hash key, Set<Hash> toIgnore, int wanted) {
             _key = key;
-            _sorted = new TreeMap();
+            _sorted = new TreeSet(new XORComparator(key));
             _floodfillMatches = new ArrayList(8);
             _toIgnore = toIgnore;
             _matches = 0;
@@ -152,8 +206,7 @@ class FloodfillPeerSelector extends PeerSelector {
             // So we keep going for a while. This, together with periodically shuffling the
             // KBucket (see KBucketImpl.add()) makes exploration work well.
             if ( (!SearchJob.onlyQueryFloodfillPeers(_context)) && (_wanted + EXTRA_MATCHES > _matches) && (_key != null) ) {
-                BigInteger diff = getDistance(_key, entry);
-                _sorted.put(diff, entry);
+                _sorted.add(entry);
             } else {
                 return;
             }
@@ -216,10 +269,13 @@ class FloodfillPeerSelector extends PeerSelector {
                 rv.add(badff.get(i));
                 found++;
             }
+            // are we corrupting _sorted here?
             for (int i = rv.size(); i < howMany; i++) {
                 if (_sorted.size() <= 0)
                     break;
-                rv.add(_sorted.remove(_sorted.firstKey()));
+                Hash entry = _sorted.first();
+                rv.add(entry);
+                _sorted.remove(entry);
             }
             return rv;
         }
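
Aside (not part of the diff): XORComparator itself lives elsewhere in the
source and is not shown above. A minimal sketch of the idea, with illustrative
names, byte[] standing in for Hash, and equal-length arrays assumed:

    import java.util.Comparator;

    /** Orders byte arrays by unsigned XOR distance from a base key. */
    class XorDistanceComparator implements Comparator<byte[]> {
        private final byte[] _base;
        public XorDistanceComparator(byte[] base) { _base = base; }
        public int compare(byte[] lhs, byte[] rhs) {
            // Most significant byte first; mask with 0xff for unsigned distances.
            for (int i = 0; i < _base.length; i++) {
                int ld = (lhs[i] ^ _base[i]) & 0xff;
                int rd = (rhs[i] ^ _base[i]) & 0xff;
                if (ld != rd)
                    return ld - rd;
            }
            return 0;
        }
    }

A TreeSet built on such a comparator stays sorted as entries are added and
compares one byte at a time, where the old code allocated a BigInteger
distance for every entry just to sort a TreeMap.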

FloodfillVerifyStoreJob.java

@@ -176,14 +176,19 @@ public class FloodfillVerifyStoreJob extends JobImpl {
         public void setMessage(I2NPMessage message) { _message = message; }
     }
 
-    /** the netDb store failed to verify, so resend it to a random floodfill peer */
+    /**
+     * the netDb store failed to verify, so resend it to a random floodfill peer
+     * Fixme - this can loop for a long time - do we need a token or counter
+     * so we don't have multiple verify jobs?
+     */
     private void resend() {
-        DataStructure ds = null;
-        ds = _facade.lookupLeaseSetLocally(_key);
-        if (ds == null)
+        DataStructure ds;
+        if (_isRouterInfo)
             ds = _facade.lookupRouterInfoLocally(_key);
+        else
+            ds = _facade.lookupLeaseSetLocally(_key);
         if (ds != null)
-            _facade.sendStore(_key, ds, null, null, VERIFY_TIMEOUT, null);
+            _facade.sendStore(_key, ds, null, null, FloodfillNetworkDatabaseFacade.PUBLISH_TIMEOUT, null);
     }
 
     private class VerifyTimeoutJob extends JobImpl {
@@ -197,6 +202,8 @@ public class FloodfillVerifyStoreJob extends JobImpl {
             if (_sentTo != null)
                 getContext().profileManager().dbStoreFailed(_sentTo);
             getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime, 0);
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Verify timed out");
             resend();
         }
     }
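
Aside: the Fixme above asks whether a token or counter is needed. Purely as an
illustration of that idea (nothing like this exists in the commit), a bounded
retry counter inside the job might look like:

    private static final int MAX_RESENDS = 3;  // hypothetical cap
    private int _resendCount;                  // hypothetical field

    private void resendWithLimit() {
        if (_resendCount++ >= MAX_RESENDS)
            return;  // give up instead of looping indefinitely
        resend();
    }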

KademliaNetworkDatabaseFacade.java

@@ -513,7 +513,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
         }
         // Don't spam the floodfills. In addition, always delay a few seconds since there may
         // be another leaseset change coming along momentarily.
-        long nextTime = Math.max(j.lastPublished() + j.REPUBLISH_LEASESET_TIMEOUT, _context.clock().now() + PUBLISH_DELAY);
+        long nextTime = Math.max(j.lastPublished() + RepublishLeaseSetJob.REPUBLISH_LEASESET_TIMEOUT, _context.clock().now() + PUBLISH_DELAY);
         j.getTiming().setStartAfter(nextTime);
         _context.jobQueue().addJob(j);
     }
@@ -885,20 +885,28 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
     }
 
     /** smallest allowed period */
-    private static final int MIN_PER_PEER_TIMEOUT = 5*1000;
-    private static final int MAX_PER_PEER_TIMEOUT = 10*1000;
+    private static final int MIN_PER_PEER_TIMEOUT = 2*1000;
+    /**
+     * We want FNDF.PUBLISH_TIMEOUT and RepublishLeaseSetJob.REPUBLISH_LEASESET_TIMEOUT
+     * to be greater than MAX_PER_PEER_TIMEOUT * TIMEOUT_MULTIPLIER by a factor of at least
+     * 3 or 4, to allow at least that many peers to be attempted for a store.
+     */
+    private static final int MAX_PER_PEER_TIMEOUT = 7*1000;
+    private static final int TIMEOUT_MULTIPLIER = 3;
 
+    /** todo: does this need more tuning? */
     public int getPeerTimeout(Hash peer) {
         PeerProfile prof = _context.profileOrganizer().getProfile(peer);
         double responseTime = MAX_PER_PEER_TIMEOUT;
         if (prof != null && prof.getIsExpandedDB()) {
-            responseTime = prof.getDbResponseTime().getLifetimeAverageValue();
-            if (responseTime < MIN_PER_PEER_TIMEOUT)
-                responseTime = MIN_PER_PEER_TIMEOUT;
-            else if (responseTime > MAX_PER_PEER_TIMEOUT)
+            responseTime = prof.getDbResponseTime().getRate(24*60*60*1000l).getAverageValue();
+            // if 0 then there is no data, set to max.
+            if (responseTime <= 0 || responseTime > MAX_PER_PEER_TIMEOUT)
                 responseTime = MAX_PER_PEER_TIMEOUT;
+            else if (responseTime < MIN_PER_PEER_TIMEOUT)
+                responseTime = MIN_PER_PEER_TIMEOUT;
         }
-        return 4 * (int)responseTime; // give it up to 4x the average response time
+        return TIMEOUT_MULTIPLIER * (int)responseTime; // give it up to 3x the average response time
     }
 
     public void sendStore(Hash key, DataStructure ds, Job onSuccess, Job onFailure, long sendTimeout, Set toIgnore) {
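
Worked numbers for the comment above: the per-peer store timeout is now at
most TIMEOUT_MULTIPLIER * MAX_PER_PEER_TIMEOUT = 3 * 7s = 21s, so the 90s
PUBLISH_TIMEOUT and REPUBLISH_LEASESET_TIMEOUT budgets allow roughly
90 / 21, about 4, sequential peer attempts. Under the old values (4 * 10s =
40s per peer), a 60s job could not even try two slow peers.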

RepublishLeaseSetJob.java

@@ -23,7 +23,7 @@ import net.i2p.util.Log;
 public class RepublishLeaseSetJob extends JobImpl {
     private Log _log;
     private final static long REPUBLISH_LEASESET_DELAY = 5*60*1000;
-    public final /* static */ long REPUBLISH_LEASESET_TIMEOUT = 60*1000;
+    public final static long REPUBLISH_LEASESET_TIMEOUT = 90*1000;
     private Hash _dest;
     private KademliaNetworkDatabaseFacade _facade;
     /** this is actually last attempted publish */

StoreJob.java

@@ -109,9 +109,13 @@ class StoreJob extends JobImpl {
         }
         if (isExpired()) {
             _state.complete(true);
+            if (_log.shouldLog(Log.INFO))
+                _log.info(getJobId() + ": Expired: " + _timeoutMs);
             fail();
         } else if (_state.getAttempted().size() > MAX_PEERS_SENT) {
             _state.complete(true);
+            if (_log.shouldLog(Log.INFO))
+                _log.info(getJobId() + ": Max sent");
             fail();
         } else {
             //if (_log.shouldLog(Log.INFO))
@@ -147,10 +151,11 @@
         // the network to scale.
         // Perhaps the ultimate solution is to send RouterInfos through a lease also.
         List<Hash> closestHashes;
-        if (_state.getData() instanceof RouterInfo)
-            closestHashes = getMostReliableRouters(_state.getTarget(), toCheck, _state.getAttempted());
-        else
-            closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
+        //if (_state.getData() instanceof RouterInfo)
+        //    closestHashes = getMostReliableRouters(_state.getTarget(), toCheck, _state.getAttempted());
+        //else
+        //    closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
+        closestHashes = getClosestFloodfillRouters(_state.getTarget(), toCheck, _state.getAttempted());
         if ( (closestHashes == null) || (closestHashes.size() <= 0) ) {
             if (_state.getPending().size() <= 0) {
                 if (_log.shouldLog(Log.INFO))
@@ -217,6 +222,7 @@
      *
      * @return ordered list of Hash objects
      */
+    /*****
     private List<Hash> getClosestRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
         Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
         //if (_log.shouldLog(Log.DEBUG))
@@ -226,13 +232,27 @@
         if (ks == null) return new ArrayList();
         return _peerSelector.selectNearestExplicit(rkey, numClosest, alreadyChecked, ks);
     }
+    *****/
 
     /** used for routerinfo stores, prefers those already connected */
+    /*****
     private List<Hash> getMostReliableRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
         Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
         KBucketSet ks = _facade.getKBuckets();
         if (ks == null) return new ArrayList();
         return _peerSelector.selectMostReliablePeers(rkey, numClosest, alreadyChecked, ks);
     }
+    *****/
+
+    private List<Hash> getClosestFloodfillRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
+        Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
+        KBucketSet ks = _facade.getKBuckets();
+        if (ks == null) return new ArrayList();
+        return ((FloodfillPeerSelector)_peerSelector).selectFloodfillParticipants(rkey, numClosest, alreadyChecked, ks);
+    }
+
+    /** limit expiration for direct sends */
+    private static final int MAX_DIRECT_EXPIRATION = 15*1000;
 
     /**
      * Send a store to the given peer through a garlic route, including a reply
@@ -242,9 +262,11 @@
     private void sendStore(RouterInfo router, int responseTime) {
         DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext());
         msg.setKey(_state.getTarget());
-        if (_state.getData() instanceof RouterInfo)
+        if (_state.getData() instanceof RouterInfo) {
             msg.setRouterInfo((RouterInfo)_state.getData());
-        else if (_state.getData() instanceof LeaseSet)
+            if (responseTime > MAX_DIRECT_EXPIRATION)
+                responseTime = MAX_DIRECT_EXPIRATION;
+        } else if (_state.getData() instanceof LeaseSet)
             msg.setLeaseSet((LeaseSet)_state.getData());
         else
             throw new IllegalArgumentException("Storing an unknown data type! " + _state.getData());
@@ -255,11 +277,11 @@
             if (_log.shouldLog(Log.ERROR))
                 _log.error(getJobId() + ": Dont send store to ourselves - why did we try?");
             return;
-        } else {
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug(getJobId() + ": Send store to " + router.getIdentity().getHash().toBase64());
         }
+
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(getJobId() + ": Send store timeout is " + responseTime);
         sendStore(msg, router, getContext().clock().now() + responseTime);
     }
@@ -269,7 +291,6 @@
             sendStoreThroughGarlic(msg, peer, expiration);
         } else {
             getContext().statManager().addRateData("netDb.storeRouterInfoSent", 1, 0);
-            // todo - shorter expiration for direct?
             sendDirect(msg, peer, expiration);
         }
     }
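
Worked example of the clamp added above: getPeerTimeout() can now return up to
3 * 7s = 21s, but for a direct (non-garlic) RouterInfo store the response
timeout is capped at MAX_DIRECT_EXPIRATION = 15s, per the commit message's
"limit max peer timeout to 15s for direct stores", presumably because a direct
send has no tunnel round-trip to absorb.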