2006-03-05 jrandom

* HTML fixes in Syndie to work better with opera (thanks shaklen!)
    * Give netDb lookups to floodfill peers more time, as they are much more
      likely to succeed (thereby cutting down on the unnecessary netDb
      searches outside the floodfill set)
    * Fix to the SSU IP detection code so we won't use introducers when we
      don't need them (thanks Complication!)
    * Add a brief shitlist to i2psnark so it doesn't keep on trying to reach
      peers given to it
    * Don't let netDb searches wander across too many peers
    * Don't use the 1s bandwidth usage in the tunnel participation throttle,
      as its too volatile to have much meaning.
    * Don't bork if a Syndie post is missing an entry.sml
This commit is contained in:
jrandom
2006-03-05 17:07:07 +00:00
committed by zzz
parent 883150f943
commit deb35f4af4
17 changed files with 462 additions and 78 deletions

View File

@ -10,6 +10,7 @@ import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketManagerFactory;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import java.io.*;
import java.util.*;
@ -31,6 +32,7 @@ public class I2PSnarkUtil {
private Map _opts;
private I2PSocketManager _manager;
private boolean _configured;
private Set _shitlist;
private I2PSnarkUtil() {
_context = I2PAppContext.getGlobalContext();
@ -38,6 +40,7 @@ public class I2PSnarkUtil {
_opts = new HashMap();
setProxy("127.0.0.1", 4444);
setI2CPConfig("127.0.0.1", 7654, null);
_shitlist = new HashSet(64);
_configured = false;
}
@ -110,18 +113,36 @@ public class I2PSnarkUtil {
public void disconnect() {
I2PSocketManager mgr = _manager;
_manager = null;
_shitlist.clear();
mgr.destroySocketManager();
}
/** connect to the given destination */
I2PSocket connect(PeerID peer) throws IOException {
Hash dest = peer.getAddress().calculateHash();
synchronized (_shitlist) {
if (_shitlist.contains(dest))
throw new IOException("Not trying to contact " + dest.toBase64() + ", as they are shitlisted");
}
try {
return _manager.connect(peer.getAddress());
I2PSocket rv = _manager.connect(peer.getAddress());
if (rv != null) synchronized (_shitlist) { _shitlist.remove(dest); }
return rv;
} catch (I2PException ie) {
synchronized (_shitlist) {
_shitlist.add(dest);
}
SimpleTimer.getInstance().addEvent(new Unshitlist(dest), 10*60*1000);
throw new IOException("Unable to reach the peer " + peer + ": " + ie.getMessage());
}
}
private class Unshitlist implements SimpleTimer.TimedEvent {
private Hash _dest;
public Unshitlist(Hash dest) { _dest = dest; }
public void timeReached() { synchronized (_shitlist) { _shitlist.remove(_dest); } }
}
/**
* fetch the given URL, returning the file it is stored in, or null on error
*/

View File

@ -211,7 +211,13 @@ public class Archive {
if (!entryDir.exists())
entryDir.mkdirs();
boolean ok = _extractor.extract(entryFile, entryDir, null, info);
boolean ok = true;
try {
ok = _extractor.extract(entryFile, entryDir, null, info);
} catch (IOException ioe) {
ok = false;
_log.error("Error extracting " + entryFile.getPath() + ", deleting it", ioe);
}
if (!ok) {
File files[] = entryDir.listFiles();
for (int i = 0; i < files.length; i++)

View File

@ -59,9 +59,9 @@ public class EntryExtractor {
}
public void extract(EntryContainer entry, File entryDir) throws IOException {
extractEntry(entry, entryDir);
extractHeaders(entry, entryDir);
extractMeta(entry, entryDir);
extractEntry(entry, entryDir);
Attachment attachments[] = entry.getAttachments();
if (attachments != null) {
for (int i = 0; i < attachments.length; i++) {
@ -97,10 +97,14 @@ public class EntryExtractor {
}
}
private void extractEntry(EntryContainer entry, File entryDir) throws IOException {
Entry e = entry.getEntry();
if (e == null) throw new IOException("Entry is null");
String text = e.getText();
if (text == null) throw new IOException("Entry text is null");
FileOutputStream out = null;
try {
out = new FileOutputStream(new File(entryDir, ENTRY));
out.write(DataHelper.getUTF8(entry.getEntry().getText()));
out.write(DataHelper.getUTF8(text));
} finally {
out.close();
}

View File

@ -60,7 +60,7 @@ public class EntryContainer {
this();
_entryURI = uri;
if ( (smlData == null) || (smlData.length <= 0) )
_entryData = new Entry(null);
_entryData = new Entry(""); //null);
else
_entryData = new Entry(DataHelper.getUTF8(smlData));
setHeader(HEADER_BLOGKEY, Base64.encode(uri.getKeyHash().getData()));
@ -277,7 +277,7 @@ public class EntryContainer {
}
if (_entryData == null)
_entryData = new Entry(null);
_entryData = new Entry(""); //null);
_attachments = new Attachment[attachments.size()];

View File

@ -64,13 +64,13 @@ public abstract class BaseServlet extends HttpServlet {
* key=value& of params that need to be tacked onto an http request that updates data, to
* prevent spoofing
*/
protected static String getAuthActionParams() { return PARAM_AUTH_ACTION + '=' + _authNonce + '&'; }
protected static String getAuthActionParams() { return PARAM_AUTH_ACTION + '=' + _authNonce + "&amp;"; }
/**
* key=value& of params that need to be tacked onto an http request that updates data, to
* prevent spoofing
*/
public static void addAuthActionParams(StringBuffer buf) {
buf.append(PARAM_AUTH_ACTION).append('=').append(_authNonce).append('&');
buf.append(PARAM_AUTH_ACTION).append('=').append(_authNonce).append("&amp;");
}
public void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@ -866,22 +866,22 @@ public abstract class BaseServlet extends HttpServlet {
ThreadNode child = node.getChild(0);
buf.append(ThreadedHTMLRenderer.PARAM_VISIBLE).append('=');
buf.append(child.getEntry().getKeyHash().toBase64()).append('/');
buf.append(child.getEntry().getEntryId()).append('&');
buf.append(child.getEntry().getEntryId()).append("&amp;");
}
if (!empty(viewPost))
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_POST).append('=').append(viewPost).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_POST).append('=').append(viewPost).append("&amp;");
else if (!empty(viewThread))
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_THREAD).append('=').append(viewThread).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_THREAD).append('=').append(viewThread).append("&amp;");
if (!empty(offset))
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append("&amp;");
if (!empty(tags))
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append("&amp;");
if (!empty(author))
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(author).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(author).append("&amp;");
return buf.toString();
}
@ -901,21 +901,21 @@ public abstract class BaseServlet extends HttpServlet {
// collapse node == let the node be visible
buf.append('?').append(ThreadedHTMLRenderer.PARAM_VISIBLE).append('=');
buf.append(node.getEntry().getKeyHash().toBase64()).append('/');
buf.append(node.getEntry().getEntryId()).append('&');
buf.append(node.getEntry().getEntryId()).append("&amp;");
if (!empty(viewPost))
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_POST).append('=').append(viewPost).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_POST).append('=').append(viewPost).append("&amp;");
else if (!empty(viewThread))
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_THREAD).append('=').append(viewThread).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_THREAD).append('=').append(viewThread).append("&amp;");
if (!empty(offset))
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append("&amp;");
if (!empty(tags))
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append("&amp;");
if (!empty(author))
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(author).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(author).append("&amp;");
return buf.toString();
}
@ -939,23 +939,23 @@ public abstract class BaseServlet extends HttpServlet {
buf.append(uri);
buf.append('?');
if (!empty(visible))
buf.append(ThreadedHTMLRenderer.PARAM_VISIBLE).append('=').append(visible).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_ADD_TO_GROUP_LOCATION).append('=').append(author.toBase64()).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_ADD_TO_GROUP_NAME).append('=').append(group).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VISIBLE).append('=').append(visible).append("&amp;");
buf.append(ThreadedHTMLRenderer.PARAM_ADD_TO_GROUP_LOCATION).append('=').append(author.toBase64()).append("&amp;");
buf.append(ThreadedHTMLRenderer.PARAM_ADD_TO_GROUP_NAME).append('=').append(group).append("&amp;");
if (!empty(viewPost))
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_POST).append('=').append(viewPost).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_POST).append('=').append(viewPost).append("&amp;");
else if (!empty(viewThread))
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_THREAD).append('=').append(viewThread).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_THREAD).append('=').append(viewThread).append("&amp;");
if (!empty(offset))
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append("&amp;");
if (!empty(tags))
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append("&amp;");
if (!empty(filteredAuthor))
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(filteredAuthor).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(filteredAuthor).append("&amp;");
addAuthActionParams(buf);
return buf.toString();
@ -966,23 +966,23 @@ public abstract class BaseServlet extends HttpServlet {
buf.append(uri);
buf.append('?');
if (!empty(visible))
buf.append(ThreadedHTMLRenderer.PARAM_VISIBLE).append('=').append(visible).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_REMOVE_FROM_GROUP_NAME).append('=').append(name).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_REMOVE_FROM_GROUP).append('=').append(group).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VISIBLE).append('=').append(visible).append("&amp;");
buf.append(ThreadedHTMLRenderer.PARAM_REMOVE_FROM_GROUP_NAME).append('=').append(name).append("&amp;");
buf.append(ThreadedHTMLRenderer.PARAM_REMOVE_FROM_GROUP).append('=').append(group).append("&amp;");
if (!empty(viewPost))
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_POST).append('=').append(viewPost).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_POST).append('=').append(viewPost).append("&amp;");
else if (!empty(viewThread))
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_THREAD).append('=').append(viewThread).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_THREAD).append('=').append(viewThread).append("&amp;");
if (!empty(offset))
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append("&amp;");
if (!empty(tags))
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append("&amp;");
if (!empty(filteredAuthor))
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(filteredAuthor).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(filteredAuthor).append("&amp;");
addAuthActionParams(buf);
return buf.toString();
@ -1024,24 +1024,23 @@ public abstract class BaseServlet extends HttpServlet {
}
buf.append('?').append(ThreadedHTMLRenderer.PARAM_VISIBLE).append('=');
buf.append(expandTo.getKeyHash().toBase64()).append('/');
buf.append(expandTo.getEntryId()).append('&');
buf.append(expandTo.getEntryId()).append("&amp;");
buf.append(ThreadedHTMLRenderer.PARAM_VIEW_THREAD).append('=');
buf.append(node.getEntry().getKeyHash().toBase64()).append('/');
buf.append(node.getEntry().getEntryId()).append('&');
buf.append(node.getEntry().getEntryId()).append("&amp;");
if (!empty(offset))
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_OFFSET).append('=').append(offset).append("&amp;");
if (!empty(tags))
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_TAGS).append('=').append(tags).append("&amp;");
if (!empty(author)) {
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(author).append('&');
buf.append(ThreadedHTMLRenderer.PARAM_AUTHOR).append('=').append(author).append("&amp;");
if (authorOnly)
buf.append(ThreadedHTMLRenderer.PARAM_THREAD_AUTHOR).append("=true&");
buf.append(ThreadedHTMLRenderer.PARAM_THREAD_AUTHOR).append("=true&amp;");
}
buf.append("#").append(node.getEntry().toString());
return buf.toString();
}

