2009-04-10 sponge

* More BOB threadgroup fixes, plus debug dump when things go wrong.
    * Fixes to streaminglib, I2CP, which are related to the TG problem.
    * JavaDocs fixups.
This commit is contained in:
sponge
2009-04-10 23:12:41 +00:00
parent 2a2d3c0fb5
commit 384d655b1a
18 changed files with 348 additions and 188 deletions

View File

@ -748,14 +748,12 @@ public class DoCMDS implements Runnable {
nickinfo = (NamedDB) database.get(Arg); nickinfo = (NamedDB) database.get(Arg);
if (!tunnelactive(nickinfo)) { if (!tunnelactive(nickinfo)) {
nickinfo = null; nickinfo = null;
ns = ns = true;
true;
} }
} catch (Exception b) { } catch (Exception b) {
nickinfo = null; nickinfo = null;
ns = ns = true;
true;
} }
try { try {
@ -775,10 +773,10 @@ public class DoCMDS implements Runnable {
try { try {
database.add(Arg, nickinfo); database.add(Arg, nickinfo);
nickinfo.add(P_NICKNAME, Arg); nickinfo.add(P_NICKNAME, Arg);
nickinfo.add(P_STARTING, Boolean.FALSE); nickinfo.add(P_STARTING, new Boolean(false));
nickinfo.add(P_RUNNING, Boolean.FALSE); nickinfo.add(P_RUNNING, new Boolean(false));
nickinfo.add(P_STOPPING, Boolean.FALSE); nickinfo.add(P_STOPPING, new Boolean(false));
nickinfo.add(P_QUIET, Boolean.FALSE); nickinfo.add(P_QUIET, new Boolean(false));
nickinfo.add(P_INHOST, "localhost"); nickinfo.add(P_INHOST, "localhost");
nickinfo.add(P_OUTHOST, "localhost"); nickinfo.add(P_OUTHOST, "localhost");
Properties Q = new Properties(); Properties Q = new Properties();
@ -1265,13 +1263,17 @@ public class DoCMDS implements Runnable {
tunnel = new MUXlisten(database, nickinfo, _log); tunnel = new MUXlisten(database, nickinfo, _log);
Thread t = new Thread(tunnel); Thread t = new Thread(tunnel);
t.start(); t.start();
try {
Thread.sleep(1000 * 10); // Slow down the startup.
} catch(InterruptedException ie) {
// ignore it
}
out.println("OK tunnel starting"); out.println("OK tunnel starting");
} catch (I2PException e) { } catch (I2PException e) {
out.println("ERROR starting tunnel: " + e); out.println("ERROR starting tunnel: " + e);
} catch (IOException e) { } catch (IOException e) {
out.println("ERROR starting tunnel: " + e); out.println("ERROR starting tunnel: " + e);
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
break die; break die;
@ -1304,7 +1306,7 @@ public class DoCMDS implements Runnable {
break die; break die;
} }
nickinfo.add(P_STOPPING, Boolean.TRUE); nickinfo.add(P_STOPPING, new Boolean(true));
try { try {
wunlock(); wunlock();

View File

@ -26,8 +26,6 @@ package net.i2p.BOB;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import net.i2p.I2PException; import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManager;
@ -42,7 +40,7 @@ public class I2Plistener implements Runnable {
private NamedDB info, database; private NamedDB info, database;
private Log _log; private Log _log;
private int tgwatch; // private int tgwatch;
public I2PSocketManager socketManager; public I2PSocketManager socketManager;
public I2PServerSocket serverSocket; public I2PServerSocket serverSocket;
@ -60,7 +58,7 @@ public class I2Plistener implements Runnable {
this._log = _log; this._log = _log;
this.socketManager = S; this.socketManager = S;
serverSocket = SS; serverSocket = SS;
tgwatch = 1; // tgwatch = 1;
} }
private void rlock() throws Exception { private void rlock() throws Exception {
@ -84,18 +82,18 @@ public class I2Plistener implements Runnable {
die: { die: {
serverSocket.setSoTimeout(50); serverSocket.setSoTimeout(50);
try { // try {
if (info.exists("INPORT")) { // if (info.exists("INPORT")) {
tgwatch = 2; // tgwatch = 2;
} // }
} catch (Exception e) { // } catch (Exception e) {
try { // try {
runlock(); // runlock();
} catch (Exception e2) { // } catch (Exception e2) {
break die; // break die;
} // }
break die; // break die;
} // }
boolean spin = true; boolean spin = true;
while (spin) { while (spin) {
@ -137,29 +135,33 @@ die: {
} }
} }
// System.out.println("I2Plistener: Close"); // System.out.println("I2Plistener: Close");
try {
serverSocket.close();
} catch (I2PException e) { // Previous level does this cleanup now.
//
// try {
// serverSocket.close();
// } catch (I2PException e) {
// nop // nop
} //}
// need to kill off the socket manager too. // need to kill off the socket manager too.
I2PSession session = socketManager.getSession(); // I2PSession session = socketManager.getSession();
if (session != null) { // if (session != null) {
// System.out.println("I2Plistener: destroySession"); // System.out.println("I2Plistener: destroySession");
try { // try {
session.destroySession(); // session.destroySession();
} catch (I2PSessionException ex) { // } catch (I2PSessionException ex) {
// nop // nop
} // }
} //}
// System.out.println("I2Plistener: Waiting for children"); // System.out.println("I2Plistener: Waiting for children");
while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish // while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish
try { // try {
Thread.sleep(100); //sleep for 100 ms (One tenth second) // Thread.sleep(100); //sleep for 100 ms (One tenth second)
} catch (Exception e) { // } catch (Exception e) {
// nop // nop
} // }
} //}
// System.out.println("I2Plistener: Done."); // System.out.println("I2Plistener: Done.");
} }

View File

@ -23,6 +23,7 @@
*/ */
package net.i2p.BOB; package net.i2p.BOB;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
@ -116,7 +117,22 @@ die: {
try { try {
Thread.sleep(10); //sleep for 10 ms Thread.sleep(10); //sleep for 10 ms
} catch(InterruptedException e) { } catch(InterruptedException e) {
// nop try {
in.close();
} catch(Exception ex) {
}
try {
out.close();
} catch(Exception ex) {
}
try {
Iin.close();
} catch(Exception ex) {
}
try {
Iout.close();
} catch(Exception ex) {
}
} }
} }
// System.out.println("I2PtoTCP: Going away..."); // System.out.println("I2PtoTCP: Going away...");

View File

@ -28,14 +28,13 @@ import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.util.Properties; import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.i2p.I2PException; import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketManagerFactory; import net.i2p.client.streaming.I2PSocketManagerFactory;
import net.i2p.util.Log; import net.i2p.util.Log;
import org.tanukisoftware.wrapper.WrapperManager;
/** /**
* *
@ -59,8 +58,8 @@ public class MUXlisten implements Runnable {
/** /**
* Constructor Will fail if INPORT is occupied. * Constructor Will fail if INPORT is occupied.
* *
* @param info * @param info DB entry for this tunnel
* @param database * @param database master database of tunnels
* @param _log * @param _log
* @throws net.i2p.I2PException * @throws net.i2p.I2PException
* @throws java.io.IOException * @throws java.io.IOException
@ -134,11 +133,11 @@ public class MUXlisten implements Runnable {
*/ */
public void run() { public void run() {
I2PServerSocket SS = null; I2PServerSocket SS = null;
int ticks = 1200; // Allow 120 seconds, no more.
try { try {
wlock(); wlock();
try { try {
info.add("RUNNING", new Boolean(true)); info.add("RUNNING", new Boolean(true));
info.add("STARTING", new Boolean(false));
} catch (Exception e) { } catch (Exception e) {
wunlock(); wunlock();
return; return;
@ -177,12 +176,28 @@ die:
q.start(); q.start();
} }
try {
wlock();
try {
info.add("STARTING", new Boolean(false));
} catch (Exception e) {
wunlock();
break die;
}
} catch (Exception e) {
break die;
}
try {
wunlock();
} catch (Exception e) {
break die;
}
boolean spin = true; boolean spin = true;
while (spin) { while (spin) {
try { try {
Thread.sleep(200); //sleep for 200 ms (Two thenths second) Thread.sleep(1000); //sleep for 1 second
} catch (InterruptedException e) { } catch (InterruptedException e) {
// nop break die;
} }
try { try {
rlock(); rlock();
@ -220,22 +235,49 @@ die:
} }
} // die } // die
// try { if (SS != null) {
// Thread.sleep(500); //sleep for 500 ms (One half second) try {
// } catch (InterruptedException ex) { SS.close();
// // nop } catch (I2PException ex) {
// } //Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex);
// wait for child threads and thread groups to die }
}
if (this.come_in) {
try {
listener.close();
} catch (IOException e) {
}
}
I2PSession session = socketManager.getSession();
if (session != null) {
// System.out.println("I2Plistener: destroySession");
try {
session.destroySession();
} catch (I2PSessionException ex) {
// nop
}
}
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
// Wait for child threads and thread groups to die
// System.out.println("MUXlisten: waiting for children"); // System.out.println("MUXlisten: waiting for children");
if (tg.activeCount() + tg.activeGroupCount() != 0) { if (tg.activeCount() + tg.activeGroupCount() != 0) {
while (tg.activeCount() + tg.activeGroupCount() != 0) { while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) {
tg.interrupt(); // unwedge any blocking threads. tg.interrupt(); // unwedge any blocking threads.
ticks--;
try { try {
Thread.sleep(100); //sleep for 100 ms (One tenth second) Thread.sleep(100); //sleep for 100 ms (One tenth second)
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
// NOP break quit;
} }
} }
if (tg.activeCount() + tg.activeGroupCount() != 0) {
break quit; // Uh-oh.
}
} }
tg.destroy(); tg.destroy();
// Zap reference to the ThreadGroup so the JVM can GC it. // Zap reference to the ThreadGroup so the JVM can GC it.
@ -245,20 +287,32 @@ die:
break quit; break quit;
} }
} // quit } // quit
// This is here to catch when something fucks up REALLY bad.
if (tg != null) {
System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!");
System.out.println("BOB: MUXlisten: Please email the following dump to sponge@mail.i2p");
WrapperManager.requestThreadDump();
System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!");
System.out.println("BOB: MUXlisten: Please email the above dump to sponge@mail.i2p");
}
// This is here to catch when something fucks up REALLY bad. // This is here to catch when something fucks up REALLY bad.
if (tg != null) { if (tg != null) {
if (SS != null) {
try {
SS.close();
} catch (I2PException ex) {
//Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex);
}
}
if (this.come_in) {
try {
listener.close();
} catch (IOException e) {
}
}
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
ticks = 600; // 60 seconds
if (tg.activeCount() + tg.activeGroupCount() != 0) { if (tg.activeCount() + tg.activeGroupCount() != 0) {
tg.interrupt(); // unwedge any blocking threads. while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) {
while (tg.activeCount() + tg.activeGroupCount() != 0) { tg.interrupt(); // unwedge any blocking threads.
ticks--;
try { try {
Thread.sleep(100); //sleep for 100 ms (One tenth second) Thread.sleep(100); //sleep for 100 ms (One tenth second)
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
@ -266,30 +320,26 @@ die:
} }
} }
} }
tg.destroy(); if (tg.activeCount() + tg.activeGroupCount() == 0) {
// Zap reference to the ThreadGroup so the JVM can GC it. tg.destroy();
tg = null; // Zap reference to the ThreadGroup so the JVM can GC it.
tg = null;
} else {
System.out.println("BOB: MUXlisten: Can't kill threads. Please send the following dump to sponge@mail.i2p");
System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN");
visit(tg, 0);
System.out.println("BOB: MUXlisten: ThreadGroup dump END\n\n");
}
} }
if (SS != null) { // This is here to catch when something fucks up REALLY bad.
try { // if (tg != null) {
SS.close(); // System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!");
} catch (I2PException ex) { // System.out.println("BOB: MUXlisten: Please email the following dump to sponge@mail.i2p");
//Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex); // WrapperManager.requestThreadDump();
} // System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!");
} // System.out.println("BOB: MUXlisten: Please email the above dump to sponge@mail.i2p");
// Lastly try to close things again. // }
if (this.come_in) {
try {
listener.close();
} catch (IOException e) {
}
}
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
// zero out everything. // zero out everything.
try { try {
wlock(); wlock();
@ -307,13 +357,49 @@ die:
} }
// private class DisconnectListener implements I2PSocketManager.DisconnectListener {
// // Debugging...
// public void sessionDisconnected() {
// close(); /**
// } * Find the root thread group and print them all.
// } *
// public void close() { */
// socketManager.destroySocketManager(); private void visitAllThreads() {
// } ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
while (root.getParent() != null) {
root = root.getParent();
}
// Visit each thread group
visit(root, 0);
}
/**
* Recursively visits all thread groups under `group' and dumps them.
* @param group ThreadGroup to visit
* @param level Current level
*/
private static void visit(ThreadGroup group, int level) {
// Get threads in `group'
int numThreads = group.activeCount();
Thread[] threads = new Thread[numThreads * 2];
numThreads = group.enumerate(threads, false);
String indent = "------------------------------------".substring(0, level) + "-> ";
// Enumerate each thread in `group' and print it.
for (int i = 0; i < numThreads; i++) {
// Get thread
Thread thread = threads[i];
System.out.println("BOB: MUXlisten: " + indent + thread.toString());
}
// Get thread subgroups of `group'
int numGroups = group.activeGroupCount();
ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
numGroups = group.enumerate(groups, false);
// Recursively visit each subgroup
for (int i = 0; i < numGroups; i++) {
visit(groups[i], level + 1);
}
}
} }

View File

@ -23,6 +23,7 @@
*/ */
package net.i2p.BOB; package net.i2p.BOB;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -40,9 +41,8 @@ public class TCPio implements Runnable {
/** /**
* Constructor * Constructor
* *
* @param Ain * @param Ain InputStream
* @param Aout * @param Aout OutputStream
* @param info
* *
* param database * param database
*/ */
@ -99,11 +99,11 @@ public class TCPio implements Runnable {
} else if(b == 0) { } else if(b == 0) {
Thread.yield(); // this should act like a mini sleep. Thread.yield(); // this should act like a mini sleep.
if(Ain.available() == 0) { if(Ain.available() == 0) {
try { // try {
// Thread.yield(); // Thread.yield();
Thread.sleep(10); Thread.sleep(10);
} catch(InterruptedException ex) { // } catch(InterruptedException ex) {
} // }
} }
} else { } else {
/* according to the specs: /* according to the specs:
@ -114,13 +114,25 @@ public class TCPio implements Runnable {
* *
*/ */
// System.out.println("TCPio: End Of Stream"); // System.out.println("TCPio: End Of Stream");
Ain.close();
Aout.close();
return; return;
} }
} }
// System.out.println("TCPio: RUNNING = false"); // System.out.println("TCPio: RUNNING = false");
} catch(Exception e) { } catch(Exception e) {
// Eject!!! Eject!!! // Eject!!! Eject!!!
// System.out.println("TCPio: Caught an exception " + e); //System.out.println("TCPio: Caught an exception " + e);
try {
Ain.close();
} catch (IOException ex) {
// Logger.getLogger(TCPio.class.getName()).log(Level.SEVERE, null, ex);
}
try {
Aout.close();
} catch (IOException ex) {
// Logger.getLogger(TCPio.class.getName()).log(Level.SEVERE, null, ex);
}
return; return;
} }
// System.out.println("TCPio: Leaving."); // System.out.println("TCPio: Leaving.");

