- getFramedAveragePeerClockSkew() now returns a long (ms);
        was a Long (s)
      - Implement NTP-style clock slewing so the clock is adjusted
        gradually
      - Implement clock strata so we prefer better clocks
      - Implement a timestamper in the transport so we will periodically
        update the clock even if NTP is not working
        This allows the router to converge the clock instead of simply
        hoping the first connected peer is correct.
      - Slow down NTP attempts after several consecutive failures
This commit is contained in:
zzz
2010-02-13 01:20:23 +00:00
parent 375118fe02
commit 4f70a7d0fe
7 changed files with 240 additions and 45 deletions

View File

@ -16,6 +16,7 @@ import net.i2p.data.LeaseSet;
import net.i2p.data.RouterAddress;
import net.i2p.router.CommSystemFacade;
import net.i2p.router.Router;
import net.i2p.router.RouterClock;
import net.i2p.router.RouterVersion;
import net.i2p.router.TunnelPoolSettings;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
@ -88,6 +89,10 @@ public class SummaryHelper extends HelperBase {
public String getReachability() {
return reachability(); // + timeSkew();
// testing
//return reachability() +
// " Offset: " + DataHelper.formatDuration(_context.clock().getOffset()) +
// " Slew: " + DataHelper.formatDuration(((RouterClock)_context.clock()).getDeltaOffset());
}
private String reachability() {
@ -97,10 +102,10 @@ public class SummaryHelper extends HelperBase {
// Warn based on actual skew from peers, not update status, so if we successfully offset
// the clock, we don't complain.
//if (!_context.clock().getUpdatedSuccessfully())
Long skew = _context.commSystem().getFramedAveragePeerClockSkew(33);
long skew = _context.commSystem().getFramedAveragePeerClockSkew(33);
// Display the actual skew, not the offset
if (skew != null && Math.abs(skew.longValue()) > 30)
return _("ERR-Clock Skew of {0}", DataHelper.formatDuration(Math.abs(skew.longValue()) * 1000));
if (Math.abs(skew) > 30*1000)
return _("ERR-Clock Skew of {0}", DataHelper.formatDuration(Math.abs(skew)));
if (_context.router().isHidden())
return _("Hidden");

View File

@ -76,12 +76,46 @@ public class NtpClient {
throw new IllegalArgumentException("No reachable NTP servers specified");
}
/**
* Query the ntp servers, returning the current time from first one we find
* Hack to return time and stratum
* @return time in rv[0] and stratum in rv[1]
* @throws IllegalArgumentException if none of the servers are reachable
* @since 0.7.12
*/
public static long[] currentTimeAndStratum(String serverNames[]) {
if (serverNames == null)
throw new IllegalArgumentException("No NTP servers specified");
ArrayList names = new ArrayList(serverNames.length);
for (int i = 0; i < serverNames.length; i++)
names.add(serverNames[i]);
Collections.shuffle(names);
for (int i = 0; i < names.size(); i++) {
long[] rv = currentTimeAndStratum((String)names.get(i));
if (rv != null && rv[0] > 0)
return rv;
}
throw new IllegalArgumentException("No reachable NTP servers specified");
}
/**
* Query the given NTP server, returning the current internet time
*
* @return milliseconds since january 1, 1970 (UTC), or -1 on error
*/
public static long currentTime(String serverName) {
long[] la = currentTimeAndStratum(serverName);
if (la != null)
return la[0];
return -1;
}
/**
* Hack to return time and stratum
* @return time in rv[0] and stratum in rv[1], or null for error
* @since 0.7.12
*/
private static long[] currentTimeAndStratum(String serverName) {
try {
// Send request
DatagramSocket socket = new DatagramSocket();
@ -104,7 +138,7 @@ public class NtpClient {
socket.receive(packet);
} catch (InterruptedIOException iie) {
socket.close();
return -1;
return null;
}
// Immediately record the incoming timestamp
@ -123,15 +157,17 @@ public class NtpClient {
// Anything else is right out, treat such responses like errors
if ((msg.stratum < 1) || (msg.stratum > 15)) {
//System.out.println("Response from NTP server of unacceptable stratum " + msg.stratum + ", failing.");
return(-1);
return null;
}
long rv = (long)(System.currentTimeMillis() + localClockOffset*1000);
long[] rv = new long[2];
rv[0] = (long)(System.currentTimeMillis() + localClockOffset*1000);
rv[1] = msg.stratum;
//System.out.println("host: " + address.getHostAddress() + " rtt: " + roundTripDelay + " offset: " + localClockOffset + " seconds");
return rv;
} catch (IOException ioe) {
//ioe.printStackTrace();
return -1;
return null;
}
}

View File

@ -23,6 +23,7 @@ public class Timestamper implements Runnable {
private final List<UpdateListener> _listeners;
private int _queryFrequency;
private int _concurringServers;
private int _consecutiveFails;
private volatile boolean _disabled;
private boolean _daemon;
private boolean _initialized;
@ -34,6 +35,7 @@ public class Timestamper implements Runnable {
private static final String DEFAULT_DISABLED = "true";
/** how many times do we have to query if we are changing the clock? */
private static final int DEFAULT_CONCURRING_SERVERS = 3;
private static final int MAX_CONSECUTIVE_FAILS = 10;
public static final String PROP_QUERY_FREQUENCY = "time.queryFrequencyMs";
public static final String PROP_SERVER_LIST = "time.sntpServerList";
@ -106,7 +108,7 @@ public class Timestamper implements Runnable {
}
public UpdateListener getListener(int index) {
synchronized (_listeners) {
return (UpdateListener)_listeners.get(index);
return _listeners.get(index);
}
}
@ -171,8 +173,12 @@ public class Timestamper implements Runnable {
synchronized (this) { notifyAll(); }
long sleepTime;
if (lastFailed) {
if (++_consecutiveFails >= MAX_CONSECUTIVE_FAILS)
sleepTime = 30*60*1000;
else
sleepTime = 30*1000;
} else {
_consecutiveFails = 0;
sleepTime = _context.random().nextInt(_queryFrequency) + _queryFrequency;
if (_wellSynced)
sleepTime *= 3;
@ -191,6 +197,7 @@ public class Timestamper implements Runnable {
private boolean queryTime(String serverList[]) throws IllegalArgumentException {
long found[] = new long[_concurringServers];
long now = -1;
int stratum = -1;
long expectedDelta = 0;
_wellSynced = false;
for (int i = 0; i < _concurringServers; i++) {
@ -198,7 +205,9 @@ public class Timestamper implements Runnable {
// this delays startup when net is disconnected or the timeserver list is bad, don't make it too long
try { Thread.sleep(2*1000); } catch (InterruptedException ie) {}
}
now = NtpClient.currentTime(serverList);
long[] timeAndStratum = NtpClient.currentTimeAndStratum(serverList);
now = timeAndStratum[0];
stratum = (int) timeAndStratum[1];
long delta = now - _context.clock().now();
found[i] = delta;
if (i == 0) {
@ -230,7 +239,7 @@ public class Timestamper implements Runnable {
}
}
}
stampTime(now);
stampTime(now, stratum);
if (_log.shouldLog(Log.DEBUG)) {
StringBuilder buf = new StringBuilder(64);
buf.append("Deltas: ");
@ -242,14 +251,14 @@ public class Timestamper implements Runnable {
}
/**
* Send an HTTP request to a given URL specifying the current time
* Notify the listeners
*/
private void stampTime(long now) {
private void stampTime(long now, int stratum) {
long before = _context.clock().now();
synchronized (_listeners) {
for (int i = 0; i < _listeners.size(); i++) {
UpdateListener lsnr = (UpdateListener)_listeners.get(i);
lsnr.setNow(now);
UpdateListener lsnr = _listeners.get(i);
lsnr.setNow(now, stratum);
}
}
if (_log.shouldLog(Log.DEBUG))
@ -311,13 +320,16 @@ public class Timestamper implements Runnable {
/**
* Interface to receive update notifications for when we query the time
*
* Only used by Clock.
* stratum parameter added in 0.7.12.
* If there were any users outside of the tree, this broke compatibility, sorry.
*/
public interface UpdateListener {
/**
* The time has been queried and we have a current value for 'now'
*
*/
public void setNow(long now);
/** @param stratum 1-15, 1 being the best (added in 0.7.12) */
public void setNow(long now, int stratum);
}
}

View File

@ -117,6 +117,15 @@ public class Clock implements Timestamper.UpdateListener {
setOffset(diff);
}
/**
* @param stratum ignored
* @since 0.7.12
*/
public void setNow(long realTime, int stratum) {
long diff = realTime - System.currentTimeMillis();
setOffset(diff);
}
/**
* Retrieve the current time synchronized with whatever reference clock is in
* use.

View File

@ -49,7 +49,7 @@ public abstract class CommSystemFacade implements Service {
* Return framed average clock skew of connected peers in seconds, or null if we cannot answer.
* CommSystemFacadeImpl overrides this.
*/
public Long getFramedAveragePeerClockSkew(int percentToInclude) { return null; }
public long getFramedAveragePeerClockSkew(int percentToInclude) { return 0; }
/**
* Determine under what conditions we are remotely reachable.

View File

@ -15,11 +15,31 @@ import net.i2p.util.Log;
*/
public class RouterClock extends Clock {
/**
* How often we will slew the clock
* i.e. ppm = 1000000/MAX_SLEW
* We should be able to slew really fast,
* this is probably a lot faster than what NTP does
* 1/50 is 12s in a 10m tunnel lifetime, that should be fine.
* All of this is @since 0.7.12
*/
private static final long MAX_SLEW = 50;
private static final int DEFAULT_STRATUM = 8;
private static final int WORST_STRATUM = 16;
/** the max NTP Timestamper delay is 30m right now, make this longer than that */
private static final long MIN_DELAY_FOR_WORSE_STRATUM = 45*60*1000;
private volatile long _desiredOffset;
private volatile long _lastSlewed;
/** use system time for this */
private long _lastChanged;
private int _lastStratum;
RouterContext _contextRC; // LINT field hides another field
public RouterClock(RouterContext context) {
super(context);
_contextRC = context;
_lastStratum = WORST_STRATUM;
}
/**
@ -29,6 +49,16 @@ public class RouterClock extends Clock {
*/
@Override
public void setOffset(long offsetMs, boolean force) {
setOffset(offsetMs, force, DEFAULT_STRATUM);
}
/** @since 0.7.12 */
private void setOffset(long offsetMs, int stratum) {
setOffset(offsetMs, false, stratum);
}
/** @since 0.7.12 */
private void setOffset(long offsetMs, boolean force, int stratum) {
long delta = offsetMs - _offset;
if (!force) {
if ((offsetMs > MAX_OFFSET) || (offsetMs < 0 - MAX_OFFSET)) {
@ -45,59 +75,127 @@ public class RouterClock extends Clock {
}
}
if ((delta < MIN_OFFSET_CHANGE) && (delta > 0 - MIN_OFFSET_CHANGE)) {
getLog().debug("Not changing offset since it is only " + delta + "ms");
// let's be perfect
if (delta == 0) {
getLog().debug("Not changing offset, delta=0");
_alreadyChanged = true;
return;
}
// only listen to a worse stratum if it's been a while
if (_alreadyChanged && stratum > _lastStratum &&
System.currentTimeMillis() - _lastChanged < MIN_DELAY_FOR_WORSE_STRATUM) {
getLog().warn("Ignoring update from a stratum " + stratum +
" clock, we recently had an update from a stratum " + _lastStratum + " clock");
return;
}
// If so configured, check sanity of proposed clock offset
if (Boolean.valueOf(_contextRC.getProperty("router.clockOffsetSanityCheck","true")).booleanValue() &&
_alreadyChanged) {
// Try calculating peer clock skew
Long peerClockSkew = _contextRC.commSystem().getFramedAveragePeerClockSkew(50);
if (peerClockSkew != null) {
long currentPeerClockSkew = _contextRC.commSystem().getFramedAveragePeerClockSkew(50);
// Predict the effect of applying the proposed clock offset
long currentPeerClockSkew = peerClockSkew.longValue();
long predictedPeerClockSkew = currentPeerClockSkew + (delta / 1000l);
long predictedPeerClockSkew = currentPeerClockSkew + delta;
// Fail sanity check if applying the offset would increase peer clock skew
if ((Math.abs(predictedPeerClockSkew) > (Math.abs(currentPeerClockSkew) + 5)) ||
(Math.abs(predictedPeerClockSkew) > 20)) {
if ((Math.abs(predictedPeerClockSkew) > (Math.abs(currentPeerClockSkew) + 5*1000)) ||
(Math.abs(predictedPeerClockSkew) > 20*1000)) {
getLog().error("Ignoring clock offset " + offsetMs + "ms (current " + _offset +
"ms) since it would increase peer clock skew from " + currentPeerClockSkew +
"s to " + predictedPeerClockSkew + "s. Broken server in pool.ntp.org?");
"ms to " + predictedPeerClockSkew + "ms. Bad time server?");
return;
} else {
getLog().debug("Approving clock offset " + offsetMs + "ms (current " + _offset +
"ms) since it would decrease peer clock skew from " + currentPeerClockSkew +
"s to " + predictedPeerClockSkew + "s.");
}
"ms to " + predictedPeerClockSkew + "ms.");
}
} // check sanity
}
if (_alreadyChanged) {
// Update the target offset, slewing will take care of the rest
if (delta > 15*1000)
getLog().error("Warning - Updating clock offset to " + offsetMs + "ms from " + _offset + "ms");
getLog().error("Warning - Updating target clock offset to " + offsetMs + "ms from " + _offset + "ms, Stratum " + stratum);
else if (getLog().shouldLog(Log.INFO))
getLog().info("Updating clock offset to " + offsetMs + "ms from " + _offset + "ms");
getLog().info("Updating target clock offset to " + offsetMs + "ms from " + _offset + "ms, Stratum " + stratum);
if (!_statCreated) {
_contextRC.statManager().createRateStat("clock.skew", "How far is the already adjusted clock being skewed?", "Clock", new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*60 });
_statCreated = true;
}
_contextRC.statManager().addRateData("clock.skew", delta, 0);
_desiredOffset = offsetMs;
} else {
getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms from " + _offset + "ms");
}
getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms, Stratum " + stratum);
_alreadyChanged = true;
_offset = offsetMs;
_desiredOffset = offsetMs;
// this is used by the JobQueue
fireOffsetChanged(delta);
}
_lastChanged = System.currentTimeMillis();
_lastStratum = stratum;
}
/**
* @param stratum used to determine whether we should ignore
* @since 0.7.12
*/
@Override
public void setNow(long realTime, int stratum) {
long diff = realTime - System.currentTimeMillis();
setOffset(diff, stratum);
}
/**
* Retrieve the current time synchronized with whatever reference clock is in use.
* Do really simple clock slewing, like NTP but without jitter prevention.
* Slew the clock toward the desired offset, but only up to a maximum slew rate,
* and never let the clock go backwards because of slewing.
*
* Take care to only access the volatile variables once for speed and to
* avoid having another thread change them
*
* This is called about a zillion times a second, so we can do the slewing right
* here rather than in some separate thread to keep it simple.
* Avoiding backwards clocks when updating in a thread would be hard too.
*/
@Override
public long now() {
long systemNow = System.currentTimeMillis();
// copy the global, so two threads don't both increment or decrement _offset
long offset = _offset;
if (systemNow >= _lastSlewed + MAX_SLEW) {
// copy the global
long desiredOffset = _desiredOffset;
if (desiredOffset > offset) {
// slew forward
_offset = ++offset;
_lastSlewed = systemNow;
} else if (desiredOffset < offset) {
// slew backward, but don't let the clock go backward
// this should be the first call since systemNow
// was greater than lastSled + MAX_SLEW, i.e. different
// from the last systemNow, thus we won't let the clock go backward,
// no need to track when we were last called.
_offset = --offset;
_lastSlewed = systemNow;
}
}
return offset + systemNow;
}
/*
* How far we still have to slew, for diagnostics
* @since 0.7.12
*/
public long getDeltaOffset() {
return _desiredOffset - _offset;
}
}

View File

@ -51,6 +51,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
_log.info("Starting up the comm system");
_manager = new TransportManager(_context);
_manager.startListening();
startTimestamper();
}
public void shutdown() {
@ -78,20 +79,20 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
/**
* @param percentToInclude 1-100
* @return Framed average clock skew of connected peers in seconds, or the clock offset if we cannot answer.
* @return Framed average clock skew of connected peers in milliseconds, or the clock offset if we cannot answer.
* Average is calculated over the middle "percentToInclude" peers.
* Todo: change Vectors to milliseconds
*/
@Override
public Long getFramedAveragePeerClockSkew(int percentToInclude) {
public long getFramedAveragePeerClockSkew(int percentToInclude) {
if (_manager == null) {
// round toward zero
return Long.valueOf(_context.clock().getOffset() / 1000);
return _context.clock().getOffset();
}
Vector skews = _manager.getClockSkews();
if (skews == null ||
skews.size() <= 0 ||
(skews.size() < 5 && _context.clock().getUpdatedSuccessfully())) {
return Long.valueOf(_context.clock().getOffset() / 1000);
return _context.clock().getOffset();
}
// Going to calculate, sort them
@ -106,12 +107,12 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
long sum = 0;
for (int i = first; i <= last; i++) {
long value = ((Long) (skews.get(i))).longValue();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding clock skew " + i + " valued " + value + " s.");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Adding clock skew " + i + " valued " + value + " s.");
sum = sum + value;
}
// Calculate average (round toward zero)
return Long.valueOf(sum / frameSize);
// Calculate average
return sum * 1000 / frameSize;
}
public List getBids(OutNetMessage msg) {
@ -481,4 +482,38 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
buf.append("</tt>");
return buf.toString();
}
/*
* Timestamper stuff
*
* This is used as a backup to NTP over UDP.
* @since 0.7.12
*/
private static final int TIME_START_DELAY = 5*60*1000;
private static final int TIME_REPEAT_DELAY = 10*60*1000;
/** @since 0.7.12 */
private void startTimestamper() {
SimpleScheduler.getInstance().addPeriodicEvent(new Timestamper(), TIME_START_DELAY, TIME_REPEAT_DELAY);
}
/**
* Update the clock offset based on the average of the peers.
* This uses the default stratum which is lower than any reasonable
* NTP source, so it will be ignored unless NTP is broken.
* @since 0.7.12
*/
private class Timestamper implements SimpleTimer.TimedEvent {
public void timeReached() {
// use the same % as in RouterClock so that check will never fail
// This is their our offset w.r.t. them...
long peerOffset = getFramedAveragePeerClockSkew(50);
if (peerOffset == 0)
return;
long currentOffset = _context.clock().getOffset();
// ... so we subtract it to get in sync with them
long newOffset = currentOffset - peerOffset;
_context.clock().setOffset(newOffset);
}
}
}