* Transport clock skews:

- Store and report UDP clock skews even for large values, so
        a badly skewed local clock will be reported to the console
      - Don't shitlist for NTCP clock skew if we don't know what time it is
      - If NTP hasn't worked yet, have NTCP or SSU update the clock one time
      - Include failed clock skew in NTCP skew vector if there aren't many connections
      - Don't include NTCP clock skews for non-established connections
      - Fix framed clock skew frame size
      - Report framed clock skew even if for only one peer, if NTP hasn't worked yet
      - Don't log RRD errors after clock adjustment
      - Reduce min skew for console warning to 30s (was 45s)
      - More Java 5 cleanups
This commit is contained in:
zzz
2010-01-24 15:38:44 +00:00
parent 4f5cfdee57
commit 81dcbedd17
13 changed files with 131 additions and 61 deletions

View File

@ -99,7 +99,7 @@ public class SummaryHelper extends HelperBase {
//if (!_context.clock().getUpdatedSuccessfully())
Long skew = _context.commSystem().getFramedAveragePeerClockSkew(33);
// Display the actual skew, not the offset
if (skew != null && Math.abs(skew.longValue()) > 45)
if (skew != null && Math.abs(skew.longValue()) > 30)
return _("ERR-Clock Skew of {0}", DataHelper.formatDuration(Math.abs(skew.longValue()) * 1000));
if (_context.router().isHidden())
return _("Hidden");

View File

@ -68,7 +68,10 @@ class SummaryListener implements RateSummaryListener {
} catch (IOException ioe) {
_log.error("Error adding", ioe);
} catch (RrdException re) {
_log.error("Error adding", re);
// this can happen after the time slews backwards, so don't make it an error
// org.jrobin.core.RrdException: Bad sample timestamp 1264343107. Last update time was 1264343172, at least one second step is required
if (_log.shouldLog(Log.WARN))
_log.warn("Error adding", re);
}
}
}

View File

