- Add persistent local DHT storage

- Shutdown now closes tunnel
- Delay after sending stop announces at shutdown
- Stub out using Hash cache
- Implement stop for all cleaners
- Log tweaks
This commit is contained in:
zzz
2012-06-04 22:34:56 +00:00
parent a6f7761544
commit 3f40487c99
7 changed files with 216 additions and 30 deletions

View File

@ -261,7 +261,8 @@ public class I2PSnarkUtil {
// FIXME this can cause race NPEs elsewhere
_manager = null;
_shitlist.clear();
mgr.destroySocketManager();
if (mgr != null)
mgr.destroySocketManager();
// this will delete a .torrent file d/l in progress so don't do that...
FileUtil.rmdir(_tmpDir, false);
// in case the user will d/l a .torrent file next...
@ -405,6 +406,7 @@ public class I2PSnarkUtil {
byte[] b = Base32.decode(ip.substring(0, BASE32_HASH_LENGTH));
if (b != null) {
Hash h = new Hash(b);
//Hash h = Hash.create(b);
if (_log.shouldLog(Log.INFO))
_log.info("Using existing session for lookup of " + ip);
try {

View File

@ -1514,15 +1514,23 @@ public class SnarkManager implements Snark.CompleteListener {
}
}
public class SnarkManagerShutdown extends I2PAppThread {
private class SnarkManagerShutdown extends I2PAppThread {
@Override
public void run() {
Set names = listTorrentFiles();
int running = 0;
for (Iterator iter = names.iterator(); iter.hasNext(); ) {
Snark snark = getTorrent((String)iter.next());
if ( (snark != null) && (!snark.isStopped()) )
if (snark != null && !snark.isStopped()) {
snark.stopTorrent();
running++;
}
}
_snarks.clear();
if (running > 0) {
try { sleep(1500); } catch (InterruptedException ie) {};
}
_util.disconnect();
}
}

View File

@ -15,8 +15,7 @@ import net.i2p.I2PAppContext;
import net.i2p.crypto.SHA1Hash;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* All the nodes we know about, stored as a mapping from
@ -33,6 +32,7 @@ class DHTNodes extends ConcurrentHashMap<NID, NodeInfo> {
private final I2PAppContext _context;
private long _expireTime;
private final Log _log;
private volatile boolean _isRunning;
/** stagger with other cleaners */
private static final long CLEAN_TIME = 237*1000;
@ -46,7 +46,16 @@ class DHTNodes extends ConcurrentHashMap<NID, NodeInfo> {
_context = ctx;
_expireTime = MAX_EXPIRE_TIME;
_log = _context.logManager().getLog(DHTNodes.class);
SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME);
}
public void start() {
_isRunning = true;
new Cleaner();
}
public void stop() {
clear();
_isRunning = false;
}
/**
@ -82,9 +91,15 @@ class DHTNodes extends ConcurrentHashMap<NID, NodeInfo> {
****/
/** */
private class Cleaner implements SimpleTimer.TimedEvent {
private class Cleaner extends SimpleTimer2.TimedEvent {
public Cleaner() {
super(SimpleTimer2.getInstance(), CLEAN_TIME);
}
public void timeReached() {
if (!_isRunning)
return;
long now = _context.clock().now();
int peerCount = 0;
for (Iterator<NodeInfo> iter = DHTNodes.this.values().iterator(); iter.hasNext(); ) {
@ -100,11 +115,12 @@ class DHTNodes extends ConcurrentHashMap<NID, NodeInfo> {
else
_expireTime = Math.min(_expireTime + DELTA_EXPIRE_TIME, MAX_EXPIRE_TIME);
if (_log.shouldLog(Log.INFO))
_log.info("DHT storage cleaner done, now with " +
if (_log.shouldLog(Log.DEBUG))
_log.debug("DHT storage cleaner done, now with " +
peerCount + " peers, " +
DataHelper.formatDuration(_expireTime) + " expiration");
schedule(CLEAN_TIME);
}
}
}

View File

@ -12,8 +12,7 @@ import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* The tracker stores peers, i.e. Dest hashes (not nodes).
@ -27,6 +26,7 @@ class DHTTracker {
private final Torrents _torrents;
private long _expireTime;
private final Log _log;
private volatile boolean _isRunning;
/** stagger with other cleaners */
private static final long CLEAN_TIME = 199*1000;
@ -41,12 +41,16 @@ class DHTTracker {
_torrents = new Torrents();
_expireTime = MAX_EXPIRE_TIME;
_log = _context.logManager().getLog(DHTTracker.class);
SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME);
}
public void start() {
_isRunning = true;
new Cleaner();
}
void stop() {
_torrents.clear();
// no way to stop the cleaner
_isRunning = false;
}
void announce(InfoHash ih, Hash hash) {
@ -93,9 +97,15 @@ class DHTTracker {
return rv;
}
private class Cleaner implements SimpleTimer.TimedEvent {
private class Cleaner extends SimpleTimer2.TimedEvent {
public Cleaner() {
super(SimpleTimer2.getInstance(), CLEAN_TIME);
}
public void timeReached() {
if (!_isRunning)
return;
long now = _context.clock().now();
int torrentCount = 0;
int peerCount = 0;
@ -122,11 +132,12 @@ class DHTTracker {
else
_expireTime = Math.min(_expireTime + DELTA_EXPIRE_TIME, MAX_EXPIRE_TIME);
if (_log.shouldLog(Log.INFO))
_log.info("DHT tracker cleaner done, now with " +
if (_log.shouldLog(Log.DEBUG))
_log.debug("DHT tracker cleaner done, now with " +
torrentCount + " torrents, " +
peerCount + " peers, " +
DataHelper.formatDuration(_expireTime) + " expiration");
schedule(CLEAN_TIME);
}
}
}

View File

@ -5,6 +5,7 @@ package org.klomp.snark.dht;
*/
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@ -36,8 +37,6 @@ import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.SimpleDataStructure;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
import org.klomp.snark.bencode.BDecoder;
@ -108,6 +107,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private final int _rPort;
/** signed dgrams */
private final int _qPort;
private final File _dhtFile;
private volatile boolean _isRunning;
/** all-zero NID used for pings */
private static final NID _fakeNID = new NID(new byte[NID.HASH_LENGTH]);
@ -134,6 +135,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private static final long DEFAULT_QUERY_TIMEOUT = 75*1000;
/** stagger with other cleaners */
private static final long CLEAN_TIME = 63*1000;
private static final String DHT_FILE = "i2psnark.dht.dat";
public KRPC (I2PAppContext ctx, I2PSession session) {
_context = ctx;
@ -156,11 +158,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
ctx.random().nextBytes(_myID);
_myNID = new NID(_myID);
_myNodeInfo = new NodeInfo(_myNID, session.getMyDestination(), _qPort);
_dhtFile = new File(ctx.getConfigDir(), DHT_FILE);
session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort);
session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM, _qPort);
// can't be stopped
SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME);
start();
}
///////////////// Public methods
@ -391,6 +393,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
public void announce(byte[] ih, byte[] peerHash) {
InfoHash iHash = new InfoHash(ih);
_tracker.announce(iHash, new Hash(peerHash));
// _tracker.announce(iHash, Hash.create(peerHash));
}
/**
@ -506,24 +509,32 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
}
/**
* Does nothing yet, everything is prestarted.
* Loads the DHT from file.
* Can't be restarted after stopping?
*/
public void start() {
_knownNodes.start();
_tracker.start();
PersistDHT.loadDHT(this, _dhtFile);
// start the explore thread
_isRunning = true;
// no need to keep ref, it will eventually stop
new Cleaner();
}
/**
* Stop everything.
*/
public void stop() {
_isRunning = false;
// FIXME stop the explore thread
// unregister port listeners
_session.removeListener(I2PSession.PROTO_DATAGRAM, _qPort);
_session.removeListener(I2PSession.PROTO_DATAGRAM_RAW, _rPort);
// clear the DHT and tracker
_tracker.stop();
_knownNodes.clear();
PersistDHT.saveDHT(_knownNodes, _dhtFile);
_knownNodes.stop();
_sentQueries.clear();
_outgoingTokens.clear();
_incomingTokens.clear();
@ -1175,6 +1186,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
for (BEValue bev : peers) {
byte[] b = bev.getBytes();
Hash h = new Hash(b);
//Hash h = Hash.create(b);
rv.add(h);
}
if (_log.shouldLog(Log.INFO))
@ -1303,7 +1315,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (onTimeout != null)
onTimeout.run();
if (_log.shouldLog(Log.INFO))
_log.warn("timeout waiting for reply from " + this.toString());
_log.warn("timeout waiting for reply from " + ReplyWaiter.this.toString());
}
}
}
@ -1370,9 +1382,15 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
/**
* Cleaner-upper
*/
private class Cleaner implements SimpleTimer.TimedEvent {
private class Cleaner extends SimpleTimer2.TimedEvent {
public Cleaner() {
super(SimpleTimer2.getInstance(), CLEAN_TIME);
}
public void timeReached() {
if (!_isRunning)
return;
long now = _context.clock().now();
for (Iterator<Token> iter = _outgoingTokens.keySet().iterator(); iter.hasNext(); ) {
Token tok = iter.next();
@ -1390,12 +1408,13 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (ni.lastSeen() < now - MAX_NODEINFO_AGE)
iter.remove();
}
if (_log.shouldLog(Log.INFO))
_log.info("KRPC cleaner done, now with " +
if (_log.shouldLog(Log.DEBUG))
_log.debug("KRPC cleaner done, now with " +
_outgoingTokens.size() + " sent Tokens, " +
_incomingTokens.size() + " rcvd Tokens, " +
_knownNodes.size() + " known peers, " +
_sentQueries.size() + " queries awaiting response");
schedule(CLEAN_TIME);
}
}
}

View File

@ -3,6 +3,8 @@ package org.klomp.snark.dht;
* From zzzot, modded and relicensed to GPLv2
*/
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
@ -41,7 +43,7 @@ class NodeInfo extends SimpleDataStructure {
this.dest = dest;
this.hash = dest.calculateHash();
this.port = port;
initialize(nID, this.hash, port);
initialize();
}
/**
@ -53,7 +55,7 @@ class NodeInfo extends SimpleDataStructure {
this.nID = nID;
this.hash = hash;
this.port = port;
initialize(nID, hash, port);
initialize();
}
/**
@ -81,6 +83,36 @@ class NodeInfo extends SimpleDataStructure {
initialize(d);
}
/**
* Form persistent storage string.
* Format: NID:Hash:Destination:port
* First 3 in base 64; Destination may be empty string
* @throws IllegalArgumentException
*/
public NodeInfo(String s) throws DataFormatException {
super();
String[] parts = s.split(":", 4);
if (parts.length != 4)
throw new DataFormatException("Bad format");
byte[] nid = Base64.decode(parts[0]);
if (nid == null)
throw new DataFormatException("Bad NID");
nID = new NID(nid);
byte[] h = Base64.decode(parts[1]);
if (h == null)
throw new DataFormatException("Bad hash");
hash = new Hash(h);
//hash = Hash.create(h);
if (parts[2].length() > 0)
dest = new Destination(parts[2]);
try {
port = Integer.parseInt(parts[3]);
} catch (NumberFormatException nfe) {
throw new DataFormatException("Bad port", nfe);
}
initialize();
}
/**
* Creates data structures from the compact info
* @throws IllegalArgumentException
@ -91,18 +123,22 @@ class NodeInfo extends SimpleDataStructure {
byte[] ndata = new byte[NID.HASH_LENGTH];
System.arraycopy(compactInfo, 0, ndata, 0, NID.HASH_LENGTH);
this.nID = new NID(ndata);
//3 lines or...
byte[] hdata = new byte[Hash.HASH_LENGTH];
System.arraycopy(compactInfo, NID.HASH_LENGTH, hdata, 0, Hash.HASH_LENGTH);
this.hash = new Hash(hdata);
//this.hash = Hash.create(compactInfo, NID.HASH_LENGTH);
this.port = (int) DataHelper.fromLong(compactInfo, NID.HASH_LENGTH + Hash.HASH_LENGTH, 2);
if (port <= 0 || port >= 65535)
throw new IllegalArgumentException("Bad port");
}
/**
* Creates 54-byte compact info
* @throws IllegalArgumentException
*/
private void initialize(NID nID, Hash hash, int port) {
if (port < 0 || port > 65535)
private void initialize() {
if (port <= 0 || port >= 65535)
throw new IllegalArgumentException("Bad port");
byte[] compactInfo = new byte[LENGTH];
System.arraycopy(nID.getData(), 0, compactInfo, 0, NID.HASH_LENGTH);
@ -173,7 +209,24 @@ class NodeInfo extends SimpleDataStructure {
}
}
@Override
public String toString() {
return "NodeInfo: " + nID + ' ' + hash + " port: " + port;
}
/**
* To persistent storage string.
* Format: NID:Hash:Destination:port
* First 3 in base 64; Destination may be empty string
*/
public String toPersistentString() {
StringBuilder buf = new StringBuilder(650);
buf.append(nID.toBase64()).append(':');
buf.append(hash.toBase64()).append(':');
if (dest != null)
buf.append(dest.toBase64());
buf.append(':').append(port);
return buf.toString();
}
}

View File

@ -0,0 +1,77 @@
package org.klomp.snark.dht;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import net.i2p.I2PAppContext;
import net.i2p.data.DataFormatException;
import net.i2p.util.Log;
import net.i2p.util.SecureFileOutputStream;
/**
* Retrieve / Store the local DHT in a file
*
*/
abstract class PersistDHT {
public static synchronized void loadDHT(KRPC krpc, File file) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(PersistDHT.class);
int count = 0;
FileInputStream in = null;
try {
in = new FileInputStream(file);
BufferedReader br = new BufferedReader(new InputStreamReader(in, "ISO-8859-1"));
String line = null;
while ( (line = br.readLine()) != null) {
if (line.startsWith("#"))
continue;
try {
krpc.addNode(new NodeInfo(line));
count++;
// TODO limit number? this will flush the router's SDS caches
} catch (IllegalArgumentException iae) {
if (log.shouldLog(Log.WARN))
log.warn("Error reading DHT entry", iae);
} catch (DataFormatException dfe) {
if (log.shouldLog(Log.WARN))
log.warn("Error reading DHT entry", dfe);
}
}
} catch (IOException ioe) {
if (log.shouldLog(Log.WARN) && file.exists())
log.warn("Error reading the DHT File", ioe);
} finally {
if (in != null) try { in.close(); } catch (IOException ioe) {}
}
if (log.shouldLog(Log.INFO))
log.info("Loaded " + count + " nodes from " + file);
}
public static synchronized void saveDHT(DHTNodes nodes, File file) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(PersistDHT.class);
int count = 0;
PrintWriter out = null;
try {
out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new SecureFileOutputStream(file), "ISO-8859-1")));
out.println("# DHT nodes, format is NID:Hash:Destination:port");
for (NodeInfo ni : nodes.values()) {
// DHTNodes shouldn't contain us, if that changes check here
out.println(ni.toPersistentString());
count++;
}
} catch (IOException ioe) {
if (log.shouldLog(Log.WARN))
log.warn("Error writing the DHT File", ioe);
} finally {
if (out != null) out.close();
}
if (log.shouldLog(Log.INFO))
log.info("Stored " + count + " nodes to " + file);
}
}