View File

@ -27,8 +27,8 @@ import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import net.i2p.client.I2PSession; // import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException; // import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -187,23 +187,25 @@ die: {
} }
} }
} }
// Previous level does this cleanup now.
//
// need to kill off the socket manager too. // need to kill off the socket manager too.
I2PSession session = socketManager.getSession(); // I2PSession session = socketManager.getSession();
if (session != null) { // if (session != null) {
try { // try {
session.destroySession(); // session.destroySession();
} catch (I2PSessionException ex) { // } catch (I2PSessionException ex) {
// nop // nop
} // }
} //}
//System.out.println("TCPlistener: Waiting for children"); //System.out.println("TCPlistener: Waiting for children");
while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish //while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish
try { // try {
Thread.sleep(100); //sleep for 100 ms (One tenth second) // Thread.sleep(100); //sleep for 100 ms (One tenth second)
} catch (Exception e) { // } catch (Exception e) {
// nop // // nop
} // }
} //}
//System.out.println("TCPlistener: Done."); //System.out.println("TCPlistener: Done.");
} }
} }

View File

@ -114,11 +114,15 @@ public class TCPtoI2P implements Runnable {
*/ */
public void run() { public void run() {
String line, input; String line, input;
InputStream Iin = null;
OutputStream Iout = null;
InputStream in = null;
OutputStream out = null;
try { try {
InputStream in = sock.getInputStream(); in = sock.getInputStream();
OutputStream out = sock.getOutputStream(); out = sock.getOutputStream();
try { try {
line = lnRead(in); line = lnRead(in);
input = line.toLowerCase(); input = line.toLowerCase();
@ -136,8 +140,8 @@ public class TCPtoI2P implements Runnable {
I2P = socketManager.connect(dest); I2P = socketManager.connect(dest);
I2P.setReadTimeout(0); // temp bugfix, this *SHOULD* be the default I2P.setReadTimeout(0); // temp bugfix, this *SHOULD* be the default
// make readers/writers // make readers/writers
InputStream Iin = I2P.getInputStream(); Iin = I2P.getInputStream();
OutputStream Iout = I2P.getOutputStream(); Iout = I2P.getOutputStream();
// setup to cross the streams // setup to cross the streams
TCPio conn_c = new TCPio(in, Iout /*, info, database */); // app -> I2P TCPio conn_c = new TCPio(in, Iout /*, info, database */); // app -> I2P
TCPio conn_a = new TCPio(Iin, out /*, info, database */); // I2P -> app TCPio conn_a = new TCPio(Iin, out /*, info, database */); // I2P -> app
@ -147,11 +151,11 @@ public class TCPtoI2P implements Runnable {
t.start(); t.start();
q.start(); q.start();
while(t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread while(t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread
try { // try {
Thread.sleep(10); //sleep for 10 ms Thread.sleep(10); //sleep for 10 ms
} catch(InterruptedException e) { // } catch(InterruptedException e) {
// nop // nop
} // }
} }
// System.out.println("TCPtoI2P: Going away..."); // System.out.println("TCPtoI2P: Going away...");
@ -171,6 +175,22 @@ public class TCPtoI2P implements Runnable {
} catch(Exception e) { } catch(Exception e) {
// bail on anything else // bail on anything else
} }
try {
in.close();
} catch(Exception e) {
}
try {
out.close();
} catch(Exception e) {
}
try {
Iin.close();
} catch(Exception e) {
}
try {
Iout.close();
} catch(Exception e) {
}
try { try {
// System.out.println("TCPtoI2P: Close I2P"); // System.out.println("TCPtoI2P: Close I2P");
I2P.close(); I2P.close();

View File

@ -100,7 +100,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
} }
/** /**
* @param privKeyFile null to generate a transient key * @param pkf null to generate a transient key
* *
* @throws IllegalArgumentException if the I2CP configuration is b0rked so * @throws IllegalArgumentException if the I2CP configuration is b0rked so
* badly that we cant create a socketManager * badly that we cant create a socketManager

View File

@ -34,7 +34,29 @@ import net.i2p.util.EventDispatcher;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
import net.i2p.util.Log; import net.i2p.util.Log;
public abstract class I2PTunnelUDPClientBase extends I2PTunnelTask implements Source, Sink { /**
* Base client class that sets up an I2P Datagram client destination.
* The UDP side is not implemented here, as there are at least
* two possibilities:
*
* 1) UDP side is a "server"
* Example: Streamr Consumer
* - Configure a destination host and port
* - External application sends no data
* - Extending class must have a constructor with host and port arguments
*
* 2) UDP side is a client/server
* Example: SOCKS UDP (DNS requests?)
* - configure an inbound port and a destination host and port
* - External application sends and receives data
* - Extending class must have a constructor with host and 2 port arguments
*
* So the implementing class must create a UDPSource and/or UDPSink,
* and must call setSink().
*
* @author zzz with portions from welterde's streamr
*/
public abstract class I2PTunnelUDPClientBase extends I2PTunnelTask implements Source, Sink {
private static final Log _log = new Log(I2PTunnelUDPClientBase.class); private static final Log _log = new Log(I2PTunnelUDPClientBase.class);
protected I2PAppContext _context; protected I2PAppContext _context;
@ -69,33 +91,11 @@ public abstract class I2PTunnelUDPClientBase extends I2PTunnelTask implements So
private Source _i2pSource; private Source _i2pSource;
private Sink _i2pSink; private Sink _i2pSink;
private Destination _otherDest; private Destination _otherDest;
/** /**
* Base client class that sets up an I2P Datagram client destination.
* The UDP side is not implemented here, as there are at least
* two possibilities:
*
* 1) UDP side is a "server"
* Example: Streamr Consumer
* - Configure a destination host and port
* - External application sends no data
* - Extending class must have a constructor with host and port arguments
*
* 2) UDP side is a client/server
* Example: SOCKS UDP (DNS requests?)
* - configure an inbound port and a destination host and port
* - External application sends and receives data
* - Extending class must have a constructor with host and 2 port arguments
*
* So the implementing class must create a UDPSource and/or UDPSink,
* and must call setSink().
*
* @throws IllegalArgumentException if the I2CP configuration is b0rked so * @throws IllegalArgumentException if the I2CP configuration is b0rked so
* badly that we cant create a socketManager * badly that we cant create a socketManager
*
* @author zzz with portions from welterde's streamr
*/ */
public I2PTunnelUDPClientBase(String destination, Logging l, EventDispatcher notifyThis, public I2PTunnelUDPClientBase(String destination, Logging l, EventDispatcher notifyThis,
I2PTunnel tunnel) throws IllegalArgumentException { I2PTunnel tunnel) throws IllegalArgumentException {
super("UDPServer", notifyThis, tunnel); super("UDPServer", notifyThis, tunnel);
_clientId = ++__clientId; _clientId = ++__clientId;

View File

@ -32,25 +32,6 @@ import net.i2p.util.EventDispatcher;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
import net.i2p.util.Log; import net.i2p.util.Log;
public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sink {
private final static Log _log = new Log(I2PTunnelUDPServerBase.class);
private Object lock = new Object();
protected Object slock = new Object();
private static volatile long __serverId = 0;
protected Logging l;
private static final long DEFAULT_READ_TIMEOUT = -1; // 3*60*1000;
/** default timeout to 3 minutes - override if desired */
protected long readTimeout = DEFAULT_READ_TIMEOUT;
private I2PSession _session;
private Source _i2pSource;
private Sink _i2pSink;
/** /**
* Base client class that sets up an I2P Datagram server destination. * Base client class that sets up an I2P Datagram server destination.
* The UDP side is not implemented here, as there are at least * The UDP side is not implemented here, as there are at least
@ -70,11 +51,34 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin
* *
* So the implementing class must create a UDPSource and/or UDPSink, * So the implementing class must create a UDPSource and/or UDPSink,
* and must call setSink(). * and must call setSink().
*
* @author zzz with portions from welterde's streamr
*/
public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sink {
private final static Log _log = new Log(I2PTunnelUDPServerBase.class);
private Object lock = new Object();
protected Object slock = new Object();
private static volatile long __serverId = 0;
protected Logging l;
private static final long DEFAULT_READ_TIMEOUT = -1; // 3*60*1000;
/** default timeout to 3 minutes - override if desired */
protected long readTimeout = DEFAULT_READ_TIMEOUT;
private I2PSession _session;
private Source _i2pSource;
private Sink _i2pSink;
/**
* *
* @throws IllegalArgumentException if the I2CP configuration is b0rked so * @throws IllegalArgumentException if the I2CP configuration is b0rked so
* badly that we cant create a socketManager * badly that we cant create a socketManager
* *
* @author zzz with portions from welterde's streamr
*/ */
public I2PTunnelUDPServerBase(boolean verify, File privkey, String privkeyname, Logging l, public I2PTunnelUDPServerBase(boolean verify, File privkey, String privkeyname, Logging l,

View File

@ -187,12 +187,12 @@ public class Connection {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/" _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/"
+ _activeResends + "), waiting " + timeLeft); + _activeResends + "), waiting " + timeLeft);
try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) {} try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;}
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
+ "), waiting indefinitely"); + "), waiting indefinitely");
try { _outboundPackets.wait(250); } catch (InterruptedException ie) {} //10*1000 try { _outboundPackets.wait(250); } catch (InterruptedException ie) {if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")"); return false;} //10*1000
} }
} else { } else {
_context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start); _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start);
@ -810,7 +810,11 @@ public class Connection {
synchronized (_connectLock) { synchronized (_connectLock) {
_connectLock.wait(timeLeft); _connectLock.wait(timeLeft);
} }
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("waitForConnect(): InterruptedException");
_connectionError = "InterruptedException";
return;
}
} }
} }

View File

@ -126,7 +126,7 @@ class ConnectionHandler {
if (timeoutMs <= 0) { if (timeoutMs <= 0) {
try { try {
syn = _synQueue.take(); // waits forever syn = _synQueue.take(); // waits forever
} catch (InterruptedException ie) {} } catch (InterruptedException ie) { break;}
} else { } else {
long remaining = expiration - _context.clock().now(); long remaining = expiration - _context.clock().now();
// (dont think this applies anymore for LinkedBlockingQueue) // (dont think this applies anymore for LinkedBlockingQueue)
@ -138,7 +138,7 @@ class ConnectionHandler {
break; break;
try { try {
syn = _synQueue.poll(remaining, TimeUnit.MILLISECONDS); // waits the specified time max syn = _synQueue.poll(remaining, TimeUnit.MILLISECONDS); // waits the specified time max
} catch (InterruptedException ie) {} } catch (InterruptedException ie) { }
break; break;
} }
} }

View File

@ -213,7 +213,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
timeRemaining = 10*1000; timeRemaining = 10*1000;
wait(timeRemaining); wait(timeRemaining);
} }
} catch (InterruptedException ie) {} } catch (InterruptedException ie) { break; }
} }
if (!writeSuccessful()) if (!writeSuccessful())
releasePayload(); releasePayload();