@ -92,9 +92,10 @@ public class Clock implements Timestamper.UpdateListener {
else if (getLog().shouldLog(Log.INFO))
getLog().info("Updating clock offset to " + offsetMs + "ms from " + _offset + "ms");
if (!_statCreated)
if (!_statCreated) {
_context.statManager().createRateStat("clock.skew", "How far is the already adjusted clock being skewed?", "Clock", new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*60 });
_statCreated = true;
}
_context.statManager().addRateData("clock.skew", delta, 0);
} else {
getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms from " + _offset + "ms");

View File

@ -1,3 +1,19 @@
2010-01-28 zzz
* ProfileOrganizerRenderer: Cleanups
* Reseed: Update welt's reseed hostname
* Transport clock skews:
- Store and report UDP clock skews even for large values, so
a badly skewed local clock will be reported to the console
- Don't shitlist for NTCP clock skew if we don't know what time it is
- If NTP hasn't worked yet, have NTCP or SSU update the clock one time
- Include failed clock skew in NTCP skew vector if there aren't many connections
- Don't include NTCP clock skews for non-established connections
- Fix framed clock skew frame size
- Report framed clock skew even if for only one peer, if NTP hasn't worked yet
- Don't log RRD errors after clock adjustment
- Reduce min skew for console warning to 30s (was 45s)
- More Java 5 cleanups
2010-01-24 zzz
* Clock:
- Don't let a client update the router clock

View File

@ -88,9 +88,10 @@ public class RouterClock extends Clock {
else if (getLog().shouldLog(Log.INFO))
getLog().info("Updating clock offset to " + offsetMs + "ms from " + _offset + "ms");
if (!_statCreated)
if (!_statCreated) {
_contextRC.statManager().createRateStat("clock.skew", "How far is the already adjusted clock being skewed?", "Clock", new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*60 });
_statCreated = true;
}
_contextRC.statManager().addRateData("clock.skew", delta, 0);
} else {
getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms from " + _offset + "ms");

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 1;
public final static long BUILD = 2;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -77,6 +77,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public boolean haveHighOutboundCapacity() { return (_manager == null ? false : _manager.haveHighOutboundCapacity()); }
/**
* @param percentToInclude 1-100
* @return Framed average clock skew of connected peers in seconds, or the clock offset if we cannot answer.
* Average is calculated over the middle "percentToInclude" peers.
*/
@ -87,21 +88,23 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
return Long.valueOf(_context.clock().getOffset() / 1000);
}
Vector skews = _manager.getClockSkews();
if (skews == null) {
return Long.valueOf(_context.clock().getOffset() / 1000);
}
if (skews.size() < 5) {
if (skews == null ||
skews.size() <= 0 ||
(skews.size() < 5 && _context.clock().getUpdatedSuccessfully())) {
return Long.valueOf(_context.clock().getOffset() / 1000);
}
// Going to calculate, sort them
Collections.sort(skews);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Clock skews: " + skews);
// Calculate frame size
int frameSize = Math.min((skews.size() * percentToInclude / 100), 2);
int frameSize = Math.max((skews.size() * percentToInclude / 100), 1);
int first = (skews.size() / 2) - (frameSize / 2);
int last = (skews.size() / 2) + (frameSize / 2);
int last = Math.min((skews.size() / 2) + (frameSize / 2), skews.size() - 1);
// Sum skew values
long sum = 0;
for (int i = first; i < last; i++) {
for (int i = first; i <= last; i++) {
long value = ((Long) (skews.get(i))).longValue();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding clock skew " + i + " valued " + value + " s.");

View File

@ -365,12 +365,20 @@ public class EstablishState {
// the skew is not authenticated yet, but it is certainly fatal to
// the establishment, so fail hard if appropriate
long diff = 1000*Math.abs(_tsA-_tsB);
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
if (!_context.clock().getUpdatedSuccessfully()) {
// Adjust the clock one time in desperation
_context.clock().setOffset(1000 * (_tsB - _tsA), true);
_tsA = _tsB;
if (diff != 0)
_log.error("NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidOutboundSkew", diff, 0);
_transport.markReachable(_con.getRemotePeer().calculateHash(), false);
// Only shitlist if we know what time it is
_context.shitlist().shitlistRouter(DataHelper.formatDuration(diff),
_con.getRemotePeer().calculateHash(),
_x("Excessive clock skew: {0}"));
_transport.setLastBadSkew(_tsA- _tsB);
fail("Clocks too skewed (" + diff + " ms)", null, true);
return;
} else if (_log.shouldLog(Log.DEBUG)) {
@ -570,12 +578,21 @@ public class EstablishState {
_log.debug(prefix() + "verification successful for " + _con);
long diff = 1000*Math.abs(tsA-_tsB);
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
if (!_context.clock().getUpdatedSuccessfully()) {
// Adjust the clock one time in desperation
// This isn't very likely, outbound will do it first
_context.clock().setOffset(1000 * (_tsB - tsA), true);
tsA = _tsB;
if (diff != 0)
_log.error("NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0);
_transport.markReachable(alice.calculateHash(), true);
// Only shitlist if we know what time it is
_context.shitlist().shitlistRouter(DataHelper.formatDuration(diff),
alice.calculateHash(),
_x("Excessive clock skew: {0}"));
_transport.setLastBadSkew(tsA- _tsB);
fail("Clocks too skewed (" + diff + " ms)", null, true);
return;
} else if (_log.shouldLog(Log.DEBUG)) {

View File

@ -53,6 +53,7 @@ public class NTCPTransport extends TransportImpl {
private List _sent;
private NTCPSendFinisher _finisher;
private long _lastBadSkew;
private static final long[] RATES = { 10*60*1000 };
public NTCPTransport(RouterContext ctx) {
@ -382,24 +383,36 @@ public class NTCPTransport extends TransportImpl {
return active;
}
/** @param skew in seconds */
void setLastBadSkew(long skew) {
_lastBadSkew = skew;
}
/**
* Return our peer clock skews on this transport.
* Vector composed of Long, each element representing a peer skew in seconds.
*/
@Override
public Vector getClockSkews() {
public Vector<Long> getClockSkews() {
Vector peers = new Vector();
Vector skews = new Vector();
Vector<NTCPConnection> peers = new Vector();
Vector<Long> skews = new Vector();
synchronized (_conLock) {
peers.addAll(_conByIdent.values());
}
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
NTCPConnection con = (NTCPConnection)iter.next();
skews.addElement(new Long (con.getClockSkew()));
for (Iterator<NTCPConnection> iter = peers.iterator(); iter.hasNext(); ) {
NTCPConnection con = iter.next();
if (con.isEstablished())
skews.addElement(Long.valueOf(con.getClockSkew()));
}
// If we don't have many peers, maybe it is because of a bad clock, so
// return the last bad skew we got
if (skews.size() < 5 && _lastBadSkew != 0)
skews.addElement(Long.valueOf(_lastBadSkew));
if (_log.shouldLog(Log.DEBUG))
_log.debug("NTCP transport returning " + skews.size() + " peer clock skews.");
return skews;

View File

@ -6,6 +6,7 @@ import java.util.List;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.data.DataHelper;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@ -417,26 +418,36 @@ public class PacketHandler {
long recvOn = packet.getBegin();
long sendOn = reader.readTimestamp() * 1000;
long skew = recvOn - sendOn;
// update skew whether or not we will be dropping the packet for excessive skew
if (state != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received packet from " + state.getRemoteHostId().toString() + " with skew " + skew);
state.adjustClockSkew(skew);
}
_context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime());
if (!_context.clock().getUpdatedSuccessfully()) {
// adjust the clock one time in desperation
// this doesn't seem to work for big skews, we never get anything back,
// so we have to wait for NTCP to do it
_context.clock().setOffset(0 - skew, true);
if (skew != 0)
_log.error("NTP failure, UDP adjusting clock by " + DataHelper.formatDuration(Math.abs(skew)));
}
if (skew > GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet);
_log.warn("Packet too far in the past: " + new Date(sendOn) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidSkew", skew, packet.getExpiration());
return;
} else if (skew < 0 - GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet);
_log.warn("Packet too far in the future: " + new Date(sendOn) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidSkew", 0-skew, packet.getExpiration());
return;
}
if (state != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received packet from " + state.getRemoteHostId().toString() + " with skew " + skew);
state.adjustClockSkew((short)skew);
}
_context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime());
//InetAddress fromHost = packet.getPacket().getAddress();
//int fromPort = packet.getPacket().getPort();
//RemoteHostId from = new RemoteHostId(fromHost.getAddress(), fromPort);

View File

@ -59,8 +59,8 @@ public class PeerState {
private boolean _rekeyBeganLocally;
/** when were the current cipher and MAC keys established/rekeyed? */
private long _keyEstablishedTime;
/** how far off is the remote peer from our clock, in seconds? */
private short _clockSkew;
/** how far off is the remote peer from our clock, in milliseconds? */
private long _clockSkew;
/** what is the current receive second, for congestion control? */
private long _currentReceiveSecond;
/** when did we last send them a packet? */
@ -327,8 +327,8 @@ public class PeerState {
public boolean getRekeyBeganLocally() { return _rekeyBeganLocally; }
/** when were the current cipher and MAC keys established/rekeyed? */
public long getKeyEstablishedTime() { return _keyEstablishedTime; }
/** how far off is the remote peer from our clock, in seconds? */
public short getClockSkew() { return ( (short) (_clockSkew / 1000)); }
/** how far off is the remote peer from our clock, in milliseconds? */
public long getClockSkew() { return _clockSkew ; }
/** what is the current receive second, for congestion control? */
public long getCurrentReceiveSecond() { return _currentReceiveSecond; }
/** when did we last send them a packet? */
@ -425,9 +425,9 @@ public class PeerState {
public void setRekeyBeganLocally(boolean local) { _rekeyBeganLocally = local; }
/** when were the current cipher and MAC keys established/rekeyed? */
public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; }
/** how far off is the remote peer from our clock, in seconds? */
public void adjustClockSkew(short skew) {
_clockSkew = (short)(0.9*(float)_clockSkew + 0.1*(float)skew);
/** how far off is the remote peer from our clock, in milliseconds? */
public void adjustClockSkew(long skew) {
_clockSkew = (long) (0.9*(float)_clockSkew + 0.1*(float)skew);
}
/** what is the current receive second, for congestion control? */
public void setCurrentReceiveSecond(long sec) { _currentReceiveSecond = sec; }

View File

@ -74,6 +74,7 @@ public class UDPPacketReader {
return (_message[_payloadBeginOffset] & (1 << 2)) != 0;
}
/** @return seconds */
public long readTimestamp() {
return DataHelper.fromLong(_message, _payloadBeginOffset + 1, 4);
}

View File

@ -42,9 +42,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private Log _log;
private UDPEndpoint _endpoint;
/** Peer (Hash) to PeerState */
private final Map _peersByIdent;
private final Map<Hash, PeerState> _peersByIdent;
/** RemoteHostId to PeerState */
private final Map _peersByRemoteHost;
private final Map<RemoteHostId, PeerState> _peersByRemoteHost;
private PacketHandler _handler;
private EstablishmentManager _establisher;
private MessageQueue _outboundMessages;
@ -575,7 +575,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/
PeerState getPeerState(RemoteHostId hostInfo) {
synchronized (_peersByRemoteHost) {
return (PeerState)_peersByRemoteHost.get(hostInfo);
return _peersByRemoteHost.get(hostInfo);
}
}
@ -585,7 +585,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/
public PeerState getPeerState(Hash remotePeer) {
synchronized (_peersByIdent) {
return (PeerState)_peersByIdent.get(remotePeer);
return _peersByIdent.get(remotePeer);
}
}
@ -664,7 +664,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
PeerState oldPeer = null;
if (remotePeer != null) {
synchronized (_peersByIdent) {
oldPeer = (PeerState)_peersByIdent.put(remotePeer, peer);
oldPeer = _peersByIdent.put(remotePeer, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) {
// transfer over the old state/inbound message fragments/etc
peer.loadFrom(oldPeer);
@ -684,7 +684,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (remoteId == null) return false;
synchronized (_peersByRemoteHost) {
oldPeer = (PeerState)_peersByRemoteHost.put(remoteId, peer);
oldPeer = _peersByRemoteHost.put(remoteId, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) {
// transfer over the old state/inbound message fragments/etc
peer.loadFrom(oldPeer);
@ -883,14 +883,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long now = _context.clock().now();
_context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
synchronized (_peersByIdent) {
altByIdent = (PeerState)_peersByIdent.remove(peer.getRemotePeer());
altByIdent = _peersByIdent.remove(peer.getRemotePeer());
}
}
RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId != null) {
synchronized (_peersByRemoteHost) {
altByHost = (PeerState)_peersByRemoteHost.remove(remoteId);
altByHost = _peersByRemoteHost.remove(remoteId);
}
}
@ -1417,8 +1417,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
int active = 0;
int inactive = 0;
synchronized (_peersByIdent) {
for (Iterator iter = _peersByIdent.values().iterator(); iter.hasNext(); ) {
PeerState peer = (PeerState)iter.next();
for (Iterator<PeerState> iter = _peersByIdent.values().iterator(); iter.hasNext(); ) {
PeerState peer = iter.next();
if (now-peer.getLastReceiveTime() > 5*60*1000)
inactive++;
else
@ -1434,8 +1434,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
int active = 0;
int inactive = 0;
synchronized (_peersByIdent) {
for (Iterator iter = _peersByIdent.values().iterator(); iter.hasNext(); ) {
PeerState peer = (PeerState)iter.next();
for (Iterator<PeerState> iter = _peersByIdent.values().iterator(); iter.hasNext(); ) {
PeerState peer = iter.next();
if (now-peer.getLastSendFullyTime() > 1*60*1000)
inactive++;
else
@ -1461,20 +1461,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* Vector composed of Long, each element representing a peer skew in seconds.
*/
@Override
public Vector getClockSkews() {
public Vector<Long> getClockSkews() {
Vector skews = new Vector();
Vector peers = new Vector();
Vector<Long> skews = new Vector();
Vector<PeerState> peers = new Vector();
synchronized (_peersByIdent) {
peers.addAll(_peersByIdent.values());
}
// If our clock is way off, we may not have many (or any) successful connections,
// so try hard in that case to return good data
boolean includeEverybody = _context.router().getUptime() < 10*60*1000 || peers.size() < 10;
long now = _context.clock().now();
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
PeerState peer = (PeerState)iter.next();
if (now-peer.getLastReceiveTime() > 60*60*1000) continue; // skip old peers
skews.addElement(new Long (peer.getClockSkew()));
for (Iterator<PeerState> iter = peers.iterator(); iter.hasNext(); ) {
PeerState peer = iter.next();
if ((!includeEverybody) && now - peer.getLastReceiveTime() > 15*60*1000)
continue; // skip old peers
skews.addElement(Long.valueOf(peer.getClockSkew() / 1000));
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("UDP transport returning " + skews.size() + " peer clock skews.");
@ -1813,7 +1817,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void renderStatusHTML(Writer out, int sortFlags) throws IOException {}
@Override
public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException {
TreeSet peers = new TreeSet(getComparator(sortFlags));
TreeSet<PeerState> peers = new TreeSet(getComparator(sortFlags));
synchronized (_peersByIdent) {
peers.addAll(_peersByIdent.values());
}
@ -1958,7 +1962,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("</td>");
buf.append(" <td class=\"cells\" align=\"center\" >");
buf.append(peer.getClockSkew());
buf.append(peer.getClockSkew() / 1000);
buf.append("s</td>");
offsetTotal = offsetTotal + peer.getClockSkew();
@ -2055,7 +2059,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(formatKBps(bpsIn)).append("/").append(formatKBps(bpsOut));
buf.append("K/s</b></td>");
buf.append(" <td align=\"center\"><b>").append(numPeers > 0 ? DataHelper.formatDuration(uptimeMsTotal/numPeers) : "0s");
buf.append("</b></td> <td align=\"center\"><b>").append(numPeers > 0 ? DataHelper.formatDuration(offsetTotal*1000/numPeers) : "0ms").append("</b></td>\n");
buf.append("</b></td> <td align=\"center\"><b>").append(numPeers > 0 ? DataHelper.formatDuration(offsetTotal/numPeers) : "0ms").append("</b></td>\n");
buf.append(" <td align=\"center\"><b>");
buf.append(numPeers > 0 ? cwinTotal/(numPeers*1024) + "K" : "0K");
buf.append("</b></td> <td>&nbsp;</td>\n");
@ -2276,13 +2280,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
PeerState pickTestPeer(RemoteHostId dontInclude) {
List peers = null;
List<PeerState> peers = null;
synchronized (_peersByIdent) {
peers = new ArrayList(_peersByIdent.values());
}
Collections.shuffle(peers, _context.random());
for (int i = 0; i < peers.size(); i++) {
PeerState peer = (PeerState)peers.get(i);
PeerState peer = peers.get(i);
if ( (dontInclude != null) && (dontInclude.equals(peer.getRemoteHostId())) )
continue;
RouterInfo peerInfo = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer());