* added big fat start/stop lock into BOB

* added zap command to shut down BOB... now we need a way to start it
      after it stops. :-)
This commit is contained in:
sponge
2009-05-29 21:14:08 +00:00
parent fd598dea5b
commit 26c4f983d7
8 changed files with 294 additions and 150 deletions

View File

@ -27,14 +27,18 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.client.I2PClient;
import net.i2p.client.streaming.RetransmissionTimer;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleStore;
import net.i2p.util.SimpleTimer2;
/**
@ -114,10 +118,15 @@ public class BOB {
public final static String PROP_CONFIG_LOCATION = "BOB.config";
public final static String PROP_BOB_PORT = "BOB.port";
public final static String PROP_BOB_HOST = "BOB.host";
private static int maxConnections = 0;
private static NamedDB database;
private static Properties props = new Properties();
private static AtomicBoolean spin = new AtomicBoolean(true);
private static final String P_RUNNING = "RUNNING";
private static final String P_STARTING = "STARTING";
private static final String P_STOPPING = "STOPPING";
private static AtomicBoolean lock = new AtomicBoolean(false);
// no longer used.
// private static int maxConnections = 0;
/**
* Log a warning
@ -149,6 +158,12 @@ public class BOB {
_log.error(arg);
}
/**
* Stop BOB gracefully
*/
public static void stop() {
spin.set(false);
}
/**
* Listen for incoming connections and handle them
*
@ -156,6 +171,7 @@ public class BOB {
*/
public static void main(String[] args) {
database = new NamedDB();
ServerSocket listener = null;
int i = 0;
boolean save = false;
// Set up all defaults to be passed forward to other threads.
@ -168,77 +184,174 @@ public class BOB {
i = Y.hashCode();
i = Y1.hashCode();
i = Y2.hashCode();
{
try {
FileInputStream fi = new FileInputStream(configLocation);
props.load(fi);
fi.close();
} catch(FileNotFoundException fnfe) {
warn("Unable to load up the BOB config file " + configLocation + ", Using defaults.");
warn(fnfe.toString());
save = true;
} catch(IOException ioe) {
warn("IOException on BOB config file " + configLocation + ", using defaults.");
warn(ioe.toString());
}
}
// Global router and client API configurations that are missing are set to defaults here.
if(!props.containsKey(I2PClient.PROP_TCP_HOST)) {
props.setProperty(I2PClient.PROP_TCP_HOST, "localhost");
}
if(!props.containsKey(I2PClient.PROP_TCP_PORT)) {
props.setProperty(I2PClient.PROP_TCP_PORT, "7654");
}
if(!props.containsKey(I2PClient.PROP_RELIABILITY)) {
props.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
}
if(!props.containsKey(PROP_BOB_PORT)) {
props.setProperty(PROP_BOB_PORT, "2827"); // 0xB0B
}
if(!props.containsKey("inbound.length")) {
props.setProperty("inbound.length", "1");
}
if(!props.containsKey("outbound.length")) {
props.setProperty("outbound.length", "1");
}
if(!props.containsKey("inbound.lengthVariance")) {
props.setProperty("inbound.lengthVariance", "0");
}
if(!props.containsKey("outbound.lengthVariance")) {
props.setProperty("outbound.lengthVariance", "0");
}
if(!props.containsKey(PROP_BOB_HOST)) {
props.setProperty(PROP_BOB_HOST, "localhost");
}
if(save) {
try {
warn("Writing new defaults file " + configLocation);
FileOutputStream fo = new FileOutputStream(configLocation);
props.store(fo, configLocation);
fo.close();
} catch(IOException ioe) {
error("IOException on BOB config file " + configLocation + ", " + ioe);
}
}
i = 0;
try {
info("BOB is now running.");
ServerSocket listener = new ServerSocket(Integer.parseInt(props.getProperty(PROP_BOB_PORT)), 10, InetAddress.getByName(props.getProperty(PROP_BOB_HOST)));
Socket server;
while((i++ < maxConnections) || (maxConnections == 0)) {
//DoCMDS connection;
server = listener.accept();
DoCMDS conn_c = new DoCMDS(server, props, database, _log);
Thread t = new Thread(conn_c);
t.start();
{
try {
FileInputStream fi = new FileInputStream(configLocation);
props.load(fi);
fi.close();
} catch (FileNotFoundException fnfe) {
warn("Unable to load up the BOB config file " + configLocation + ", Using defaults.");
warn(fnfe.toString());
save = true;
} catch (IOException ioe) {
warn("IOException on BOB config file " + configLocation + ", using defaults.");
warn(ioe.toString());
}
}
} catch(IOException ioe) {
error("IOException on socket listen: " + ioe);
ioe.printStackTrace();
// Global router and client API configurations that are missing are set to defaults here.
if (!props.containsKey(I2PClient.PROP_TCP_HOST)) {
props.setProperty(I2PClient.PROP_TCP_HOST, "localhost");
}
if (!props.containsKey(I2PClient.PROP_TCP_PORT)) {
props.setProperty(I2PClient.PROP_TCP_PORT, "7654");
}
if (!props.containsKey(I2PClient.PROP_RELIABILITY)) {
props.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
}
if (!props.containsKey(PROP_BOB_PORT)) {
props.setProperty(PROP_BOB_PORT, "2827"); // 0xB0B
}
if (!props.containsKey("inbound.length")) {
props.setProperty("inbound.length", "1");
}
if (!props.containsKey("outbound.length")) {
props.setProperty("outbound.length", "1");
}
if (!props.containsKey("inbound.lengthVariance")) {
props.setProperty("inbound.lengthVariance", "0");
}
if (!props.containsKey("outbound.lengthVariance")) {
props.setProperty("outbound.lengthVariance", "0");
}
if (!props.containsKey(PROP_BOB_HOST)) {
props.setProperty(PROP_BOB_HOST, "localhost");
}
if (save) {
try {
warn("Writing new defaults file " + configLocation);
FileOutputStream fo = new FileOutputStream(configLocation);
props.store(fo, configLocation);
fo.close();
} catch (IOException ioe) {
error("IOException on BOB config file " + configLocation + ", " + ioe);
}
}
i = 0;
boolean g = false;
try {
info("BOB is now running.");
listener = new ServerSocket(Integer.parseInt(props.getProperty(PROP_BOB_PORT)), 10, InetAddress.getByName(props.getProperty(PROP_BOB_HOST)));
Socket server = null;
listener.setSoTimeout(500); // .5 sec
while (spin.get()) {
//DoCMDS connection;
try {
server = listener.accept();
g = true;
} catch (ConnectException ce) {
g = false;
} catch (SocketTimeoutException ste) {
g = false;
}
if (g) {
DoCMDS conn_c = new DoCMDS(spin, lock, server, props, database, _log);
Thread t = new Thread(conn_c);
t.setName("BOB.DoCMDS " + i);
t.start();
i++;
}
}
} catch (IOException ioe) {
error("IOException on socket listen: " + ioe);
ioe.printStackTrace();
}
} finally {
info("BOB is now shutting down...");
// Clean up everything.
try {
listener.close();
} catch (Exception ex) {
// nop
}
// Find all our "BOB.DoCMDS" threads, wait for them to be finished.
// We could order them to stop, but that could cause nasty issues in the locks.
visitAllThreads();
database.getReadLock();
int all = database.getcount();
database.releaseReadLock();
NamedDB nickinfo;
for (i = 0; i < all; i++) {
database.getReadLock();
nickinfo = (NamedDB) database.getnext(i);
nickinfo.getReadLock();
if (nickinfo.get(P_RUNNING).equals(Boolean.TRUE) && nickinfo.get(P_STOPPING).equals(Boolean.FALSE) && nickinfo.get(P_STARTING).equals(Boolean.FALSE)) {
nickinfo.releaseReadLock();
database.releaseReadLock();
database.getWriteLock();
nickinfo.getWriteLock();
nickinfo.add(P_STOPPING, new Boolean(true));
nickinfo.releaseWriteLock();
database.releaseWriteLock();
} else {
nickinfo.releaseReadLock();
database.releaseReadLock();
}
}
info("BOB is now stopped.");
}
}
/**
* Find the root thread group,
* then find all theads with certain names and wait for them all to be dead.
*
*/
private static void visitAllThreads() {
ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
while (root.getParent() != null) {
root = root.getParent();
}
// Visit each thread group
waitjoin(root, 0, root.getName());
}
private static void waitjoin(ThreadGroup group, int level, String tn) {
// Get threads in `group'
int numThreads = group.activeCount();
Thread[] threads = new Thread[numThreads * 2];
numThreads = group.enumerate(threads, false);
// Enumerate each thread in `group' and wait for it to stop if it is one of ours.
for (int i = 0; i < numThreads; i++) {
// Get thread
Thread thread = threads[i];
if (thread.getName().startsWith("BOB.DoCMDS ")) {
try {
if (thread.isAlive()) {
try {
thread.join();
} catch (InterruptedException ex) {
}
}
} catch (SecurityException se) {
//nop
}
}
}
// Get thread subgroups of `group'
int numGroups = group.activeGroupCount();
ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
numGroups = group.enumerate(groups, false);
// Recursively visit each subgroup
for (int i = 0; i < numGroups; i++) {
waitjoin(groups[i], level + 1, groups[i].getName());
}
}
}

View File

@ -31,10 +31,12 @@ import java.io.PrintStream;
import java.net.Socket;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.I2PException;
import net.i2p.client.I2PClientFactory;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleStore;
/**
* Simplistic command parser for BOB
@ -57,6 +59,8 @@ public class DoCMDS implements Runnable {
private boolean dk, ns, ip, op;
private NamedDB nickinfo;
private Log _log;
private AtomicBoolean LIVE;
private AtomicBoolean lock;
/* database strings */
private static final String P_DEST = "DESTINATION";
private static final String P_INHOST = "INHOST";
@ -94,6 +98,7 @@ public class DoCMDS implements Runnable {
private static final String C_status = "status";
private static final String C_stop = "stop";
private static final String C_verify = "verify";
private static final String C_zap = "zap";
/* all the coomands available, plus description */
private static final String C_ALL[][] = {
@ -119,6 +124,7 @@ public class DoCMDS implements Runnable {
{C_status, C_status + " nickname * Display status of a nicknamed tunnel."},
{C_stop, C_stop + " * Stops the current nicknamed tunnel."},
{C_verify, C_verify + " BASE64_key * Verifies BASE64 destination."},
{C_zap, C_zap + " * Shuts down BOB."},
{"", "COMMANDS: " + // this is ugly, but...
C_help + " " +
C_clear + " " +
@ -141,19 +147,22 @@ public class DoCMDS implements Runnable {
C_start + " " +
C_status + " " +
C_stop + " " +
C_verify
C_verify + " " +
C_zap
},
{" ", " "} // end of list
};
/**
*
* @parm LIVE
* @param server
* @param props
* @param database
* @param _log
*/
DoCMDS(Socket server, Properties props, NamedDB database, Log _log) {
DoCMDS(AtomicBoolean LIVE, AtomicBoolean lock, Socket server, Properties props, NamedDB database, Log _log) {
this.lock = lock;
this.LIVE = LIVE;
this.server = server;
this.props = new Properties();
this.database = database;
@ -509,6 +518,11 @@ public class DoCMDS implements Runnable {
} else if (Command.equals(C_quit)) {
// End the command session
break quit;
} else if (Command.equals(C_zap)) {
// Kill BOB!! (let's hope this works!)
LIVE.set(false);
// End the command session
break quit;
} else if (Command.equals(C_newkeys)) {
if (ns) {
try {
@ -1260,7 +1274,10 @@ public class DoCMDS implements Runnable {
} else {
MUXlisten tunnel;
try {
tunnel = new MUXlisten(database, nickinfo, _log);
while(!lock.compareAndSet(false, true)) {
// wait
}
tunnel = new MUXlisten(lock, database, nickinfo, _log);
Thread t = new Thread(tunnel);
t.start();
// try {
@ -1270,8 +1287,10 @@ public class DoCMDS implements Runnable {
// }
out.println("OK tunnel starting");
} catch (I2PException e) {
lock.set(false);
out.println("ERROR starting tunnel: " + e);
} catch (IOException e) {
lock.set(false);
out.println("ERROR starting tunnel: " + e);
}
}

View File

@ -25,8 +25,6 @@ package net.i2p.BOB;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.i2p.I2PException;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;

View File

@ -28,6 +28,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.I2PException;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocketManager;
@ -52,6 +53,7 @@ public class MUXlisten implements Runnable {
private int backlog = 50; // should this be more? less?
boolean go_out;
boolean come_in;
private AtomicBoolean lock;
/**
* Constructor Will fail if INPORT is occupied.
@ -62,9 +64,10 @@ public class MUXlisten implements Runnable {
* @throws net.i2p.I2PException
* @throws java.io.IOException
*/
MUXlisten(NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException {
MUXlisten(AtomicBoolean lock, NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException {
int port = 0;
InetAddress host = null;
this.lock = lock;
this.tg = null;
this.database = database;
this.info = info;
@ -151,7 +154,7 @@ public class MUXlisten implements Runnable {
return;
}
// socketManager.addDisconnectListener(new DisconnectListener());
lock.set(false);
quit:
{
try {
@ -216,7 +219,7 @@ public class MUXlisten implements Runnable {
break die;
}
}
/* cleared in the finally...
try {
wlock();
try {
@ -233,6 +236,7 @@ public class MUXlisten implements Runnable {
} catch (Exception e) {
break die;
}
*/
} // die
} catch (Exception e) {
@ -241,11 +245,11 @@ public class MUXlisten implements Runnable {
}
} // quit
} finally {
// Start cleanup. Allow threads above this one to catch the stop signal.
try {
Thread.sleep(250);
} catch (InterruptedException ex) {
// Start cleanup.
while (!lock.compareAndSet(false, true)) {
// wait
}
// zero out everything.
try {
wlock();
@ -261,7 +265,6 @@ public class MUXlisten implements Runnable {
} catch (Exception e) {
}
if (SS != null) {
try {
SS.close();
@ -279,7 +282,14 @@ public class MUXlisten implements Runnable {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
}
// Some grace time.
try {
Thread.sleep(250);
} catch (InterruptedException ex) {
}
lock.set(false); // Should we force waiting for all threads??
// Wait around till all threads are collected.
if (tg != null) {
String boner = tg.getName();
@ -362,39 +372,41 @@ public class MUXlisten implements Runnable {
}
}
/*
private static void nuke(ThreadGroup group, int level) {
// Get threads in `group'
int numThreads = group.activeCount();
Thread[] threads = new Thread[numThreads * 2];
numThreads = group.enumerate(threads, false);
// Enumerate each thread in `group' and stop it.
for (int i = 0; i < numThreads; i++) {
// Get thread
Thread thread = threads[i];
try {
if (thread.isAlive()) {
thread.stop();
}
} catch (SecurityException se) {
//nop
}
}
// Get thread subgroups of `group'
int numGroups = group.activeGroupCount();
ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
numGroups = group.enumerate(groups, false);
// Recursively visit each subgroup
for (int i = 0; i < numGroups; i++) {
nuke(groups[i], level + 1);
}
try {
group.destroy();
} catch (IllegalThreadStateException IE) {
//nop
} catch (SecurityException se) {
//nop
}
// Get threads in `group'
int numThreads = group.activeCount();
Thread[] threads = new Thread[numThreads * 2];
numThreads = group.enumerate(threads, false);
// Enumerate each thread in `group' and stop it.
for (int i = 0; i < numThreads; i++) {
// Get thread
Thread thread = threads[i];
try {
if (thread.isAlive()) {
thread.stop();
}
} catch (SecurityException se) {
//nop
}
}
// Get thread subgroups of `group'
int numGroups = group.activeGroupCount();
ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
numGroups = group.enumerate(groups, false);
// Recursively visit each subgroup
for (int i = 0; i < numGroups; i++) {
nuke(groups[i], level + 1);
}
try {
group.destroy();
} catch (IllegalThreadStateException IE) {
//nop
} catch (SecurityException se) {
//nop
}
}
*/
}

View File

@ -49,8 +49,8 @@ public class TCPio implements Runnable {
TCPio(InputStream Ain, OutputStream Aout /*, NamedDB info , NamedDB database */) {
this.Ain = Ain;
this.Aout = Aout;
// this.info = info;
// this.database = database;
// this.info = info;
// this.database = database;
}
/**
@ -86,30 +86,35 @@ public class TCPio implements Runnable {
byte a[] = new byte[1];
boolean spin = true;
try {
while(spin) {
b = Ain.read(a, 0, 1);
if(b > 0) {
Aout.write(a, 0, b);
} else if(b == 0) {
Thread.yield(); // this should act like a mini sleep.
if(Ain.available() == 0) {
try {
while (spin) {
b = Ain.read(a, 0, 1);
if (b > 0) {
Aout.write(a, 0, b);
} else if (b == 0) {
Thread.yield(); // this should act like a mini sleep.
if (Ain.available() == 0) {
Thread.sleep(10);
}
} else {
/* according to the specs:
*
* The total number of bytes read into the buffer,
* or -1 if there is no more data because the end of
* the stream has been reached.
*
*/
// System.out.println("TCPio: End Of Stream");
// Ain.close();
// Aout.close();
//return;
break;
}
} else {
/* according to the specs:
*
* The total number of bytes read into the buffer,
* or -1 if there is no more data because the end of
* the stream has been reached.
*
*/
// System.out.println("TCPio: End Of Stream");
Ain.close();
Aout.close();
return;
}
} catch (Exception e) {
}
} catch(Exception e) {
// System.out.println("TCPio: Leaving.");
} finally {
// Eject!!! Eject!!!
//System.out.println("TCPio: Caught an exception " + e);
try {
@ -122,6 +127,5 @@ public class TCPio implements Runnable {
}
return;
}
// System.out.println("TCPio: Leaving.");
}
}

View File

@ -29,8 +29,6 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
// import net.i2p.client.I2PSession;
// import net.i2p.client.I2PSessionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.util.Log;
@ -142,12 +140,7 @@ public class TCPlistener implements Runnable {
g = false;
}
}
listener.close();
} catch (IOException ioe) {
try {
listener.close();
} catch (IOException e) {
}
}
}
} finally {

View File

@ -1,3 +1,8 @@
2009-05-29 sponge
* added big fat start/stop lock into BOB
* added zap command to shut down BOB... now we need a way to start it
after it stops. :-)
2009-05-27 Mathiasdm
* Increase sendProcessingTime some more, add a property to configure.
Configure with 'router.defaultProcessingTimeThrottle'.

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 8;
public final static long BUILD = 9;
/** for example "-test" */
public final static String EXTRA = "";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;