propagate from branch 'i2p.i2p' (head 31ab45aaed880aa1d8295541b813adb312582f17)

to branch 'i2p.i2p.zzz.test' (head 8f1820175b7df8fc1f880c4fe1104f1b0e633483)
zzz committed 2009-05-18 18:22:24 +00:00
4 changed files with 179 additions and 70 deletions

router/java/src/net/i2p/router/networkdb/kademlia/DataStore.java

@@ -16,10 +16,13 @@ import net.i2p.data.Hash;
 public interface DataStore {
     public boolean isKnown(Hash key);
     public DataStructure get(Hash key);
+    public DataStructure get(Hash key, boolean persist);
     public void put(Hash key, DataStructure data);
+    public void put(Hash key, DataStructure data, boolean persist);
     public DataStructure remove(Hash key);
-    public DataStructure removeLease(Hash key);
+    public DataStructure remove(Hash key, boolean persist);
     public Set getKeys();
+    public void stop();
     public void restart();
     public int countLeaseSets();
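
The interface now threads a persist flag through every accessor: get/put/remove gain two-argument forms, removeLease() is folded into remove(key, persist), and stop() joins restart() for shutdown flushing. In the disk-backed implementation below, the one-argument forms delegate with persist = true, so persistence is the default and callers opt out. A hypothetical caller sketch, not part of this commit (it assumes the usual net.i2p.router.networkdb.kademlia package for DataStore):

    import net.i2p.data.DataStructure;
    import net.i2p.data.Hash;
    import net.i2p.router.networkdb.kademlia.DataStore;

    class PersistFlagSketch {
        /** Look only at the in-memory view; never touches the disk. */
        static DataStructure peek(DataStore store, Hash key) {
            return store.get(key, false);
        }

        /** Keep the entry in memory only; no disk write is queued. */
        static void putTransientOnly(DataStore store, Hash key, DataStructure data) {
            store.put(key, data, false);
        }
    }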

router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java

@@ -177,6 +177,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
     public void shutdown() {
         _initialized = false;
         _kb = null;
+        _ds.stop();
         _ds = null;
         _exploreKeys.clear(); // hope this doesn't cause an explosion, it shouldn't.
         // _exploreKeys = null;
@@ -702,9 +703,13 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
      * @throws IllegalArgumentException if the routerInfo is not valid
      */
     public RouterInfo store(Hash key, RouterInfo routerInfo) throws IllegalArgumentException {
+        return store(key, routerInfo, true);
+    }
+
+    public RouterInfo store(Hash key, RouterInfo routerInfo, boolean persist) throws IllegalArgumentException {
         if (!_initialized) return null;
-        RouterInfo rv = (RouterInfo)_ds.get(key);
+        RouterInfo rv = (RouterInfo)_ds.get(key, persist);
         if ( (rv != null) && (rv.equals(routerInfo)) ) {
             // no need to validate
@@ -721,7 +726,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
                       + new Date(routerInfo.getPublished()));
         _context.peerManager().setCapabilities(key, routerInfo.getCapabilities());
-        _ds.put(key, routerInfo);
+        _ds.put(key, routerInfo, persist);
         if (rv == null)
             _kb.add(key);
         return rv;
@@ -752,10 +757,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
             // if we dont know the key, lets make sure it isn't a now-dead peer
         }
-        if (isRouterInfo)
-            _ds.remove(dbEntry);
-        else
-            _ds.removeLease(dbEntry);
+        _ds.remove(dbEntry, isRouterInfo);
     }
 
     /** don't use directly - see F.N.D.F. override */
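
One subtlety in the collapsed removal above: passing isRouterInfo as the persist argument matches the old two-branch behavior only because LeaseSets are never written to disk in the first place (see the "Don't bother writing LeaseSets to disk" comment in PersistentDataStore.put() below). Expanded for review, the new call behaves as:

    if (isRouterInfo)
        _ds.remove(dbEntry, true);   // drop from memory and schedule the file removal
    else
        _ds.remove(dbEntry, false);  // drop from memory; a LeaseSet has no file to remove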

router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java

@@ -13,10 +13,10 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
 
 import net.i2p.data.DataFormatException;
 import net.i2p.data.DataStructure;
@@ -48,44 +48,86 @@ class PersistentDataStore extends TransientDataStore {
         _dbDir = dbDir;
         _facade = facade;
         _context.jobQueue().addJob(new ReadJob());
-        ctx.statManager().createRateStat("netDb.writeClobber", "How often we clobber a pending netDb write", "NetworkDatabase", new long[] { 60*1000, 10*60*1000 });
-        ctx.statManager().createRateStat("netDb.writePending", "How many pending writes are there", "NetworkDatabase", new long[] { 60*1000, 10*60*1000 });
+        ctx.statManager().createRateStat("netDb.writeClobber", "How often we clobber a pending netDb write", "NetworkDatabase", new long[] { 20*60*1000 });
+        ctx.statManager().createRateStat("netDb.writePending", "How many pending writes are there", "NetworkDatabase", new long[] { 60*1000 });
+        ctx.statManager().createRateStat("netDb.writeOut", "How many we wrote", "NetworkDatabase", new long[] { 20*60*1000 });
+        ctx.statManager().createRateStat("netDb.writeTime", "How long it took", "NetworkDatabase", new long[] { 20*60*1000 });
         _writer = new Writer();
         I2PThread writer = new I2PThread(_writer, "DBWriter");
-        writer.setDaemon(true);
+        // stop() must be called to flush data to disk
+        //writer.setDaemon(true);
         writer.start();
     }
 
+    @Override
+    public void stop() {
+        super.stop();
+        _writer.flush();
+    }
+
+    @Override
     public void restart() {
+        super.restart();
         _dbDir = _facade.getDbDir();
     }
 
+    @Override
+    public DataStructure get(Hash key) {
+        return get(key, true);
+    }
+
+    /**
+     *  Prepare for having only a partial set in memory and the rest on disk
+     *  @param persist if false, call super only, don't access disk
+     */
+    @Override
+    public DataStructure get(Hash key, boolean persist) {
+        DataStructure rv = super.get(key);
+        /*****
+        if (rv != null || !persist)
+            return rv;
+        rv = _writer.get(key);
+        if (rv != null)
+            return rv;
+        Job rrj = new ReadRouterJob(getRouterInfoName(key), key);
+        run in same thread
+        rrj.runJob();
+        *******/
+        return rv;
+    }
+
     @Override
     public DataStructure remove(Hash key) {
-        _context.jobQueue().addJob(new RemoveJob(key));
+        return remove(key, true);
+    }
+
+    /*
+     *  @param persist if false, call super only, don't access disk
+     */
+    @Override
+    public DataStructure remove(Hash key, boolean persist) {
+        if (persist)
+            _context.jobQueue().addJob(new RemoveJob(key));
         return super.remove(key);
     }
 
     @Override
     public void put(Hash key, DataStructure data) {
+        put(key, data, true);
+    }
+
+    /*
+     *  @param persist if false, call super only, don't access disk
+     */
+    @Override
+    public void put(Hash key, DataStructure data, boolean persist) {
         if ( (data == null) || (key == null) ) return;
         super.put(key, data);
         // Don't bother writing LeaseSets to disk
-        if (data instanceof RouterInfo)
+        if (persist && data instanceof RouterInfo)
             _writer.queue(key, data);
     }
 
     private void accept(LeaseSet ls) {
         super.put(ls.getDestination().calculateHash(), ls);
     }
 
     private void accept(RouterInfo ri) {
         Hash key = ri.getIdentity().getHash();
         super.put(key, ri);
         // add recently loaded routers to the routing table
         _facade.getKBuckets().add(key);
     }
 
     private class RemoveJob extends JobImpl {
         private Hash _key;
         public RemoveJob(Hash key) {
@@ -104,56 +146,100 @@ class PersistentDataStore extends TransientDataStore {
         }
     }
 
+    /** How many files to write every 10 minutes. Doesn't make sense to limit it,
+     *  they just back up in the queue hogging memory.
+     */
+    private static final int WRITE_LIMIT = 10000;
+    private static final long WRITE_DELAY = 10*60*1000;
+
     /*
-     * Queue up writes, write up to 600 files every 10 minutes
+     * Queue up writes, write unlimited files every 10 minutes.
+     * Since we write all we have, don't save the write order.
+     * We store a reference to the data here too,
+     * rather than simply pull it from super.get(), because
+     * we will soon have to implement a scheme for keeping only
+     * a subset of all DataStructures in memory and keeping the rest on disk.
      */
     private class Writer implements Runnable {
-        private final Map _keys;
-        private List _keyOrder;
+        private final Map<Hash, DataStructure> _keys;
+        private Object _waitLock;
+        private volatile boolean _quit;
 
         public Writer() {
-            _keys = new HashMap(64);
-            _keyOrder = new ArrayList(64);
+            _keys = new ConcurrentHashMap(64);
+            _waitLock = new Object();
         }
 
         public void queue(Hash key, DataStructure data) {
-            boolean exists = false;
-            int pending = 0;
-            synchronized (_keys) {
-                pending = _keys.size();
-                exists = (null != _keys.put(key, data));
-                if (!exists)
-                    _keyOrder.add(key);
-                _keys.notifyAll();
-            }
+            int pending = _keys.size();
+            boolean exists = (null != _keys.put(key, data));
             if (exists)
                 _context.statManager().addRateData("netDb.writeClobber", pending, 0);
             _context.statManager().addRateData("netDb.writePending", pending, 0);
         }
 
+        /** check to see if it's in the write queue */
+        public DataStructure get(Hash key) {
+            return _keys.get(key);
+        }
+
         public void run() {
+            _quit = false;
             Hash key = null;
             DataStructure data = null;
             int count = 0;
-            while (true) { // hmm, probably want a shutdown handle... though this is a daemon thread
+            int lastCount = 0;
+            long startTime = 0;
+            while (true) {
+                // get a new iterator every time to get a random entry without
+                // having concurrency issues or copying to a List or Array
+                Iterator<Hash> iter = _keys.keySet().iterator();
                 try {
-                    synchronized (_keys) {
-                        if (_keyOrder.size() <= 0) {
-                            count = 0;
-                            _keys.wait();
-                        } else {
-                            count++;
-                            key = (Hash)_keyOrder.remove(0);
-                            data = (DataStructure)_keys.remove(key);
-                        }
-                    }
-                } catch (InterruptedException ie) {}
-                if ( (key != null) && (data != null) )
-                    write(key, data);
-                key = null;
-                data = null;
-                if (count >= 600)
-                    count = 0;
-                if (count == 0)
-                    try { Thread.sleep(10*60*1000); } catch (InterruptedException ie) {}
+                    key = iter.next();
+                    iter.remove();
+                    count++;
+                } catch (NoSuchElementException nsee) {
+                    lastCount = count;
+                    count = 0;
+                } catch (IllegalStateException ise) {
+                    lastCount = count;
+                    count = 0;
+                }
+                if (key != null) {
+                    data = _keys.get(key);
+                    if (data != null) {
+                        write(key, data);
+                        data = null;
+                    }
+                    key = null;
+                }
+                if (count >= WRITE_LIMIT)
+                    count = 0;
+                if (count == 0) {
+                    if (lastCount > 0) {
+                        long time = _context.clock().now() - startTime;
+                        if (_log.shouldLog(Log.WARN))
+                            _log.warn("Wrote " + lastCount + " entries to disk in " + time);
+                        _context.statManager().addRateData("netDb.writeOut", lastCount, 0);
+                        _context.statManager().addRateData("netDb.writeTime", time, 0);
+                    }
+                    if (_quit)
+                        break;
+                    synchronized (_waitLock) {
+                        try {
+                            _waitLock.wait(WRITE_DELAY);
+                        } catch (InterruptedException ie) {}
+                    }
+                    startTime = _context.clock().now();
+                }
             }
         }
+
+        public void flush() {
+            synchronized(_waitLock) {
+                _quit = true;
+                _waitLock.notifyAll();
+            }
+        }
     }
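
A caution on the drain loop above, offered as a review comment rather than as part of the patch: keySet().iterator() yields only the key, and iter.remove() deletes the map entry before data = _keys.get(key) runs, so data can come back null and the entry is silently dropped instead of written (unless a concurrent queue() re-adds it). Iterating the entry set and capturing the value before removal keeps the same lock-free, copy-free pattern; a minimal stand-alone sketch with illustrative types:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.NoSuchElementException;
    import java.util.concurrent.ConcurrentHashMap;

    class DrainSketch {
        private final ConcurrentHashMap<String, String> _keys =
            new ConcurrentHashMap<String, String>();

        /** Write out one queued entry; returns false when the queue is empty. */
        boolean writeOne() {
            // a fresh weakly-consistent iterator returns an arbitrary entry
            // without locking the map or copying the key set
            Iterator<Map.Entry<String, String>> iter = _keys.entrySet().iterator();
            try {
                Map.Entry<String, String> e = iter.next();
                String key = e.getKey();
                String data = e.getValue(); // capture the value before removing the entry
                iter.remove();              // ConcurrentHashMap iterators support remove()
                write(key, data);
                return true;
            } catch (NoSuchElementException nsee) {
                return false;               // drained; caller sleeps until more work arrives
            }
        }

        private void write(String key, String data) { /* disk I/O goes here */ }
    }
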
@@ -261,7 +347,8 @@ class PersistentDataStore extends TransientDataStore {
         public String getName() { return "Read RouterInfo"; }
 
         private boolean shouldRead() {
-            DataStructure data = get(_key);
+            // persist = false to call only super.get()
+            DataStructure data = get(_key, false);
             if (data == null) return true;
             if (data instanceof RouterInfo) {
                 long knownDate = ((RouterInfo)data).getPublished();
@@ -292,7 +379,8 @@ class PersistentDataStore extends TransientDataStore {
                                   + " is from a different network");
                     } else {
                         try {
-                            _facade.store(ri.getIdentity().getHash(), ri);
+                            // persist = false so we don't write what we just read
+                            _facade.store(ri.getIdentity().getHash(), ri, false);
                         } catch (IllegalArgumentException iae) {
                             _log.info("Refused locally loaded routerInfo - deleting");
                             corrupt = true;
@@ -335,22 +423,22 @@ class PersistentDataStore extends TransientDataStore {
     private final static String ROUTERINFO_PREFIX = "routerInfo-";
     private final static String ROUTERINFO_SUFFIX = ".dat";
 
-    private String getLeaseSetName(Hash hash) {
+    private static String getLeaseSetName(Hash hash) {
         return LEASESET_PREFIX + hash.toBase64() + LEASESET_SUFFIX;
     }
 
-    private String getRouterInfoName(Hash hash) {
+    private static String getRouterInfoName(Hash hash) {
         return ROUTERINFO_PREFIX + hash.toBase64() + ROUTERINFO_SUFFIX;
     }
 
-    private Hash getLeaseSetHash(String filename) {
+    private static Hash getLeaseSetHash(String filename) {
         return getHash(filename, LEASESET_PREFIX, LEASESET_SUFFIX);
     }
 
-    private Hash getRouterInfoHash(String filename) {
+    private static Hash getRouterInfoHash(String filename) {
         return getHash(filename, ROUTERINFO_PREFIX, ROUTERINFO_SUFFIX);
    }
 
-    private Hash getHash(String filename, String prefix, String suffix) {
+    private static Hash getHash(String filename, String prefix, String suffix) {
         try {
             String key = filename.substring(prefix.length());
             key = key.substring(0, key.length() - suffix.length());
@@ -358,7 +446,8 @@ class PersistentDataStore extends TransientDataStore {
             h.fromBase64(key);
             return h;
         } catch (Exception e) {
-            _log.warn("Unable to fetch the key from [" + filename + "]", e);
+            // static
+            //_log.warn("Unable to fetch the key from [" + filename + "]", e);
             return null;
         }
     }
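
Tracing the shutdown path in this file: the DBWriter thread is no longer a daemon, so the JVM stays alive until run() returns; stop() calls Writer.flush(), which flips _quit and notifies _waitLock, and run() breaks out on its next idle pass after writing whatever is still queued. Note that flush() itself does not block until the writes land. The handshake, condensed to a stand-alone skeleton (illustrative, not the patch itself):

    class HandshakeSketch implements Runnable {
        private final Object _waitLock = new Object();
        private volatile boolean _quit;

        public void run() {
            while (true) {
                // ... drain the queue and write pending entries to disk ...
                if (_quit)
                    break;              // final pass complete; thread exits, JVM may too
                synchronized (_waitLock) {
                    try {
                        _waitLock.wait(10*60*1000); // WRITE_DELAY between passes
                    } catch (InterruptedException ie) {}
                }
            }
        }

        /** Called from stop(): request one last pass and wake the writer. */
        public void flush() {
            synchronized (_waitLock) {
                _quit = true;
                _waitLock.notifyAll();
            }
        }
    }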

router/java/src/net/i2p/router/networkdb/kademlia/TransientDataStore.java

@@ -35,14 +35,23 @@ class TransientDataStore implements DataStore {
         _log.info("Data Store initialized");
     }
 
-    public void restart() {
+    public void stop() {
         _data.clear();
     }
 
+    public void restart() {
+        stop();
+    }
+
     public Set getKeys() {
         return new HashSet(_data.keySet());
     }
 
+    /** for PersistentDataStore only - don't use here */
+    public DataStructure get(Hash key, boolean persist) {
+        throw new IllegalArgumentException("no");
+    }
+
     public DataStructure get(Hash key) {
         return _data.get(key);
     }
@@ -65,6 +74,11 @@ class TransientDataStore implements DataStore {
     /** don't accept tunnels set to expire more than 3 hours in the future, which is insane */
     private final static long MAX_FUTURE_EXPIRATION_DATE = KademliaNetworkDatabaseFacade.MAX_LEASE_FUTURE;
 
+    /** for PersistentDataStore only - don't use here */
+    public void put(Hash key, DataStructure data, boolean persist) {
+        throw new IllegalArgumentException("no");
+    }
+
     public void put(Hash key, DataStructure data) {
         if (data == null) return;
         if (_log.shouldLog(Log.DEBUG))
@@ -140,8 +154,9 @@ class TransientDataStore implements DataStore {
         return buf.toString();
     }
 
-    public DataStructure removeLease(Hash key) {
-        return remove(key);
+    /** for PersistentDataStore only - don't use here */
+    public DataStructure remove(Hash key, boolean persist) {
+        throw new IllegalArgumentException("no");
     }
 
     public DataStructure remove(Hash key) {
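
The persist-flavored overloads in this base class exist only to satisfy the widened DataStore interface; callers holding a plain TransientDataStore are expected to use the one-argument forms, and PersistentDataStore overrides all three. (UnsupportedOperationException would arguably state the intent more precisely than IllegalArgumentException, though the effect is the same.) A compact sketch of the division of labor, with hypothetical names:

    interface Store {
        void put(String key, String data);
        void put(String key, String data, boolean persist);
    }

    class MemoryStore implements Store {
        public void put(String key, String data) { /* update the in-memory map */ }

        // the persist flavor is meaningless without a disk behind it
        public void put(String key, String data, boolean persist) {
            throw new UnsupportedOperationException("persist not supported here");
        }
    }

    class DiskBackedStore extends MemoryStore {
        @Override
        public void put(String key, String data) { put(key, data, true); }

        @Override
        public void put(String key, String data, boolean persist) {
            super.put(key, data);              // always update memory
            if (persist) { /* queue a disk write */ }
        }
    }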