View File

@ -1,4 +1,18 @@
$Id: history.txt,v 1.426 2006/03/04 18:50:01 complication Exp $
$Id: history.txt,v 1.427 2006/03/05 02:44:59 complication Exp $
2006-03-05 jrandom
* HTML fixes in Syndie to work better with opera (thanks shaklen!)
* Give netDb lookups to floodfill peers more time, as they are much more
likely to succeed (thereby cutting down on the unnecessary netDb
searches outside the floodfill set)
* Fix to the SSU IP detection code so we won't use introducers when we
don't need them (thanks Complication!)
* Add a brief shitlist to i2psnark so it doesn't keep on trying to reach
peers given to it
* Don't let netDb searches wander across too many peers
* Don't use the 1s bandwidth usage in the tunnel participation throttle,
as its too volatile to have much meaning.
* Don't bork if a Syndie post is missing an entry.sml
2006-03-05 Complication
* Reduce exposed statistical information,

View File

@ -483,10 +483,6 @@ public class LoadTestManager {
// length == #hops+1 (as it includes the creator)
if (cfg.getLength() < 2)
return false;
// only load test the client tunnels
// XXX why?
////if (cfg.getTunnel().getDestination() == null)
//// return false;
_active.add(cfg);
return true;
} else {
@ -496,18 +492,26 @@ public class LoadTestManager {
private boolean bandwidthOverloaded() {
int msgLoadBps = CONCURRENT_MESSAGES
* 5 // message size
* 5 * 1024 // message size
/ 10; // 10 seconds before timeout & retransmission
msgLoadBps *= 2; // buffer
if (_context.bandwidthLimiter().getSendBps()/1024d + (double)msgLoadBps >= _context.bandwidthLimiter().getOutboundKBytesPerSecond())
int curBps = getBps();
if ((curBps + msgLoadBps)/1024 >= _context.bandwidthLimiter().getOutboundKBytesPerSecond())
return true;
if (_context.bandwidthLimiter().getReceiveBps()/1024d + (double)msgLoadBps >= _context.bandwidthLimiter().getInboundKBytesPerSecond())
if ((curBps + msgLoadBps)/1024 >= _context.bandwidthLimiter().getInboundKBytesPerSecond())
return true;
if (_context.throttle().getMessageDelay() > 1000)
return true;
return false;
}
private int getBps() {
int used1s = RouterThrottleImpl.get1sRate(_context);
int used1m = RouterThrottleImpl.get1mRate(_context);
int used5m = RouterThrottleImpl.get5mRate(_context);
return Math.max(used1s, Math.max(used1m, used5m));
}
private class CreatedJob extends JobImpl {
private LoadTestTunnelConfig _cfg;
public CreatedJob(RouterContext ctx, LoadTestTunnelConfig cfg) {

View File

@ -211,27 +211,27 @@ class RouterThrottleImpl implements RouterThrottle {
return TUNNEL_ACCEPT;
}
private int get1sRate() {
return (int)Math.max(_context.bandwidthLimiter().getSendBps(), _context.bandwidthLimiter().getReceiveBps());
static int get1sRate(RouterContext ctx) {
return (int)Math.max(ctx.bandwidthLimiter().getSendBps(), ctx.bandwidthLimiter().getReceiveBps());
}
private int get1mRate() {
static int get1mRate(RouterContext ctx) {
int send = 0;
RateStat rs = _context.statManager().getRate("bw.sendRate");
RateStat rs = ctx.statManager().getRate("bw.sendRate");
if (rs != null)
send = (int)rs.getRate(1*60*1000).getAverageValue();
int recv = 0;
rs = _context.statManager().getRate("bw.recvRate");
rs = ctx.statManager().getRate("bw.recvRate");
if (rs != null)
recv = (int)rs.getRate(1*60*1000).getAverageValue();
return Math.max(send, recv);
}
private int get5mRate() {
static int get5mRate(RouterContext ctx) {
int send = 0;
RateStat rs = _context.statManager().getRate("bw.sendRate");
RateStat rs = ctx.statManager().getRate("bw.sendRate");
if (rs != null)
send = (int)rs.getRate(5*60*1000).getAverageValue();
int recv = 0;
rs = _context.statManager().getRate("bw.recvRate");
rs = ctx.statManager().getRate("bw.recvRate");
if (rs != null)
recv = (int)rs.getRate(5*60*1000).getAverageValue();
return Math.max(send, recv);
@ -247,9 +247,9 @@ class RouterThrottleImpl implements RouterThrottle {
*/
private boolean allowTunnel(double bytesAllocated, int numTunnels) {
int maxKBps = Math.min(_context.bandwidthLimiter().getOutboundKBytesPerSecond(), _context.bandwidthLimiter().getInboundKBytesPerSecond());
int used1s = get1sRate();
int used1m = get1mRate();
int used5m = get5mRate();
int used1s = 0; //get1sRate(_context); // dont throttle on the 1s rate, its too volatile
int used1m = get1mRate(_context);
int used5m = get5mRate(_context);
int used = Math.max(Math.max(used1s, used1m), used5m);
int availBps = (int)(((maxKBps*1024) - used) * getSharePercentage());

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.367 $ $Date: 2006/03/04 18:50:06 $";
public final static String ID = "$Revision: 1.368 $ $Date: 2006/03/05 02:45:00 $";
public final static String VERSION = "0.6.1.12";
public final static long BUILD = 5;
public final static long BUILD = 6;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -14,9 +14,11 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
public static final char CAPACITY_FLOODFILL = 'f';
private static final String PROP_FLOODFILL_PARTICIPANT = "router.floodfillParticipant";
private static final String DEFAULT_FLOODFILL_PARTICIPANT = "false";
private Map _activeFloodQueries;
public FloodfillNetworkDatabaseFacade(RouterContext context) {
super(context);
_activeFloodQueries = new HashMap();
}
protected void createHandlers() {
@ -106,4 +108,267 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
else
return false;
}
/**
* Begin a kademlia style search for the key specified, which can take up to timeoutMs and
* will fire the appropriate jobs on success or timeout (or if the kademlia search completes
* without any match)
*
*/
SearchJob search(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs, boolean isLease) {
//if (true) return super.search(key, onFindJob, onFailedLookupJob, timeoutMs, isLease);
boolean isNew = true;
FloodSearchJob searchJob = null;
synchronized (_activeFloodQueries) {
searchJob = (FloodSearchJob)_activeFloodQueries.get(key);
if (searchJob == null) {
searchJob = new FloodSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
_activeFloodQueries.put(key, searchJob);
isNew = true;
}
}
if (isNew) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("this is the first search for that key, fire off the FloodSearchJob");
_context.jobQueue().addJob(searchJob);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Deferring flood search for " + key.toBase64() + " with " + onFindJob);
searchJob.addDeferred(onFindJob, onFailedLookupJob, timeoutMs, isLease);
_context.statManager().addRateData("netDb.lookupLeaseSetDeferred", 1, searchJob.getExpiration()-_context.clock().now());
}
return null;
}
/**
* Ok, the initial set of searches to the floodfill peers timed out, lets fall back on the
* wider kademlia-style searches
*/
void searchFull(Hash key, List onFind, List onFailed, long timeoutMs, boolean isLease) {
synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); }
Job find = null;
if ( (onFind != null) && (onFind.size() > 0) )
find = (Job)onFind.remove(0);
Job fail = null;
if ( (onFailed != null) && (onFailed.size() > 0) )
fail = (Job)onFailed.remove(0);
SearchJob job = super.search(key, find, fail, timeoutMs, isLease);
if (job != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Floodfill search timed out for " + key.toBase64() + ", falling back on normal search (#"
+ job.getJobId() + ") with " + timeoutMs + " remaining");
long expiration = timeoutMs + _context.clock().now();
while ( (onFind != null) && (onFind.size() > 0) )
job.addDeferred((Job)onFind.remove(0), null, expiration, isLease);
while ( (onFailed != null) && (onFailed.size() > 0) )
job.addDeferred(null, (Job)onFailed.remove(0), expiration, isLease);
}
}
void complete(Hash key) {
synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); }
}
/** list of the Hashes of currently known floodfill peers */
List getFloodfillPeers() {
FloodfillPeerSelector sel = (FloodfillPeerSelector)getPeerSelector();
return sel.selectFloodfillParticipants(getKBuckets());
}
}
/**
* Try sending a search to some floodfill peers, but if we don't get a successful
* match within half the allowed lookup time, give up and start querying through
* the normal (kademlia) channels. This should cut down on spurious lookups caused
* by simple delays in responses from floodfill peers
*
*/
class FloodSearchJob extends JobImpl {
private Log _log;
private FloodfillNetworkDatabaseFacade _facade;
private Hash _key;
private List _onFind;
private List _onFailed;
private long _expiration;
private int _timeoutMs;
private long _origExpiration;
private boolean _isLease;
private volatile int _lookupsRemaining;
private volatile boolean _dead;
public FloodSearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
super(ctx);
_log = ctx.logManager().getLog(FloodSearchJob.class);
_facade = facade;
_key = key;
_onFind = new ArrayList();
_onFind.add(onFind);
_onFailed = new ArrayList();
_onFailed.add(onFailed);
int timeout = timeoutMs / FLOOD_SEARCH_TIME_FACTOR;
if (timeout < timeoutMs)
timeout = timeoutMs;
_timeoutMs = timeout;
_expiration = timeout + ctx.clock().now();
_origExpiration = timeoutMs + ctx.clock().now();
_isLease = isLease;
_lookupsRemaining = 0;
_dead = false;
}
void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) {
if (_dead) {
getContext().jobQueue().addJob(onFailed);
} else {
if (onFind != null) synchronized (_onFind) { _onFind.add(onFind); }
if (onFailed != null) synchronized (_onFailed) { _onFailed.add(onFailed); }
}
}
public long getExpiration() { return _expiration; }
private static final int CONCURRENT_SEARCHES = 2;
private static final int FLOOD_SEARCH_TIME_FACTOR = 2;
private static final int FLOOD_SEARCH_TIME_MIN = 30*1000;
public void runJob() {
// pick some floodfill peers and send out the searches
List floodfillPeers = _facade.getFloodfillPeers();
FloodLookupSelector replySelector = new FloodLookupSelector(getContext(), this);
ReplyJob onReply = new FloodLookupMatchJob(getContext(), this);
Job onTimeout = new FloodLookupTimeoutJob(getContext(), this);
OutNetMessage out = getContext().messageRegistry().registerPending(replySelector, onReply, onTimeout, _timeoutMs);
for (int i = 0; _lookupsRemaining < CONCURRENT_SEARCHES && i < floodfillPeers.size(); i++) {
Hash peer = (Hash)floodfillPeers.get(i);
if (peer.equals(getContext().routerHash()))
continue;
DatabaseLookupMessage dlm = new DatabaseLookupMessage(getContext(), true);
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel();
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel();
if ( (replyTunnel == null) || (outTunnel == null) ) {
_dead = true;
while (_onFailed.size() > 0) {
Job job = (Job)_onFailed.remove(0);
getContext().jobQueue().addJob(job);
}
getContext().messageRegistry().unregisterPending(out);
return;
}
dlm.setFrom(replyTunnel.getPeer(0));
dlm.setMessageExpiration(getContext().clock().now()+10*1000);
dlm.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
dlm.setSearchKey(_key);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " to " + peer.toBase64());
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), peer);
_lookupsRemaining++;
}
if (_lookupsRemaining <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " had no peers to send to");
// no floodfill peers, go to the normal ones
getContext().messageRegistry().unregisterPending(out);
_facade.searchFull(_key, _onFind, _onFailed, _timeoutMs*FLOOD_SEARCH_TIME_FACTOR, _isLease);
}
}
public String getName() { return "NetDb search (phase 1)"; }
Hash getKey() { return _key; }
void decrementRemaining() { _lookupsRemaining--; }
int getLookupsRemaining() { return _lookupsRemaining; }
void failed() {
if (_dead) return;
_dead = true;
int timeRemaining = (int)(_origExpiration - getContext().clock().now());
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining);
if (timeRemaining > 0) {
_facade.searchFull(_key, _onFind, _onFailed, timeRemaining, _isLease);
} else {
for (int i = 0; i < _onFailed.size(); i++) {
Job j = (Job)_onFailed.remove(0);
getContext().jobQueue().addJob(j);
}
}
}
void success() {
if (_dead) return;
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " successful");
_dead = true;
_facade.complete(_key);
while (_onFind.size() > 0)
getContext().jobQueue().addJob((Job)_onFind.remove(0));
}
}
class FloodLookupTimeoutJob extends JobImpl {
private FloodSearchJob _search;
public FloodLookupTimeoutJob(RouterContext ctx, FloodSearchJob job) {
super(ctx);
_search = job;
}
public void runJob() {
_search.decrementRemaining();
if (_search.getLookupsRemaining() <= 0)
_search.failed();
}
public String getName() { return "NetDb search (phase 1) timeout"; }
}
class FloodLookupMatchJob extends JobImpl implements ReplyJob {
private Log _log;
private FloodSearchJob _search;
public FloodLookupMatchJob(RouterContext ctx, FloodSearchJob job) {
super(ctx);
_log = ctx.logManager().getLog(FloodLookupMatchJob.class);
_search = job;
}
public void runJob() {
if ( (getContext().netDb().lookupLeaseSetLocally(_search.getKey()) != null) ||
(getContext().netDb().lookupRouterInfoLocally(_search.getKey()) != null) ) {
_search.success();
} else {
int remaining = _search.getLookupsRemaining();
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + "/" + _search.getJobId() + ": got a reply looking for "
+ _search.getKey().toBase64() + ", with " + remaining + " outstanding searches");
// netDb reply pointing us at other people
if (remaining <= 0)
_search.failed();
}
}
public String getName() { return "NetDb search (phase 1) match"; }
public void setMessage(I2NPMessage message) {}
}
class FloodLookupSelector implements MessageSelector {
private RouterContext _context;
private FloodSearchJob _search;
public FloodLookupSelector(RouterContext ctx, FloodSearchJob search) {
_context = ctx;
_search = search;
}
public boolean continueMatching() { return _search.getLookupsRemaining() > 0; }
public long getExpiration() { return _search.getExpiration(); }
public boolean isMatch(I2NPMessage message) {
if (message == null) return false;
if (message instanceof DatabaseStoreMessage) {
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
// is it worth making sure the reply came in on the right tunnel?
if (_search.getKey().equals(dsm.getKey())) {
_search.decrementRemaining();
return true;
}
} else if (message instanceof DatabaseSearchReplyMessage) {
DatabaseSearchReplyMessage dsrm = (DatabaseSearchReplyMessage)message;
if (_search.getKey().equals(dsrm.getSearchKey())) {
_search.decrementRemaining();
return true;
}
}
return false;
}
}

