Netdb: Remove all DataPublisher stuff

commit a82de3d1cf
parent a6dc27adaf
Author: zzz
Date:   2009-02-04 17:18:00 +00:00
4 changed files with 0 additions and 382 deletions
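The removed machinery: DataPublisherJob drained the explicit/passive send queues and called sendStore(); DataRepublishingSelectorJob scanned the data store each minute to refill the passive pool; KademliaNetworkDatabaseFacade carried the supporting state (_explicitSendKeys, _passiveSendKeys, _lastSent); and StoreJob reported back via noteKeySent(). DataPublisherJob was already disabled behind an if (false), and the selector job only fed it, so both were dead weight. LeaseSet publishing continues through RepublishLeaseSetJob.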

router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java

@@ -1,101 +0,0 @@
package net.i2p.router.networkdb.kademlia;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import net.i2p.data.DataStructure;
import net.i2p.data.Hash;
import net.i2p.data.LeaseSet;
import net.i2p.router.JobImpl;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
class DataPublisherJob extends JobImpl {
private Log _log;
private KademliaNetworkDatabaseFacade _facade;
private final static long RERUN_DELAY_MS = 120*1000;
private final static int MAX_SEND_PER_RUN = 1; // publish no more than 1 at a time
private final static long STORE_TIMEOUT = 60*1000; // give 'er a minute to send the data
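// (At one key per 120-second pass, this job publishes at most ~30 keys per hour.)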
public DataPublisherJob(RouterContext ctx, KademliaNetworkDatabaseFacade facade) {
super(ctx);
_log = ctx.logManager().getLog(DataPublisherJob.class);
_facade = facade;
getTiming().setStartAfter(ctx.clock().now()+RERUN_DELAY_MS); // not immediate...
}
public String getName() { return "Data Publisher Job"; }
public void runJob() {
Set toSend = selectKeysToSend();
if (_log.shouldLog(Log.INFO))
_log.info("Keys being published in this timeslice: " + toSend);
for (Iterator iter = toSend.iterator(); iter.hasNext(); ) {
Hash key = (Hash)iter.next();
DataStructure data = _facade.getDataStore().get(key);
if (data == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Trying to send a key we dont have? " + key);
continue;
}
if (data instanceof LeaseSet) {
LeaseSet ls = (LeaseSet)data;
if (!ls.isCurrent(Router.CLOCK_FUDGE_FACTOR)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Not publishing a lease that isn't current - " + key,
new Exception("Publish expired lease?"));
}
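// (note: this only warns; an expired lease still falls through to the sendStore() below)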
if (!getContext().clientManager().shouldPublishLeaseSet(key))
continue;
}
_facade.sendStore(key, data, null, null, STORE_TIMEOUT, null);
//StoreJob store = new StoreJob(getContext(), _facade, key, data, null, null, STORE_TIMEOUT);
//getContext().jobQueue().addJob(store);
}
requeue(RERUN_DELAY_MS);
}
private Set selectKeysToSend() {
Set explicit = _facade.getExplicitSendKeys();
Set toSend = new HashSet(MAX_SEND_PER_RUN);
// if there's nothing we *need* to send, only send 10% of the time
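// (nextInt(10) returns 0-9 uniformly, so 9 passes out of 10 return the empty set here)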
if (explicit.size() <= 0) {
if (getContext().random().nextInt(10) > 0)
return toSend;
}
if (explicit.size() < MAX_SEND_PER_RUN) {
toSend.addAll(explicit);
_facade.removeFromExplicitSend(explicit);
Set passive = _facade.getPassivelySendKeys();
Set psend = new HashSet(passive.size());
for (Iterator iter = passive.iterator(); iter.hasNext(); ) {
if (toSend.size() >= MAX_SEND_PER_RUN) break;
Hash key = (Hash)iter.next();
toSend.add(key);
psend.add(key);
}
_facade.removeFromPassiveSend(psend);
} else {
for (Iterator iter = explicit.iterator(); iter.hasNext(); ) {
if (toSend.size() >= MAX_SEND_PER_RUN) break;
Hash key = (Hash)iter.next();
toSend.add(key);
}
_facade.removeFromExplicitSend(toSend);
}
return toSend;
}
}

router/java/src/net/i2p/router/networkdb/kademlia/DataRepublishingSelectorJob.java

@@ -1,175 +0,0 @@
package net.i2p.router.networkdb.kademlia;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeMap;
import net.i2p.data.Hash;
import net.i2p.data.LeaseSet;
import net.i2p.data.RouterInfo;
import net.i2p.router.JobImpl;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
class DataRepublishingSelectorJob extends JobImpl {
private Log _log;
private KademliaNetworkDatabaseFacade _facade;
private final static long RERUN_DELAY_MS = 1*60*1000;
public final static int MAX_PASSIVE_POOL_SIZE = 10; // no need to have the pool be too big
/**
* For every bucket away from us, resend period increases by 5 minutes - so we resend
* our own key every 5 minutes, and keys very far from us every 2.5 hours, increasing
* linearly
*/
public final static long RESEND_BUCKET_FACTOR = 5*60*1000;
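// For example (bucket count inferred from the 2.5-hour figure above, roughly 30 buckets):
//   bucket 0  (keys closest to our own):  (0+1)  * 5 min = 5 minutes between resends
//   bucket 29 (keys farthest from ours):  (29+1) * 5 min = 150 minutes (2.5 hours)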
/**
* % chance any peer not specializing in the lease's key will broadcast it on each pass
* of this job /after/ waiting 5 minutes (one RESEND_BUCKET_FACTOR). In other words,
* .5% of routers will broadcast a particular unexpired lease to (say) 5 peers every
* minute.
*
*/
private final static int LEASE_REBROADCAST_PROBABILITY = 5;
/**
* LEASE_REBROADCAST_PROBABILITY out of LEASE_REBROADCAST_PROBABILITY_SCALE chance.
*/
private final static int LEASE_REBROADCAST_PROBABILITY_SCALE = 1000;
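// (i.e. a 5-in-1000 = 0.5% chance per pass, with one pass every RERUN_DELAY_MS = 1 minute)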
public DataRepublishingSelectorJob(RouterContext ctx, KademliaNetworkDatabaseFacade facade) {
super(ctx);
_log = ctx.logManager().getLog(DataRepublishingSelectorJob.class);
_facade = facade;
getTiming().setStartAfter(ctx.clock().now()+RERUN_DELAY_MS); // not immediate...
}
public String getName() { return "Data Republishing Selector Job"; }
public void runJob() {
Set toSend = selectKeysToSend();
if (_log.shouldLog(Log.INFO))
_log.info("Keys being queued up for publishing: " + toSend);
_facade.queueForPublishing(toSend);
requeue(RERUN_DELAY_MS);
}
/**
* Run through the entire data store, ranking how much we want to send each
* data point, and returning the ones we most want to send so that they can
* be placed in the passive send pool (without making the passive pool greater
* than the limit)
*
*/
private Set selectKeysToSend() {
Set alreadyQueued = new HashSet(128);
alreadyQueued.addAll(_facade.getPassivelySendKeys());
int toAdd = MAX_PASSIVE_POOL_SIZE - alreadyQueued.size();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Keys we need to queue up to fill the passive send pool: " + toAdd);
if (toAdd <= 0) return new HashSet();
alreadyQueued.addAll(_facade.getExplicitSendKeys());
Set keys = _facade.getDataStore().getKeys();
keys.removeAll(alreadyQueued);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Total number of keys in the datastore: " + keys.size());
TreeMap toSend = new TreeMap();
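// maps publishRank -> key; colliding ranks are bumped up by one below so no key is lost.
// Note that TreeMap iterates in ascending key order, so the drain loop after this one
// takes the lowest-ranked keys first.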
for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
Hash key = (Hash)iter.next();
Long lastPublished = _facade.getLastSent(key);
long publishRank = rankPublishNeed(key, lastPublished);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Publish rank for " + key + ": " + publishRank);
if (publishRank > 0) {
while (toSend.containsKey(new Long(publishRank)))
publishRank++;
toSend.put(new Long(publishRank), key);
}
}
Set rv = new HashSet(toAdd);
for (Iterator iter = toSend.values().iterator(); iter.hasNext(); ) {
if (rv.size() >= toAdd) break; // don't overfill the passive pool
Hash key = (Hash)iter.next();
rv.add(key);
}
return rv;
}
/**
* Higher values mean we want to publish it more, and values less than or equal to zero
* means we don't want to publish it
*
*/
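// Return values, as implemented below: KBucketSet.NUM_BUCKETS - bucket (send now; keys in
// closer buckets rank higher), 1 (won the randomized rebroadcast roll), -1 (sent too
// recently), -2 (stale routerInfo), -3 (leaseSet expired or not publishable), -5 (neither
// a routerInfo nor a leaseSet).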
private long rankPublishNeed(Hash key, Long lastPublished) {
int bucket = _facade.getKBuckets().pickBucket(key);
long sendPeriod = (bucket+1) * RESEND_BUCKET_FACTOR;
long now = getContext().clock().now();
if (lastPublished.longValue() < now-sendPeriod) {
RouterInfo ri = _facade.lookupRouterInfoLocally(key);
if (ri != null) {
if (ri.isCurrent(2 * ExpireRoutersJob.EXPIRE_DELAY)) {
// last time it was sent was before the last send period
return KBucketSet.NUM_BUCKETS - bucket;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Not republishing router " + key
+ " since it is really old ["
+ (now-ri.getPublished()) + "ms]");
return -2;
}
} else {
LeaseSet ls = _facade.lookupLeaseSetLocally(key);
if (ls != null) {
if (!getContext().clientManager().shouldPublishLeaseSet(ls.getDestination().calculateHash()))
return -3;
if (ls.isCurrent(Router.CLOCK_FUDGE_FACTOR)) {
// last time it was sent was before the last send period
return KBucketSet.NUM_BUCKETS - bucket;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Not republishing leaseSet " + key
+ " since it is really old ["
+ (now-ls.getEarliestLeaseDate()) + "ms]");
return -3;
}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Key " + key + " is not a leaseSet or routerInfo, definitely not publishing it");
return -5;
}
}
} else {
// it's been published since the last period we want to publish it
if (now - RESEND_BUCKET_FACTOR > lastPublished.longValue()) {
if (_facade.lookupRouterInfoLocally(key) != null) {
// randomize the chance of rebroadcast for routers if we haven't
// sent them within 5 minutes (the check below matches routerInfos, not leases)
int val = getContext().random().nextInt(LEASE_REBROADCAST_PROBABILITY_SCALE);
if (val <= LEASE_REBROADCAST_PROBABILITY) {
if (_log.shouldLog(Log.INFO))
_log.info("Randomized rebroadcast of leases tells us to send "
+ key + ": " + val);
return 1;
}
}
}
return -1;
}
}
}

router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java

@@ -53,10 +53,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
private DataStore _ds; // hash to DataStructure mapping, persisted when necessary
/** directory where the data store persists its data */
private String _dbDir;
private Set _explicitSendKeys; // set of Hash objects that should be published ASAP
private Set _passiveSendKeys; // set of Hash objects that should be published when there's time
private Set _exploreKeys; // set of Hash objects that we should search on (to fill up a bucket, not to get data)
private Map _lastSent; // Hash to Long (date last sent, or <= 0 for never)
private boolean _initialized;
/** Clock independent time of when we started up */
private long _started;
@@ -153,53 +150,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_exploreJob.updateExploreSchedule();
}
public Set getExplicitSendKeys() {
if (!_initialized) return null;
synchronized (_explicitSendKeys) {
return new HashSet(_explicitSendKeys);
}
}
public Set getPassivelySendKeys() {
if (!_initialized) return null;
synchronized (_passiveSendKeys) {
return new HashSet(_passiveSendKeys);
}
}
public void removeFromExplicitSend(Set toRemove) {
if (!_initialized) return;
synchronized (_explicitSendKeys) {
_explicitSendKeys.removeAll(toRemove);
}
}
public void removeFromPassiveSend(Set toRemove) {
if (!_initialized) return;
synchronized (_passiveSendKeys) {
_passiveSendKeys.removeAll(toRemove);
}
}
public void queueForPublishing(Set toSend) {
if (!_initialized) return;
synchronized (_passiveSendKeys) {
_passiveSendKeys.addAll(toSend);
}
}
public Long getLastSent(Hash key) {
if (!_initialized) return null;
synchronized (_lastSent) {
if (!_lastSent.containsKey(key))
_lastSent.put(key, new Long(0));
return (Long)_lastSent.get(key);
}
}
public void noteKeySent(Hash key) {
if (!_initialized) return;
synchronized (_lastSent) {
_lastSent.put(key, new Long(_context.clock().now()));
}
}
public Set getExploreKeys() {
if (!_initialized) return null;
synchronized (_exploreKeys) {
@@ -226,10 +176,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_initialized = false;
_kb = null;
_ds = null;
_explicitSendKeys = null;
_passiveSendKeys = null;
_exploreKeys = null;
_lastSent = null;
}
public void restart() {
@@ -244,9 +191,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
else
_enforceNetId = DEFAULT_ENFORCE_NETID;
_ds.restart();
synchronized (_explicitSendKeys) { _explicitSendKeys.clear(); }
synchronized (_exploreKeys) { _exploreKeys.clear(); }
synchronized (_passiveSendKeys) { _passiveSendKeys.clear(); }
_initialized = true;
@@ -273,10 +218,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_kb = new KBucketSet(_context, ri.getIdentity().getHash());
_ds = new PersistentDataStore(_context, dbDir, this);
//_ds = new TransientDataStore();
_explicitSendKeys = new HashSet(64);
_passiveSendKeys = new HashSet(64);
_exploreKeys = new HashSet(64);
_lastSent = new HashMap(1024);
_dbDir = dbDir;
createHandlers();
@@ -284,9 +226,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_initialized = true;
_started = System.currentTimeMillis();
// read the queues and publish appropriately
if (false)
_context.jobQueue().addJob(new DataPublisherJob(_context, this));
// expire old leases
_context.jobQueue().addJob(new ExpireLeasesJob(_context, this));
@@ -298,9 +237,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
////_context.jobQueue().addJob(new ExpireRoutersJob(_context, this));
if (!_quiet) {
// fill the passive queue periodically
// Is this pointless too???
_context.jobQueue().addJob(new DataRepublishingSelectorJob(_context, this));
// fill the search queue with random keys in buckets that are too small
// Disabled since KBucketImpl.generateRandomKey() is b0rked,
// and anyway, we want to search for a completely random key,
@@ -532,9 +468,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (!_context.clientManager().shouldPublishLeaseSet(h))
return;
synchronized (_explicitSendKeys) {
_explicitSendKeys.add(h);
}
RepublishLeaseSetJob j = null;
synchronized (_publishingLeaseSets) {
j = (RepublishLeaseSetJob)_publishingLeaseSets.get(h);
@@ -563,9 +496,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (_context.router().isHidden()) return; // DE-nied!
Hash h = localRouterInfo.getIdentity().getHash();
store(h, localRouterInfo);
synchronized (_explicitSendKeys) {
_explicitSendKeys.add(h);
}
}
/**
@@ -658,10 +588,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
throw new IllegalArgumentException("Invalid store attempt - " + err);
_ds.put(key, leaseSet);
synchronized (_lastSent) {
if (!_lastSent.containsKey(key))
_lastSent.put(key, new Long(0));
}
// Iterate through the old failure / success count, copying over the old
// values (if any tunnels overlap between leaseSets). no need to be
@@ -770,10 +696,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_context.peerManager().setCapabilities(key, routerInfo.getCapabilities());
_ds.put(key, routerInfo);
synchronized (_lastSent) {
if (!_lastSent.containsKey(key))
_lastSent.put(key, new Long(0));
}
if (rv == null)
_kb.add(key);
return rv;
@@ -808,15 +730,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_ds.remove(dbEntry);
else
_ds.removeLease(dbEntry);
synchronized (_lastSent) {
_lastSent.remove(dbEntry);
}
synchronized (_explicitSendKeys) {
_explicitSendKeys.remove(dbEntry);
}
synchronized (_passiveSendKeys) {
_passiveSendKeys.remove(dbEntry);
}
}
/** don't use directly - see F.N.D.F. override */
@@ -833,30 +746,12 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
}
_ds.remove(peer);
synchronized (_lastSent) {
_lastSent.remove(peer);
}
synchronized (_explicitSendKeys) {
_explicitSendKeys.remove(peer);
}
synchronized (_passiveSendKeys) {
_passiveSendKeys.remove(peer);
}
}
public void unpublish(LeaseSet localLeaseSet) {
if (!_initialized) return;
Hash h = localLeaseSet.getDestination().calculateHash();
DataStructure data = _ds.remove(h);
synchronized (_lastSent) {
_lastSent.remove(h);
}
synchronized (_explicitSendKeys) {
_explicitSendKeys.remove(h);
}
synchronized (_passiveSendKeys) {
_passiveSendKeys.remove(h);
}
if (data == null) {
if (_log.shouldLog(Log.WARN))

router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java

@@ -437,7 +437,6 @@ class StoreJob extends JobImpl {
_log.debug(getJobId() + ": State of successful send: " + _state);
if (_onSuccess != null)
getContext().jobQueue().addJob(_onSuccess);
_facade.noteKeySent(_state.getTarget());
_state.complete(true);
getContext().statManager().addRateData("netDb.storePeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted());
}