the actual fix for the local eepsite problem (if getRemoteID was called *after* the remoteID was set, it would wait for 60s then fail. now we check for that)
synchronization cleanup (never get two locks) logging
This commit is contained in:
@ -5,12 +5,14 @@ import java.io.InputStream;
|
|||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
|
||||||
|
import net.i2p.I2PAppContext;
|
||||||
import net.i2p.I2PException;
|
import net.i2p.I2PException;
|
||||||
import net.i2p.client.I2PSessionException;
|
import net.i2p.client.I2PSessionException;
|
||||||
import net.i2p.data.Destination;
|
import net.i2p.data.Destination;
|
||||||
import net.i2p.util.I2PThread;
|
import net.i2p.util.I2PThread;
|
||||||
import net.i2p.util.Log;
|
import net.i2p.util.Log;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initial stub implementation for the socket
|
* Initial stub implementation for the socket
|
||||||
*
|
*
|
||||||
@ -35,6 +37,9 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
private static long __socketId = 0;
|
private static long __socketId = 0;
|
||||||
private long _bytesRead = 0;
|
private long _bytesRead = 0;
|
||||||
private long _bytesWritten = 0;
|
private long _bytesWritten = 0;
|
||||||
|
private long _createdOn;
|
||||||
|
private long _closedOn;
|
||||||
|
private long _remoteIdSetTime;
|
||||||
private Object flagLock = new Object();
|
private Object flagLock = new Object();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -73,6 +78,9 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
out = new I2POutputStream(pin);
|
out = new I2POutputStream(pin);
|
||||||
new I2PSocketRunner(pin);
|
new I2PSocketRunner(pin);
|
||||||
this.localID = localID;
|
this.localID = localID;
|
||||||
|
_createdOn = I2PAppContext.getGlobalContext().clock().now();
|
||||||
|
_remoteIdSetTime = -1;
|
||||||
|
_closedOn = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -89,6 +97,7 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
public void setRemoteID(String id) {
|
public void setRemoteID(String id) {
|
||||||
synchronized (remoteIDWaiter) {
|
synchronized (remoteIDWaiter) {
|
||||||
remoteID = id;
|
remoteID = id;
|
||||||
|
_remoteIdSetTime = System.currentTimeMillis();
|
||||||
remoteIDWaiter.notifyAll();
|
remoteIDWaiter.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -123,6 +132,7 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
long dieAfter = System.currentTimeMillis() + maxWait;
|
long dieAfter = System.currentTimeMillis() + maxWait;
|
||||||
synchronized (remoteIDWaiter) {
|
synchronized (remoteIDWaiter) {
|
||||||
if (wait) {
|
if (wait) {
|
||||||
|
if (remoteID == null) {
|
||||||
try {
|
try {
|
||||||
if (maxWait >= 0)
|
if (maxWait >= 0)
|
||||||
remoteIDWaiter.wait(maxWait);
|
remoteIDWaiter.wait(maxWait);
|
||||||
@ -130,11 +140,14 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
remoteIDWaiter.wait();
|
remoteIDWaiter.wait();
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
if ((maxWait >= 0) && (now >= dieAfter)) {
|
if ((maxWait >= 0) && (now >= dieAfter)) {
|
||||||
long waitedExcess = now - dieAfter;
|
long waitedExcess = now - dieAfter;
|
||||||
throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess + "ms too long [" + maxWait + "ms])");
|
throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess
|
||||||
|
+ "ms too long [" + maxWait + "ms, remId " + remoteID
|
||||||
|
+ ", remId set " + (now-_remoteIdSetTime) + "ms ago])");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
@ -202,8 +215,10 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
*/
|
*/
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
synchronized (flagLock) {
|
synchronized (flagLock) {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Closing connection");
|
_log.debug("Closing connection");
|
||||||
closed = true;
|
closed = true;
|
||||||
|
_closedOn = I2PAppContext.getGlobalContext().clock().now();
|
||||||
}
|
}
|
||||||
out.close();
|
out.close();
|
||||||
in.notifyClosed();
|
in.notifyClosed();
|
||||||
@ -217,6 +232,7 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
closed = true;
|
closed = true;
|
||||||
closed2 = true;
|
closed2 = true;
|
||||||
sendClose = false;
|
sendClose = false;
|
||||||
|
_closedOn = I2PAppContext.getGlobalContext().clock().now();
|
||||||
}
|
}
|
||||||
out.close();
|
out.close();
|
||||||
in.notifyClosed();
|
in.notifyClosed();
|
||||||
@ -251,6 +267,12 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
_socketErrorListener.errorOccurred();
|
_socketErrorListener.errorOccurred();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getBytesSent() { return _bytesWritten; }
|
||||||
|
public long getBytesReceived() { return _bytesRead; }
|
||||||
|
public long getCreatedOn() { return _createdOn; }
|
||||||
|
public long getClosedOn() { return _closedOn; }
|
||||||
|
|
||||||
|
|
||||||
private String getPrefix() { return "[" + _socketId + "]: "; }
|
private String getPrefix() { return "[" + _socketId + "]: "; }
|
||||||
|
|
||||||
//--------------------------------------------------
|
//--------------------------------------------------
|
||||||
@ -276,12 +298,15 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
throw new RuntimeException("Incorrect read() result");
|
throw new RuntimeException("Incorrect read() result");
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized int read(byte[] b, int off, int len) throws IOException {
|
public int read(byte[] b, int off, int len) throws IOException {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug(getPrefix() + "Read called: " + this.hashCode());
|
_log.debug(getPrefix() + "Read called for " + len + " bytes (avail=" + bc.getCurrentSize() + "): " + this.hashCode());
|
||||||
if (len == 0) return 0;
|
if (len == 0) return 0;
|
||||||
long dieAfter = System.currentTimeMillis() + readTimeout;
|
long dieAfter = System.currentTimeMillis() + readTimeout;
|
||||||
byte[] read = bc.startToByteArray(len);
|
byte[] read = null;
|
||||||
|
synchronized (bc) {
|
||||||
|
read = bc.startToByteArray(len);
|
||||||
|
}
|
||||||
boolean timedOut = false;
|
boolean timedOut = false;
|
||||||
|
|
||||||
while (read.length == 0) {
|
while (read.length == 0) {
|
||||||
@ -293,11 +318,13 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
synchronized (I2PSocketImpl.I2PInputStream.this) {
|
||||||
if (readTimeout >= 0) {
|
if (readTimeout >= 0) {
|
||||||
wait(readTimeout);
|
wait(readTimeout);
|
||||||
} else {
|
} else {
|
||||||
wait();
|
wait();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} catch (InterruptedException ex) {}
|
} catch (InterruptedException ex) {}
|
||||||
|
|
||||||
if ((readTimeout >= 0)
|
if ((readTimeout >= 0)
|
||||||
@ -305,8 +332,10 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
throw new InterruptedIOException(getPrefix() + "Timeout reading from I2PSocket (" + readTimeout + " msecs)");
|
throw new InterruptedIOException(getPrefix() + "Timeout reading from I2PSocket (" + readTimeout + " msecs)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized (bc) {
|
||||||
read = bc.startToByteArray(len);
|
read = bc.startToByteArray(len);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (read.length > len) throw new RuntimeException("BUG");
|
if (read.length > len) throw new RuntimeException("BUG");
|
||||||
System.arraycopy(read, 0, b, off, read.length);
|
System.arraycopy(read, 0, b, off, read.length);
|
||||||
|
|
||||||
@ -330,15 +359,21 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
queueData(data, 0, data.length);
|
queueData(data, 0, data.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void queueData(byte[] data, int off, int len) {
|
public void queueData(byte[] data, int off, int len) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
|
_log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
|
||||||
|
synchronized (bc) {
|
||||||
bc.append(data, off, len);
|
bc.append(data, off, len);
|
||||||
|
}
|
||||||
|
synchronized (I2PInputStream.this) {
|
||||||
notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized void notifyClosed() {
|
public synchronized void notifyClosed() {
|
||||||
I2PInputStream.this.notifyAll();
|
synchronized (I2PInputStream.this) {
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
Reference in New Issue
Block a user