View File

@ -799,8 +799,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
* without any match)
*
*/
void search(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs, boolean isLease) {
if (!_initialized) return;
SearchJob search(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs, boolean isLease) {
if (!_initialized) return null;
boolean isNew = true;
SearchJob searchJob = null;
synchronized (_activeRequests) {
@ -823,6 +823,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
int deferred = searchJob.addDeferred(onFindJob, onFailedLookupJob, timeoutMs, isLease);
_context.statManager().addRateData("netDb.lookupLeaseSetDeferred", deferred, searchJob.getExpiration()-_context.clock().now());
}
return searchJob;
}
private Set getLeases() {
@ -851,8 +852,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
}
/** smallest allowed period */
private static final int MIN_PER_PEER_TIMEOUT = 3*1000;
private static final int MAX_PER_PEER_TIMEOUT = 5*1000;
private static final int MIN_PER_PEER_TIMEOUT = 5*1000;
private static final int MAX_PER_PEER_TIMEOUT = 10*1000;
public int getPeerTimeout(Hash peer) {
PeerProfile prof = _context.profileOrganizer().getProfile(peer);

View File

@ -51,6 +51,8 @@ class SearchJob extends JobImpl {
private List _deferredSearches;
private boolean _deferredCleared;
private long _startedOn;
private boolean _floodfillPeersExhausted;
private int _floodfillSearchesOutstanding;
private static final int SEARCH_BREDTH = 3; // 10 peers at a time
private static final int SEARCH_PRIORITY = 400; // large because the search is probably for a real search
@ -98,9 +100,12 @@ class SearchJob extends JobImpl {
_deferredCleared = false;
_peerSelector = facade.getPeerSelector();
_startedOn = -1;
_floodfillPeersExhausted = false;
_floodfillSearchesOutstanding = 0;
_expiration = getContext().clock().now() + timeoutMs;
getContext().statManager().createRateStat("netDb.successTime", "How long a successful search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.failedAttemptedPeers", "How many peers we sent a search to when the search fails", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
getContext().statManager().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.failedPeers", "How many peers fail to respond to a lookup?", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.searchCount", "Overall number of searches sent", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
@ -128,11 +133,32 @@ class SearchJob extends JobImpl {
public long getExpiration() { return _expiration; }
public long getTimeoutMs() { return _timeoutMs; }
private static final int PER_FLOODFILL_PEER_TIMEOUT = 30*1000;
protected int getPerPeerTimeoutMs(Hash peer) {
int timeout = 0;
if (_floodfillPeersExhausted && _floodfillSearchesOutstanding <= 0)
timeout = _facade.getPeerTimeout(peer);
else
timeout = PER_FLOODFILL_PEER_TIMEOUT;
long now = getContext().clock().now();
if (now + timeout > _expiration)
return (int)(_expiration - now);
else
return timeout;
}
/**
* Let each peer take up to the average successful search RTT
*
*/
protected int getPerPeerTimeoutMs() {
if (_floodfillPeersExhausted && _floodfillSearchesOutstanding <= 0)
return PER_PEER_TIMEOUT;
else
return PER_FLOODFILL_PEER_TIMEOUT;
/*
if (true)
return PER_PEER_TIMEOUT;
int rv = -1;
@ -145,8 +171,11 @@ class SearchJob extends JobImpl {
return PER_PEER_TIMEOUT;
else
return rv + 1025; // tunnel delay
*/
}
private static int MAX_PEERS_QUERIED = 20;
/**
* Send the next search, or stop if its completed
*/
@ -168,6 +197,11 @@ class SearchJob extends JobImpl {
_log.info(getJobId() + ": Key search expired");
_state.complete(true);
fail();
} else if (_state.getAttempted().size() > MAX_PEERS_QUERIED) {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Too many peers quried");
_state.complete(true);
fail();
} else {
//_log.debug("Continuing search");
continueSearch();
@ -243,7 +277,10 @@ class SearchJob extends JobImpl {
+ peer + " : " + (ds == null ? "null" : ds.getClass().getName()));
_state.replyTimeout(peer);
} else {
if (((RouterInfo)ds).isHidden() ||
RouterInfo ri = (RouterInfo)ds;
if (!FloodfillNetworkDatabaseFacade.isFloodfill(ri))
_floodfillPeersExhausted = true;
if (ri.isHidden() ||
getContext().shitlist().isShitlisted(peer)) {
// dont bother
} else {
@ -319,12 +356,13 @@ class SearchJob extends JobImpl {
} else {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Send search to " + router.getIdentity().getHash().toBase64()
+ " for " + _state.getTarget().toBase64());
+ " for " + _state.getTarget().toBase64()
+ " w/ timeout " + getPerPeerTimeoutMs(router.getIdentity().calculateHash()));
}
getContext().statManager().addRateData("netDb.searchMessageCount", 1, 0);
if (_isLease || false) // moo
if (_isLease || true) // always send searches out tunnels
sendLeaseSearch(router);
else
sendRouterSearch(router);
@ -355,7 +393,7 @@ class SearchJob extends JobImpl {
// return;
//}
int timeout = _facade.getPeerTimeout(router.getIdentity().getHash());
int timeout = getPerPeerTimeoutMs(router.getIdentity().getHash());
long expiration = getContext().clock().now() + timeout;
DatabaseLookupMessage msg = buildMessage(inTunnelId, inTunnel.getPeer(0), expiration);
@ -379,6 +417,8 @@ class SearchJob extends JobImpl {
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade,
this, outTunnel, inTunnel);
if (FloodfillNetworkDatabaseFacade.isFloodfill(router))
_floodfillSearchesOutstanding++;
getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout);
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, router.getIdentity().getHash());
}
@ -398,6 +438,8 @@ class SearchJob extends JobImpl {
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(),
reply, new FailedJob(getContext(), router), sel, timeout, SEARCH_PRIORITY);
if (FloodfillNetworkDatabaseFacade.isFloodfill(router))
_floodfillSearchesOutstanding++;
j.runJob();
//getContext().jobQueue().addJob(j);
}
@ -475,6 +517,7 @@ class SearchJob extends JobImpl {
*/
protected class FailedJob extends JobImpl {
private Hash _peer;
private boolean _isFloodfill;
private boolean _penalizePeer;
private long _sentOn;
public FailedJob(RouterContext enclosingContext, RouterInfo peer) {
@ -490,8 +533,11 @@ class SearchJob extends JobImpl {
_penalizePeer = penalizePeer;
_peer = peer.getIdentity().getHash();
_sentOn = enclosingContext.clock().now();
_isFloodfill = FloodfillNetworkDatabaseFacade.isFloodfill(peer);
}
public void runJob() {
if (_isFloodfill)
_floodfillSearchesOutstanding--;
if (_state.completed()) return;
_state.replyTimeout(_peer);
if (_penalizePeer) {
@ -622,8 +668,11 @@ class SearchJob extends JobImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": State of failed search: " + _state);
long time = getContext().clock().now() - _state.getWhenStarted();
int attempted = _state.getAttempted().size();
getContext().statManager().addRateData("netDb.failedAttemptedPeers", attempted, time);
if (_keepStats) {
long time = getContext().clock().now() - _state.getWhenStarted();
getContext().statManager().addRateData("netDb.failedTime", time, 0);
//_facade.fail(_state.getTarget());
}
@ -711,6 +760,7 @@ class SearchJob extends JobImpl {
boolean wasAttempted(Hash peer) { return _state.wasAttempted(peer); }
long timeoutMs() { return _timeoutMs; }
boolean add(Hash peer) { return _facade.getKBuckets().add(peer); }
void decrementOutstandingFloodfillSearches() { _floodfillSearchesOutstanding--; }
}
class SearchReplyJob extends JobImpl {

View File

@ -26,6 +26,7 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
private SearchJob _job;
private TunnelInfo _outTunnel;
private TunnelInfo _replyTunnel;
private boolean _isFloodfillPeer;
private long _sentOn;
public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer,
@ -39,6 +40,7 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
super(context);
_log = context.logManager().getLog(SearchUpdateReplyFoundJob.class);
_peer = peer.getIdentity().getHash();
_isFloodfillPeer = FloodfillNetworkDatabaseFacade.isFloodfill(peer);
_state = state;
_facade = facade;
_job = job;
@ -49,6 +51,9 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
public String getName() { return "Update Reply Found for Kademlia Search"; }
public void runJob() {
if (_isFloodfillPeer)
_job.decrementOutstandingFloodfillSearches();
I2NPMessage message = _message;
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Reply from " + _peer.toBase64()

View File

@ -29,7 +29,7 @@ class StartExplorersJob extends JobImpl {
/** don't explore more than 1 bucket at a time */
private static final int MAX_PER_RUN = 1;
/** dont explore the network more often than once every minute */
private static final int MIN_RERUN_DELAY_MS = 60*1000;
private static final int MIN_RERUN_DELAY_MS = 5*60*1000;
/** explore the network at least once every thirty minutes */
private static final int MAX_RERUN_DELAY_MS = 30*60*1000;

View File

@ -92,6 +92,8 @@ class StoreJob extends JobImpl {
private boolean isExpired() {
return getContext().clock().now() >= _expiration;
}
private static final int MAX_PEERS_SENT = 10;
/**
* send the key to the next batch of peers
@ -105,6 +107,9 @@ class StoreJob extends JobImpl {
if (isExpired()) {
_state.complete(true);
fail();
} else if (_state.getAttempted().size() > MAX_PEERS_SENT) {
_state.complete(true);
fail();
} else {
//if (_log.shouldLog(Log.INFO))
// _log.info(getJobId() + ": Sending: " + _state);

View File

@ -317,7 +317,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (explicitSpecified)
return;
boolean fixedPort = getIsPortFixed();
boolean updated = false;
boolean fireTest = false;
@ -328,7 +328,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
+ RemoteHostId.toString(ourIP) + ". Lets throw tomatoes at them");
_context.shitlist().shitlistRouter(from, "They said we had an invalid IP");
return;
} else if (inboundRecent) {
} else if (inboundRecent && _externalListenPort > 0 && _externalListenHost != null) {
// use OS clock since its an ordering thing, not a time thing
if (_log.shouldLog(Log.INFO))
_log.info("Ignoring IP address suggestion, since we have received an inbound con recently");
@ -761,9 +761,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
} else {
boolean rv = (_externalListenHost == null) || (_externalListenPort <= 0);
if (!rv) {
RouterAddress addr = _externalAddress;
UDPAddress ua = new UDPAddress(addr);
if (ua.getIntroducerCount() > 0)
rv = true; // status == ok and we don't actually need introducers, so rebuild
}
if (_log.shouldLog(Log.INFO)) {
if (rv) {
_log.info("Need to initialize our direct SSU info");
_log.info("Need to initialize our direct SSU info (" + _externalListenHost + ":" + _externalListenPort + ")");
} else {
RouterAddress addr = _externalAddress;
UDPAddress ua = new UDPAddress(addr);

View File

@ -307,7 +307,11 @@ class BuildHandler {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Request " + _state.msg.getUniqueId() + " handled with a successful deferred lookup for the next peer " + _nextPeer.toBase64());
handleReq(getContext().netDb().lookupRouterInfoLocally(_nextPeer), _state, _req, _nextPeer);
RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(_nextPeer);
if (ri != null)
handleReq(ri, _state, _req, _nextPeer);
else
_log.error("Deferred successfully, but we couldnt find " + _nextPeer.toBase64() + "?");
}
}