View File

@ -146,8 +146,8 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
* I2PSession.PROTO_STREAMING * I2PSession.PROTO_STREAMING
* I2PSession.PROTO_DATAGRAM * I2PSession.PROTO_DATAGRAM
* 255 disallowed * 255 disallowed
* @param fromport 1-65535 or 0 for unset * @param fromPort 1-65535 or 0 for unset
* @param toport 1-65535 or 0 for unset * @param toPort 1-65535 or 0 for unset
*/ */
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
SessionKey keyUsed, Set tagsSent, long expires, SessionKey keyUsed, Set tagsSent, long expires,

View File

@ -178,7 +178,11 @@ public class I2CPMessageReader {
// pause .5 secs when we're paused // pause .5 secs when we're paused
try { try {
Thread.sleep(500); Thread.sleep(500);
} catch (InterruptedException ie) { // nop } catch (InterruptedException ie) {
// we should break away here.
_log.warn("Breaking away stream", ie);
_listener.disconnected(I2CPMessageReader.this);
cancelRunner();
} }
} }
} }

View File

@ -16,12 +16,15 @@ import net.i2p.data.Hash;
* Base32 desthash.b32.i2p * Base32 desthash.b32.i2p
* example.i2p * example.i2p
* *
* @return null on failure
*
* @author zzz * @author zzz
*/ */
public class ConvertToHash { public class ConvertToHash {
/**
* Convert any kind of destination String to a hash
*
* @return null on failure
*/
public static Hash getHash(String peer) { public static Hash getHash(String peer) {
if (peer == null) if (peer == null)
return null; return null;

View File

@ -1,3 +1,8 @@
2009-04-10 sponge
* More BOB threadgroup fixes, plus debug dump when things go wrong.
* Fixes to streaminglib, I2CP, which are related to the TG problem.
* JavaDocs fixups.
2009-04-08 sponge 2009-04-08 sponge
* More hopeful fixups to the infamous orpahned tunnel problem. *Sigh* * More hopeful fixups to the infamous orpahned tunnel problem. *Sigh*

View File

@ -17,7 +17,7 @@ import net.i2p.CoreVersion;
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 14; public final static long BUILD = 15;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);