merge of '9666f5fdfc24a7fc2ca3a99a95ea5dfef5583b1b'

and 'e76b1962963aa7cadb74aacc32f90adf31db3761'
This commit is contained in:
mkvore-commit
2009-04-08 06:41:53 +00:00
45 changed files with 2875 additions and 231 deletions

View File

@ -30,6 +30,37 @@ public interface I2PServerSocket {
*/
public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
/**
* accept(timeout) waits timeout ms for a socket connecting. If a socket is
* not available during the timeout, return null. accept(0) behaves like accept()
*
* @param timeout in ms
*
* @return a connected I2PSocket, or null
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
* @throws InterruptedException if thread is interrupted while waiting
*/
public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException;
/**
* Wait until there is a socket waiting for acception or the timeout is
* reached.
*
* @param timeoutMs timeout in ms. If ms is 0, wait forever.
*
* @return true if a socket is available, false if not
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
* @throws InterruptedException if the thread is interrupted before
* completion
*/
public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException;
/**
* Set Sock Option accept timeout
* @param x timeout in ms

View File

@ -1,6 +1,7 @@
package net.i2p.client.streaming;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -20,7 +21,7 @@ class I2PServerSocketImpl implements I2PServerSocket {
private final static Log _log = new Log(I2PServerSocketImpl.class);
private I2PSocketManager mgr;
/** list of sockets waiting for the client to accept them */
private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
private List<I2PSocket> pendingSockets = Collections.synchronizedList(new ArrayList<I2PSocket>(4));
/** have we been closed */
private volatile boolean closing = false;
@ -50,6 +51,82 @@ class I2PServerSocketImpl implements I2PServerSocket {
}
/**
* Waits until there is a socket waiting for acception or the timeout is
* reached.
*
* @param timeoutMs timeout in ms. A negative value waits forever.
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
* @throws InterruptedException if thread is interrupted while waiting
*/
public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException {
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitIncoming() called, pending: " + pendingSockets.size());
boolean isTimed = (timeoutMs>0);
if (isTimed) {
Clock clock = I2PAppContext.getGlobalContext().clock();
long now = clock.now();
long end = now + timeoutMs;
while (pendingSockets.size() <= 0 && now<end) {
if (closing) throw new ConnectException("I2PServerSocket closed");
synchronized(socketAddedLock) {
socketAddedLock.wait(end - now);
}
now = clock.now();
}
} else {
while (pendingSockets.size() <= 0) {
if (closing) throw new ConnectException("I2PServerSocket closed");
try {
synchronized(socketAddedLock) {
socketAddedLock.wait();
}
} catch (InterruptedException ie) {}
}
}
}
/**
* accept(timeout) waits timeout ms for a socket connecting. If a socket is
* not available during the timeout, return null. accept(0) behaves like accept()
*
* @param timeout in ms
*
* @return a connected I2PSocket, or null
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
* @throws InterruptedException if thread is interrupted while waiting
*/
public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException {
I2PSocket ret = null;
if (timeout<=0) {
ret = accept();
} else {
long now = I2PAppContext.getGlobalContext().clock().now();
long expiration = timeout + now ;
synchronized (pendingSockets) {
while (pendingSockets.size() == 0 && expiration>now) {
pendingSockets.wait(expiration-now);
now = I2PAppContext.getGlobalContext().clock().now();
}
ret = (I2PSocket)pendingSockets.remove(0);
}
if (ret != null) {
synchronized (socketAcceptedLock) {
socketAcceptedLock.notifyAll();
}
}
}
return ret;
}
/**
* Waits for the next socket connecting. If a remote user tried to make a
* connection and the local application wasn't .accept()ing new connections,
* they should get refused (if .accept() doesnt occur in some small period -
@ -68,24 +145,12 @@ class I2PServerSocketImpl implements I2PServerSocket {
I2PSocket ret = null;
while ( (ret == null) && (!closing) ){
while (pendingSockets.size() <= 0) {
if (closing) throw new ConnectException("I2PServerSocket closed");
try {
synchronized(socketAddedLock) {
socketAddedLock.wait();
}
} catch (InterruptedException ie) {}
}
synchronized (pendingSockets) {
if (pendingSockets.size() > 0) {
ret = (I2PSocket)pendingSockets.remove(0);
}
}
if (ret != null) {
synchronized (socketAcceptedLock) {
socketAcceptedLock.notifyAll();
}
}
try {
this.waitIncoming(0);
ret = accept(1);
} catch (InterruptedException e) {
throw new I2PException("Thread interrupted") ;
}
}
if (_log.shouldLog(Log.DEBUG))

View File

@ -350,7 +350,6 @@ class I2PSocketImpl implements I2PSocket {
read = bc.startToByteArray(len);
bc.notifyAll();
}
boolean timedOut = false;
while ( (read.length == 0) && (!inStreamClosed) ) {
synchronized (flagLock) {

View File

@ -13,7 +13,6 @@ import net.i2p.client.I2PClient;
import net.i2p.client.I2PClientFactory;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination;
import net.i2p.util.Log;
/**

View File

@ -43,12 +43,12 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
private I2PSession _session;
private I2PServerSocketImpl _serverSocket = null;
private Object lock = new Object(); // for locking socket lists
private HashMap _outSockets;
private HashMap _inSockets;
private HashMap<String,I2PSocket> _outSockets;
private HashMap<String,I2PSocket> _inSockets;
private I2PSocketOptions _defaultOptions;
private long _acceptTimeout;
private String _name;
private List _listeners;
private List<DisconnectListener> _listeners;
private static int __managerId = 0;
public static final short ACK = 0x51;
@ -76,10 +76,10 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
_name = name;
_context = context;
_log = _context.logManager().getLog(I2PSocketManager.class);
_inSockets = new HashMap(16);
_outSockets = new HashMap(16);
_inSockets = new HashMap<String,I2PSocket>(16);
_outSockets = new HashMap<String,I2PSocket>(16);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_listeners = new ArrayList(1);
_listeners = new ArrayList<DisconnectListener>(1);
setSession(session);
setDefaultOptions(buildOptions(opts));
_context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
@ -113,9 +113,9 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
public void disconnected(I2PSession session) {
_log.info(getName() + ": Disconnected from the session");
destroySocketManager();
List listeners = null;
List<DisconnectListener> listeners = null;
synchronized (_listeners) {
listeners = new ArrayList(_listeners);
listeners = new ArrayList<DisconnectListener>(_listeners);
_listeners.clear();
}
for (int i = 0; i < listeners.size(); i++) {
@ -130,7 +130,6 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
public void messageAvailable(I2PSession session, int msgId, long size) {
try {
I2PSocketImpl s;
byte msg[] = session.receiveMessage(msgId);
if (msg.length == 1 && msg[0] == -1) {
_log.debug(getName() + ": Ping received");
@ -660,7 +659,7 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
*
*/
public Set listSockets() {
Set sockets = new HashSet(8);
Set<I2PSocket> sockets = new HashSet<I2PSocket>(8);
synchronized (lock) {
sockets.addAll(_inSockets.values());
sockets.addAll(_outSockets.values());

View File

@ -28,6 +28,10 @@ public class TestSwarm {
private String _conOptions; // unused? used elsewhere?
private boolean _dead; // unused? used elsewhere?
public void antiCompilationWarnings() {
_log.debug(""+_conOptions+_dead);
}
public static void main(String args[]) {
if (args.length < 1) {
System.err.println("Usage: TestSwarm myDestFile [peerDestFile ]*");
@ -131,6 +135,14 @@ public class TestSwarm {
_context.statManager().createRateStat("swarm." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 });
}
public void antiCompilationWarnings() {
_log.debug(""+this._lastReceived+this._lastReceivedOn+this._started);
}
public void antiCompilationWarnings(long x, long y) {
if (false)
_log.debug(""+x+y);
}
public Flooder(I2PSocket socket) {
_socket = socket;
_remoteDestination = socket.getPeerDestination();
@ -154,6 +166,8 @@ public class TestSwarm {
_context.random().nextBytes(data);
long value = 0;
long lastSend = _context.clock().now();
this.antiCompilationWarnings(value, lastSend);
if (_socket == null) {
try {
_socket = _manager.connect(_remoteDestination);

View File

@ -0,0 +1,15 @@
# test example
#in a first terminal, launch :
./samIn.py inTest
#in a second terminal, launch :
./samForward.py 25000 forward
#in a third terminal, launch :
l=0
while [ $l -lt 1000 ]
do
l=$((l+1))
./samOut.py forward this is message n. $l
done

View File

@ -0,0 +1,35 @@
#!/usr/bin/python
import socket
import sys
# create a forward style SAM datagram session
# that forwards messages on specified port (default port : 25000)
# creates a standard datagram server that listens on this port forever
# usage : ./samForward.py [port [SAM session name]]
if len(sys.argv)>=2 :
port = eval(sys.argv[1])
else :
port = 25000
if len(sys.argv)==3 :
name = sys.argv[2]
else :
name = "datagramSamForward"
sess = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sess.connect(("127.0.0.1",7656));
sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sess.recv(1000))
sess.send("SESSION CREATE STYLE=DATAGRAM PORT="+str(port)+" ID="+name+" DESTINATION=EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABHNqwgkhJnBW4ymaRsdVmITAha-ff0UiALfKSlznqp5HcSewgMHbzQ0I01TQytFnW outbound.nickname="+name+" inbound.nickname="+name+" outbound.length=0\n")
sys.stdout.write(sess.recv(10000))
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("", port))
print "waiting on port:", port
while 1:
data, addr = s.recvfrom(40000)
print data, " received from ", addr, "length=", len(data)

View File

@ -0,0 +1,29 @@
#!/usr/bin/python
# create a SAM datagram session that writes incoming messages on its master session stream
# and a listen forever
# usage : ./samIn.py [session name]
import socket
import sys
if len(sys.argv)==2 :
name = sys.argv[1]
else :
name = "datagramSamIn"
sess = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sess.connect(("127.0.0.1",7656));
sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sess.recv(1000))
sess.send("SESSION CREATE STYLE=DATAGRAM ID="+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABngJSS8xMyF4t82otZmCDhrKjbm-QLMtOLoumwR28ebDHEd4clF6O7aRa3d3yRH7p\n")
sys.stdout.write(sess.recv(1000))
while 1 :
chunk = sess.recv(10000)
sys.stdout.write(chunk+'\n')
if not chunk : break
print

View File

@ -0,0 +1,31 @@
#!/usr/bin/python
# sends a message to datagram destinations opened by samForward.py and samIn.py, using specified sending session name
# at least samForward.py should be running for results to be seen
# usage : ./samOut.py [ sendingSessionName [ message ... ] ]
# sendingSessionName : default = datagramSamForward
# message : default = "this is nice message"
import socket
import sys
import time
if len(sys.argv)>=2 :
name = sys.argv[1]
else :
name = "datagramSamForward"
if len(sys.argv)>2 :
message = ''.join([s+' ' for s in sys.argv[2:]]).strip()
else :
message = "This is a nice message"
# client.py
port = 7655
host = "localhost"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("", 0))
s.sendto("3.0 "+name+" tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAA\n"+message, (host, port))
s.sendto("3.0 "+name+" EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAA\n"+message, (host, port))

View File

@ -0,0 +1,15 @@
# test example
#in a first terminal, launch :
./samIn.py inTest
#in a second terminal, launch :
./samForward.py 25000 forward
#in a third terminal, launch :
l=0
while [ $l -lt 1000 ]
do
l=$((l+1))
./samOut.py forward this is message n. $l
done

View File

@ -0,0 +1,36 @@
#!/usr/bin/python
import socket
import sys
# create a forward style SAM raw datagram session
# that forwards messages on specified port (default port : 25000)
# creates a standard datagram server that listens on this port forever
# usage : ./samForward.py [port [SAM session name]]
if len(sys.argv)>=2 :
port = eval(sys.argv[1])
else :
port = 25000
if len(sys.argv)==3 :
name = sys.argv[2]
else :
name = "rawSamForward"
sess = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sess.connect(("127.0.0.1",7656));
sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sess.recv(1000))
sess.send("SESSION CREATE STYLE=RAW PORT="+str(port)+" ID="+name+" DESTINATION=EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABHNqwgkhJnBW4ymaRsdVmITAha-ff0UiALfKSlznqp5HcSewgMHbzQ0I01TQytFnW\n")
sys.stdout.write(sess.recv(10000))
# listening server
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("", port))
print "waiting on port:", port
while 1:
data, addr = s.recvfrom(40000)
print data, " received from ", addr, "length=", len(data)

View File

@ -0,0 +1,31 @@
#!/usr/bin/python
# create a SAM datagram session that writes incoming messages on its master session stream
# and a listen forever
# usage : ./samIn.py [session name]
import socket
import sys
if len(sys.argv)==2 :
name = sys.argv[1]
else :
name = "rawSamIn"
sess = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sess.connect(("127.0.0.1",7656));
sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sess.recv(1000))
sess.send("SESSION CREATE STYLE=RAW ID="+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABngJSS8xMyF4t82otZmCDhrKjbm-QLMtOLoumwR28ebDHEd4clF6O7aRa3d3yRH7p\n")
sys.stdout.write(sess.recv(1000))
# listen incoming messages
while 1 :
chunk = sess.recv(10000)
sys.stdout.write(chunk+'\n')
if not chunk : break
print

View File

@ -0,0 +1,31 @@
#!/usr/bin/python
# sends a message to datagram destinations opened by samForward.py and samIn.py, using specified sending session name
# at least samForward.py should be running for results to be seen
# usage : ./samOut.py [ sendingSessionName [ message ... ] ]
# sendingSessionName : default = datagramSamForward
# message : default = "this is nice message"
import socket
import sys
import time
if len(sys.argv)>=2 :
name = sys.argv[1]
else :
name = "rawSamForward"
if len(sys.argv)>2 :
message = ''.join([s+' ' for s in sys.argv[2:]]).strip()
else :
message = "This is a nice message"
# client.py
port = 7655
host = "localhost"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("", 0))
s.sendto("3.0 "+name+" tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAA\n"+message, (host, port))
s.sendto("3.0 "+name+" EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAA\n"+message, (host, port))

View File

@ -0,0 +1,24 @@
# test example
#in a first terminal, launch :
./samIn.py inTest
#in a second terminal, launch :
./samOut.py
#and again
./samOut.py
##########
# test example n°2
#in a first terminal, launch :
./samForward.py inTest
#in a second terminal, launch :
./server.py
#in a third terminal, launch :
./samOut.py

View File

@ -0,0 +1,64 @@
#!/usr/bin/python
import socket
import sys
# create a master SAM stream session that opens a destination in I2P world
# then open another session that tells SAM to forward incoming connections
# to the specified address
#
# usage :
# ./samForward.py [ silent [ port [ sessionName [ host ] ] ] ]
#
# silent : should the first line of incoming socket contain the peer destination (true or false)
# port : port to which connections are forwarded (default : 25000)
# sessionName : session id (default : "forward")
# host : host to which connections are forwarded (default : this host)
if len(sys.argv)>=2 :
silent = " SILENT="+sys.argv[1]
else : silent = " SILENT=false"
if len(sys.argv)>=3 :
port = " PORT="+sys.argv[2]
else : port = " PORT=25000"
if len(sys.argv)>=4 :
name = " ID="+sys.argv[3]
else : name = " ID=forward"
if len(sys.argv)>=5 :
host = " HOST="+sys.argv[4]
else : host = ""
sess = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sess.connect(("127.0.0.1",7656));
sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sess.recv(1000))
sess.send("SESSION CREATE STYLE=STREAM"+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABngJSS8xMyF4t82otZmCDhrKjbm-QLMtOLoumwR28ebDHEd4clF6O7aRa3d3yRH7p\n")
sys.stdout.write(sess.recv(1000))
sock = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1",7656));
sock.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sock.recv(1000))
sock.send("STREAM FORWARD" + name + host + port + silent + "\n")
sys.stdout.write(sock.recv(1000))
l=0
while 1 :
chunk = sock.recv(100)
sys.stdout.write(chunk)
if not chunk : break
print "Forward socket closed"
l=0
while 1 :
chunk = sess.recv(100)
sys.stdout.write(chunk)
if not chunk : break

View File

@ -0,0 +1,91 @@
#!/usr/bin/python
# create an stream session
# then an "accept" stream connected to this session
# then another "accept" stream from the same session
# then listen from the first stream and then listen from the second
# usage : ./samIn.py [ silent [ name ] ]
# name : the session id ( defaults to InTest )
# silent : true or false : tells wether we want to receive the incoming stream destination
# as first line
import socket
import sys
import time
if len(sys.argv)>=2 :
silent = " SILENT="+sys.argv[1]
else : silent = " SILENT=false"
if len(sys.argv)>=3 :
name = sys.argv[2]
else : name = "inTest"
sess = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sess.connect(("127.0.0.1",7656));
sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sess.recv(1000))
sess.send("SESSION CREATE STYLE=STREAM ID="+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABngJSS8xMyF4t82otZmCDhrKjbm-QLMtOLoumwR28ebDHEd4clF6O7aRa3d3yRH7p\n")
sys.stdout.write(sess.recv(1000))
def accept() :
sock = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1",7656));
sock.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sock.recv(1000))
sock.send("STREAM ACCEPT ID=" + name + silent+"\n")
print "STREAM ACCEPT ID="+name+silent
if (silent==" SILENT=false") :
sys.stdout.write( sock.recv(100) )
return sock
def echo( sock, lines ) :
l = 0
while lines==-1 or l<lines :
chunk = sock.recv(1000)
if lines!=-1 : l = l + 1
if not chunk : break
print chunk
sock.send(chunk)
print
sock1 = accept()
time.sleep(1)
sock2 = accept()
print "Second listening session"
try :
echo(sock2, -1)
except :
print sock2
if silent == " SILENT=false" :
sys.stdout.write(sock1.recv(1000))
else :
# we know sock1 is accepted if it receives a byte
sock1.recv(1)
sock3 = accept()
print "First listening session"
echo(sock1, 2)
sock1.close()
print "Third listening session"
echo(sock3, -1)

View File

@ -0,0 +1,52 @@
#!/usr/bin/python
# open a I2P stream destination
# then open another stream that connects to the destination created by samForward.py or samIn.py
# then send bytes through the stream
# usage :
# ./samOut.py [ silent [ sessionName ] ]
#
# silent : should the first incoming after the connection request contain the connection status message (true or false)
# sessionName : session id (default : "forward")
import socket
import sys
import time
if len(sys.argv)>=2 :
silent = " SILENT="+sys.argv[1]
else : silent = " SILENT=false"
if len(sys.argv)>=3 :
name = " ID="+sys.argv[2]
else : name = " ID=testOutStream"
sess = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sess.connect(("127.0.0.1",7656));
sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sess.recv(1000))
sess.send("SESSION CREATE STYLE=STREAM"+name+" DESTINATION=EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABHNqwgkhJnBW4ymaRsdVmITAha-ff0UiALfKSlznqp5HcSewgMHbzQ0I01TQytFnW\n")
sys.stdout.write(sess.recv(1000))
sock = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1",7656));
sock.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sock.recv(1000))
sock.send("STREAM CONNECT"+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAA"+silent+"\n")
# wait for acknowledgement before sending data, if we asked for it
if (silent==" SILENT=false") :
sys.stdout.write(sock.recv(1000))
for i in range(1,11) :
sock.send(str(i)+'\n')
buf=sock.recv(1000)
sys.stdout.write(str(i)+' '+buf)
if not buf : break
print

View File

@ -0,0 +1,51 @@
#!/usr/bin/python
# open a I2P stream destination
# then open another stream that connects to the destination created by samForward.py or samIn.py
# then send bytes through the stream
# usage :
# ./samOut.py [ silent [ sessionName ] ]
#
# silent : should the first incoming after the connection request contain the connection status message (true or false)
# sessionName : session id (default : "forward")
import socket
import sys
import time
if len(sys.argv)>=2 :
silent = " SILENT="+sys.argv[1]
else : silent = " SILENT=false"
if len(sys.argv)>=3 :
name = " ID="+sys.argv[2]
else : name = " ID=testOutStream"
sess = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sess.connect(("127.0.0.1",7656));
sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sess.recv(1000))
sess.send("SESSION CREATE STYLE=STREAM"+name+" DESTINATION=EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABHNqwgkhJnBW4ymaRsdVmITAha-ff0UiALfKSlznqp5HcSewgMHbzQ0I01TQytFnW\n")
sys.stdout.write(sess.recv(1000))
sock = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1",7656));
sock.send("HELLO VERSION MIN=3.0 MAX=3.0\n")
sys.stdout.write(sock.recv(1000))
sock.send("STREAM CONNECT"+name+" DESTINATION=http://amiga.i2p"+silent+"\n")
# wait for acknowledgement before sending data, if we asked for it
if (silent==" SILENT=false") :
sys.stdout.write(sock.recv(1000))
while (1) :
buf=sock.recv(1000)
sys.stdout.write(buf)
if not buf : break
print

View File

@ -0,0 +1,41 @@
#!/usr/bin/python
# echo server
# accepts a socket on specified port, writes on stdout and send back incoming data
import socket
import sys
if len(sys.argv)>=2 :
port = eval(sys.argv[1])
else : port = 25000
#create an INET, STREAMing socket
serversocket = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
#bind the socket to a public host,
# and a well-known port
serversocket.bind(("0.0.0.0", port))
#become a server socket
serversocket.listen(1)
#accept connections from outside
(clientsocket, address) = serversocket.accept()
#now do something with the clientsocket
#in this case, we'll pretend this is a threaded server
i = 0
while 1 :
chunk = clientsocket.recv(1024)
i = i + 1
sys.stdout.write(str(i)+' '+chunk)
if not chunk: break
clientsocket.send(str(i)+' '+chunk)
clientsocket.close()
print

View File

@ -0,0 +1,477 @@
----------------------------------------------------------------------
Simple Anonymous Messaging (SAM version 3.0)
----------------------------------------------------------------------
Client application talks to SAM bridge, which deals with
all of the I2P functionality (using the ministreaming
lib for virtual streams, or I2CP directly for async messages).
All client<-->SAM bridge communication is unencrypted and
unauthenticated. Access to the SAM
bridge should be protected through firewalls or other means
(perhaps the bridge may have ACLs on what IPs it accepts
connections from).
All of these SAM messages are sent on a single line in plain ASCII,
terminated by the newline character (\n). The formatting shown
below is merely for readability, and while the first two words in
each message must stay in their specific order, the ordering of
the key=value pairs can change (e.g. "ONE TWO A=B C=D" or
"ONE TWO C=D A=B" are both perfectly valid constructions). In
addition, the protocol is case-sensitive.
In the following, message examples are preceded by "-> " for
messages sent by the client to the SAM bridge, and by "<- " for
messages sent by the SAM bridge to the client.
I2P communications can take three distinct forms:
* Virtual streams
* Repliable datagrams (messages with a FROM field)
* Anonymous datagrams (raw anonymous messages)
I2P communications are supported by I2P sessions, and each I2P
session is bound to an address (called destination). An I2P session
is associated with one of the three types above, and cannot carry
communications of another type.
----------------------------------------------------------------------
SAM connection handshake
----------------------------------------------------------------------
No SAM communication can occur until after the client and bridge have
agreed on a protocol version, which is done by the client sending
a HELLO and the bridge sending a HELLO REPLY:
-> HELLO VERSION MIN=$min MAX=$max
and
<- HELLO REPLY RESULT=OK VERSION=3.0
*** In order to force protocol version 3.0, the values of $min and $max
*** must be "3.0".
If the SAM bridge cannot find a suitable version, it replies with :
<- HELLO REPLY RESULT=NOVERSION
If some error occurred, such as a bad request format, it replies with :
<- HELLO REPLY RESULT=I2P_ERROR MESSAGE={$message}
----------------------------------------------------------------------
SAM sessions
----------------------------------------------------------------------
A SAM session is created by a client opening a socket to the SAM
bridge, operating a handshake, and sending a SESSION CREATE message,
and the session terminates when the socket is disconnected.
Each registered I2P Destination is uniquely associated with a session ID
(or nickname).
Each session is uniquely associated with :
* the socket from which the client creates the session
* its ID (or nickname)
The session creation message can only use one of these forms (messages
received through other forms are answered with an error message) :
-> SESSION CREATE
STYLE={STREAM,DATAGRAM,RAW}
ID={$nickname}
DESTINATION={$private_destination_key,TRANSIENT}
[option=value]*
DESTINATION specifies what destination should be used for
sending and receiving messages/streams. It has to be a suitable
private base64 destination key. If the destination is
specified as TRANSIENT, the SAM bridge creates a new destination.
{$nickname} is the choice of the client. No whitespace is allowed.
Additional options given are passed to the I2P session
configuration if not interpreted by the SAM bridge (e.g.
outbound.length=0). These options are documented below.
The SAM bridge itself should already be configured with what router
it should communicate over I2P through (though if need be there may
be a way to provide an override, e.g. i2cp.tcp.host=localhost and
i2cp.tcp.port=7654).
After receiving the session create message, the SAM bridge will reply
with a session status message, as follows:
If the creation was successful :
<- SESSION STATUS RESULT=OK DESTINATION={$private_destination_key}
If the nickname is already associated with a session :
<- SESSION STATUS RESULT=DUPLICATED_ID
If the destination is already in use :
<- SESSION STATUS RESULT=DUPLICATED_DEST
If the destination is not a valid private destination key :
<- SESSION STATUS RESULT=INVALID_KEY
If some other error has occurred :
<- SESSION STATUS RESULT=I2P_ERROR MESSAGE={$message}
If it's not OK, the MESSAGE should contain human-readable information
as to why the session could not be created.
SAM sessions live and die with the socket they are associated with.
When the socket is closed, the session dies, and all communications
using the session die at the same time. And the other way round, when
the session dies for any reason, the SAM bridge closes the socket.
----------------------------------------------------------------------
SAM virtual streams
----------------------------------------------------------------------
Virtual streams are guaranteed to be sent reliably and in order, with
failure and success notification as soon as it is available.
Streams are bidirectional communication sockets between two I2P
destinations, but their opening has to be requested by one of them.
Hereafter, CONNECT commands are used by the SAM client for such a
request. FORWARD / ACCEPT commands are used by the SAM client when
it wants to listen to requests coming from other I2P destinations.
-----------------------------
SAM virtual streams : CONNECT
-----------------------------
A client asks for a connection by :
* opening a new socket with the SAM bridge
* passing the same HELLO handshake as above
* sending the connection command :
-> STREAM CONNECT
ID={$nickname}
DESTINATION=$peer_public_base64_key
[SILENCE={true,false}]
This establishes a new virtual connection from the local session
whose ID is {$nickname} to the specified peer.
If SILENCE=true is passed, the SAM bridge won't issue any other message
on the socket : if the connection fails, the socket will be closed.
If the connection succeeds, all remaining data passing through the
current socket is forwarded from and to the connected I2P destination
peer.
If SILENCE=false, which is the default value, the SAM bridge sends a
last message to its client before forwarding or shutting down the
socket :
<- STREAM STATUS
RESULT=$result
[MESSAGE=...]
The RESULT value may be one of:
OK
CANT_REACH_PEER
I2P_ERROR
INVALID_KEY
INVALID_ID
TIMEOUT
If the RESULT is OK, all remaining data passing through the
current socket is forwarded from and to the connected I2P destination
peer. If the connection was not possible (timeout, etc),
RESULT will contain the appropriate error value (accompanied by an
optional human-readable MESSAGE), and the SAM bridge closes the
socket.
----------------------------
SAM virtual streams : ACCEPT
----------------------------
A client waits for an incoming connection request by :
* opening a new socket with the SAM bridge
* passing the same HELLO handshake as above
* sending the accept command :
-> STREAM ACCEPT
ID={$nickname}
[SILENCE={true,false}]
This makes the session ${nickname} listen for one incoming
connection request from the I2P network.
The SAM bridge answers with :
<- STREAM STATUS
RESULT=$result
[MESSAGE=...]
The RESULT value may be one of:
OK
I2P_ERROR
INVALID_ID
If the result is not OK, the socket is closed immediately by the SAM
bridge. If the result is OK, the SAM bridge starts waiting for an
incoming connection request from another I2P peer. When a request
arrives, the SAM bridge accepts it and :
* If SILENCE=true was passed, the SAM bridge won't issue any other message
on the client socket : all remaining data passing through the
current socket is forwarded from and to the connected I2P destination
peer.
* If SILENCE=false was passed, which is the default value, the SAM bridge
sends the client a ASCII line containing the base64 public destination key
of the requesting peer. After this '\n' terminated line, all remaining data
passing through the current socket is forwarded from and to the connected
I2P destination peer, until one of the peer closes the socket.
-----------------------------
SAM virtual streams : FORWARD
-----------------------------
A client waits for an incoming connection request by :
* opening a new socket with the SAM bridge
* passing the same HELLO handshake as above
* sending the forward command :
-> STREAM FORWARD
ID={$nickname}
PORT={$port}
[HOST={$host}]
[SILENCE={true,false}]
This makes the session ${nickname} listen forever for incoming
connection requests from the I2P network.
The SAM bridge answers with :
<- STREAM STATUS
RESULT=$result
[MESSAGE=...]
The RESULT value may be one of:
OK
I2P_ERROR
INVALID_ID
The socket is closed immediately after the message by the SAM
bridge. If the result is OK, the SAM bridge starts waiting for
incoming connection requests from other I2P peers.
* {$host} is the hostname or IP address of the socket server to which
SAM will forward connection requests. If not given, SAM takes the IP
of the socket that issued the forward command.
* {$port} is the port number of the socket server to which SAM will
forward connection requests. Is is mandatory.
When a connexion request arrives from I2P, the SAM bridge requests a
socket connexion from {$host}:{$port}. If it is accepted after no more
than 3 seconds, SAM will accept the connexion from I2P, and then :
* If SILENCE=true was passed, all data passing through the obtained
current socket is forwarded from and to the connected I2P destination
peer.
* If SILENCE=false was passed, which is the default value, the SAM bridge
sends on the obtained socket an ASCII line containing the base64 public
destination key of the requesting peer. After this '\n' terminated line,
all remaining data passing through the socket is forwarded from and to
the connected I2P destination peer, until one of the sides closes the
socket.
----------------------------------------------------------------------
SAM repliable datagrams : sending a datagram
----------------------------------------------------------------------
While I2P doesn't inherently contain a FROM address, for ease of use
an additional layer is provided as repliable datagrams - unordered
and unreliable messages of up to 31KB in size that include a FROM
address (leaving up to 1KB for header material). This FROM address
is authenticated internally by SAM (making use of the destination's
signing key to verify the source) and includes replay prevention.
After establishing a SAM session with STYLE=DATAGRAM, the client can
send datagrams through SAM's UDP port (7655).
The first line of a datagram sent through this port has to be in the
following format :
3.0 {$nickname} {$base64_public_destination_key}
* 3.0 is the version of SAM
* {$nickname} is the id of the DGRAM session that will be used
* {$base64_public_destination_key} is the destination of the
datagram
* this line is '\n' terminated.
The first line will be discarded by SAM before sending the remaining
of the message to the specified destination.
----------------------------------------------------------------------
SAM repliable datagrams : receiving a datagram
----------------------------------------------------------------------
Received datagrams are written by SAM on the socket from which the
datagram session was opened, unless specified otherwise by the CREATE
command.
When a datagram arrives, the bridge delivers it to the client via the
message :
<- DATAGRAM RECEIVED
DESTINATION=$base64key
SIZE=$numBytes\n[$numBytes of data]
The SAM bridge never exposes to the client the authentication headers
or other fields, merely the data that the sender provided. This
continues until the session is closed (by the client dropping the
connection).
----------------------------------------------------------------------
SAM repliable datagrams : forwarding datagrams
----------------------------------------------------------------------
When creating a datagram session, the client can ask SAM to forward
incoming messages to a specified ip:port. It does so by issuing the
CREATE command with PORT and HOST options :
-> SESSION CREATE
STYLE=DATAGRAM
ID={$nickname}
DESTINATION={$private_destination_key,TRANSIENT}
PORT={$port}
[HOST={$host}]
[option=value]*
* {$host} is the hostname or IP address of the datagram server to
which SAM will forward datagrams. If not given, SAM takes the
IP of the socket that issued the forward command.
* {$port} is the port number of the datagram server to which SAM
will forward datagrams.
When a datagram arrives, the bridge sends to the specified host:port
a message containing the following data :
${sender_base64_destination_key}\n{$datagram_payload}
----------------------------------------------------------------------
SAM anonymous datagrams
----------------------------------------------------------------------
Squeezing the most out of I2P's bandwidth, SAM allows clients to send
and receive anonymous datagrams, leaving authentication and reply
information up to the client themselves. These datagrams are
unreliable and unordered, and may be up to 32KB in size.
After establishing a SAM session with STYLE=RAW, the client can
send anonymous datagrams throug the SAM bridge exactly the same way
he sends non anonymous datagrams.
Both ways of receiving datagrams are also available for anonymous
datagrams.
When anonymous datagrams are to be written to the socket that created
the session,the bridge delivers it to the client via:
<- RAW RECEIVED
SIZE=$numBytes\n[$numBytes of data]
When anonymous datagrams are to be forwarded to some host:port,
the bridge sends to the specified host:port a message containing
the following data :
{$datagram_payload}
----------------------------------------------------------------------
SAM utility functionality
----------------------------------------------------------------------
The following message can be used by the client to query the SAM
bridge for name resolution:
NAMING LOOKUP
NAME=$name
which is answered by
NAMING REPLY
RESULT=$result
NAME=$name
[VALUE=$base64key]
[MESSAGE=$message]
The RESULT value may be one of:
OK
INVALID_KEY
KEY_NOT_FOUND
If NAME=ME, then the reply will contain the base64key used by the
current session (useful if you're using a TRANSIENT one). If $result
is not OK, MESSAGE may convey a descriptive message, such as "bad
format", etc.
Public and private base64 keys can be generated using the following
message:
DEST GENERATE
which is answered by
DEST REPLY
PUB=$pubkey
PRIV=$privkey
----------------------------------------------------------------------
RESULT values
----------------------------------------------------------------------
These are the values that can be carried by the RESULT field, with
their meaning:
OK Operation completed succesfully
CANT_REACH_PEER The peer exists, but cannot be reached
DUPLICATED_DEST The specified Destination is already in use
I2P_ERROR A generic I2P error (e.g. I2CP disconnection, etc.)
INVALID_KEY The specified key is not valid (bad format, etc.)
KEY_NOT_FOUND The naming system can't resolve the given name
PEER_NOT_FOUND The peer cannot be found on the network
TIMEOUT Timeout while waiting for an event (e.g. peer answer)
----------------------------------------------------------------------
Tunnel Pool Options
----------------------------------------------------------------------
These options can be passed in as name=value pairs at the end of a
SAM SESSION CREATE line.
inbound.nickname - Name shows up in I2P router console.
inbound.quantity - Number of tunnels, default 2.
inbound.backupQuantity - Number of backup tunnels, default 0.
inbound.rebuildPeriod - Obsolete - ignored - the router controls rebuilding
inbound.duration - Tunnels last X ms, default 10*60*1000.
(change not recommended, will break anonymmity
if it works at all)
inbound.length - Depth of tunnels, default 2.
inbound.lengthVariance - If negative, randomly skews from
(length - variance) to
(length + variance). If positive, from
length to (length + var), inclusive.
Default -1.
inbound.allowZeroHop - Zero hop allowed? Default "true".
outbound.* - Same properties as inbound.
i2p.streaming.connectDelay - If 0, connect ASAP. If positive, wait
until X ms have passed or output stream
is flushed or buffer fills. Default 0.
i2p.streaming.maxWindowSize - Max window size, default 64.
----------------------------------------------------------------------
Client library implementations:
----------------------------------------------------------------------
C/C++: libSAM: http://www.innographx.com/mpc/libsam/ or i2p/sam/c/
Python: Python/I2P: http://dev.i2p.net/contrib/apps/sam/python/index.html
Others: See apps/sam/ in I2P CVS.

View File

@ -4,6 +4,7 @@
<target name="build" depends="builddep, jar" />
<target name="builddep">
<ant dir="../../ministreaming/java/" target="build" />
<ant dir="../../streaming/java/" target="build" />
<!-- ministreaming will build core -->
</target>
<condition property="depend.available">
@ -18,6 +19,7 @@
<classpath>
<pathelement location="../../../core/java/build/obj" />
<pathelement location="../../ministreaming/java/build/obj" />
<pathelement location="../../streaming/java/build/obj" />
</classpath>
</depend>
</target>
@ -28,20 +30,20 @@
srcdir="./src"
debug="true" deprecation="on" source="1.5" target="1.5"
destdir="./build/obj"
classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar" />
classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar:../../streaming/java/build/streaming.jar" />
</target>
<target name="compileTest" depends="compile">
<javac
srcdir="./test"
debug="true" deprecation="on" source="1.5" target="1.5"
destdir="./build/obj"
classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar" />
classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar:../../streaming/java/build/streaming.jar" />
</target>
<target name="jar" depends="compile">
<jar destfile="./build/sam.jar" basedir="./build/obj" includes="**/*.class">
<manifest>
<attribute name="Main-Class" value="net.i2p.sam.SAMBridge" />
<attribute name="Class-Path" value="i2p.jar mstreaming.jar" />
<attribute name="Class-Path" value="i2p.jar mstreaming.jar streaming.jar" />
</manifest>
</jar>
</target>
@ -52,7 +54,7 @@
<mkdir dir="./build" />
<mkdir dir="./build/javadoc" />
<javadoc
sourcepath="./src:../../../core/java/src:../../ministreaming/java/src" destdir="./build/javadoc"
sourcepath="./src:../../../core/java/src:../../ministreaming/java/src:../../streaming/java/src" destdir="./build/javadoc"
packagenames="*"
use="true"
splitindex="true"
@ -64,9 +66,11 @@
<target name="cleandep" depends="clean">
<!-- ministreaming will clean core -->
<ant dir="../../ministreaming/java/" target="distclean" />
<ant dir="../../streaming/java/" target="distclean" />
</target>
<target name="distclean" depends="clean">
<!-- ministreaming will clean core -->
<ant dir="../../ministreaming/java/" target="distclean" />
<ant dir="../../streaming/java/" target="distclean" />
</target>
</project>

View File

@ -14,9 +14,10 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@ -34,7 +35,7 @@ import net.i2p.util.Log;
*/
public class SAMBridge implements Runnable {
private final static Log _log = new Log(SAMBridge.class);
private ServerSocket serverSocket;
private ServerSocketChannel serverSocket;
private Properties i2cpProps;
/**
* filename in which the name to private key mapping should
@ -45,12 +46,23 @@ public class SAMBridge implements Runnable {
* app designated destination name to the base64 of the I2P formatted
* destination keys (Destination+PrivateKey+SigningPrivateKey)
*/
private Map nameToPrivKeys;
private Map<String,String> nameToPrivKeys;
private boolean acceptConnections = true;
private static final int SAM_LISTENPORT = 7656;
public static final String DEFAULT_SAM_KEYFILE = "sam.keys";
public static final String PROP_TCP_HOST = "sam.tcp.host";
public static final String PROP_TCP_PORT = "sam.tcp.port";
protected static final String DEFAULT_TCP_HOST = "0.0.0.0";
protected static final String DEFAULT_TCP_PORT = "7656";
public static final String PROP_DATAGRAM_HOST = "sam.udp.host";
public static final String PROP_DATAGRAM_PORT = "sam.udp.port";
protected static final String DEFAULT_DATAGRAM_HOST = "0.0.0.0";
protected static final String DEFAULT_DATAGRAM_PORT = "7655";
private SAMBridge() {}
@ -64,16 +76,18 @@ public class SAMBridge implements Runnable {
*/
public SAMBridge(String listenHost, int listenPort, Properties i2cpProps, String persistFile) {
persistFilename = persistFile;
nameToPrivKeys = new HashMap(8);
nameToPrivKeys = new HashMap<String,String>(8);
loadKeys();
try {
if ( (listenHost != null) && !("0.0.0.0".equals(listenHost)) ) {
serverSocket = new ServerSocket(listenPort, 0, InetAddress.getByName(listenHost));
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(listenHost, listenPort));
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM bridge listening on "
+ listenHost + ":" + listenPort);
} else {
serverSocket = new ServerSocket(listenPort);
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(listenPort));
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM bridge listening on 0.0.0.0:" + listenPort);
}
@ -191,36 +205,50 @@ public class SAMBridge implements Runnable {
}
}
static class HelpRequested extends Exception {static final long serialVersionUID=0x1;}
/**
* Usage:
* <pre>SAMBridge [[listenHost ]listenPort[ name=val]*]</pre>
*
* <pre>SAMBridge [ keyfile [listenHost ] listenPort [ name=val ]* ]</pre>
* or:
* <pre>SAMBridge [ name=val ]* </pre>
*
* name=val options are passed to the I2CP code to build a session,
* allowing the bridge to specify an alternate I2CP host and port, tunnel
* depth, etc.
* @param args [[listenHost ]listenPort[ name=val]*]
* @param args [ keyfile [ listenHost ] listenPort [ name=val ]* ]
*/
public static void main(String args[]) {
String keyfile = DEFAULT_SAM_KEYFILE;
int port = SAM_LISTENPORT;
String host = "0.0.0.0";
String host = DEFAULT_TCP_HOST;
Properties opts = null;
if (args.length > 0) {
keyfile = args[0];
int portIndex = 1;
try {
port = Integer.parseInt(args[portIndex]);
} catch (NumberFormatException nfe) {
host = args[1];
portIndex++;
try {
port = Integer.parseInt(args[portIndex]);
} catch (NumberFormatException nfe1) {
usage();
return;
}
}
opts = parseOptions(args, portIndex+1);
try {
opts = parseOptions(args, 0);
keyfile = args[0];
int portIndex = 1;
try {
if (args.length>portIndex) port = Integer.parseInt(args[portIndex]);
} catch (NumberFormatException nfe) {
host = args[portIndex];
portIndex++;
try {
if (args.length>portIndex) port = Integer.parseInt(args[portIndex]);
} catch (NumberFormatException nfe1) {
try {
port = Integer.parseInt(opts.getProperty(SAMBridge.PROP_TCP_PORT, SAMBridge.DEFAULT_TCP_PORT));
host = opts.getProperty(SAMBridge.PROP_TCP_HOST, SAMBridge.DEFAULT_TCP_HOST);
} catch (NumberFormatException e) {
usage();
return;
}
}
}
} catch (HelpRequested e) {
usage();
return;
}
}
SAMBridge bridge = new SAMBridge(host, port, opts, keyfile);
I2PAppThread t = new I2PAppThread(bridge, "SAMListener");
@ -236,10 +264,11 @@ public class SAMBridge implements Runnable {
t.start();
}
private static Properties parseOptions(String args[], int startArgs) {
private static Properties parseOptions(String args[], int startArgs) throws HelpRequested {
Properties props = new Properties();
// skip over first few options
for (int i = startArgs; i < args.length; i++) {
if (args[i].equals("-h")) throw new HelpRequested();
int eq = args[i].indexOf('=');
if (eq <= 0) continue;
if (eq >= args[i].length()-1) continue;
@ -255,50 +284,76 @@ public class SAMBridge implements Runnable {
private static void usage() {
System.err.println("Usage: SAMBridge [keyfile [listenHost] listenPortNum[ name=val]*]");
System.err.println("or:");
System.err.println(" SAMBridge [ name=val ]*");
System.err.println(" keyfile: location to persist private keys (default sam.keys)");
System.err.println(" listenHost: interface to listen on (0.0.0.0 for all interfaces)");
System.err.println(" listenPort: port to listen for SAM connections on (default 7656)");
System.err.println(" name=val: options to pass when connecting via I2CP, such as ");
System.err.println(" i2cp.host=localhost and i2cp.port=7654");
System.err.println("");
System.err.println("Host and ports of the SAM bridge can be specified with the alternate");
System.err.println("form by specifying options "+SAMBridge.PROP_TCP_HOST+" and/or "+
SAMBridge.PROP_TCP_PORT);
System.err.println("");
System.err.println("Options "+SAMBridge.PROP_DATAGRAM_HOST+" and "+SAMBridge.PROP_DATAGRAM_PORT+
" specify the listening ip");
System.err.println("range and the port of SAM datagram server. This server is");
System.err.println("only launched after a client creates the first SAM datagram");
System.err.println("or raw session, after a handshake with SAM version >= 3.0.");
System.err.println("");
System.err.println("The option loglevel=[DEBUG|WARN|ERROR|CRIT] can be used");
System.err.println("for tuning the log verbosity.\n");
}
public void run() {
if (serverSocket == null) return;
try {
while (acceptConnections) {
Socket s = serverSocket.accept();
SocketChannel s = serverSocket.accept();
if (_log.shouldLog(Log.DEBUG))
_log.debug("New connection from "
+ s.getInetAddress().toString() + ":"
+ s.getPort());
+ s.socket().getInetAddress().toString() + ":"
+ s.socket().getPort());
try {
SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps);
if (handler == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM handler has not been instantiated");
class HelloHandler implements Runnable {
SocketChannel s ;
SAMBridge parent ;
HelloHandler(SocketChannel s, SAMBridge parent) {
this.s = s ;
this.parent = parent ;
}
public void run() {
try {
s.close();
} catch (IOException e) {}
continue;
}
handler.setBridge(this);
handler.startHandling();
} catch (SAMException e) {
if (_log.shouldLog(Log.ERROR))
_log.error("SAM error: " + e.getMessage(), e);
try {
String reply = "HELLO REPLY RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n";
s.getOutputStream().write(reply.getBytes("ISO-8859-1"));
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("SAM Error sending error reply", ioe);
}
try { s.close(); } catch (IOException ioe) {}
} catch (Exception ee) {
try { s.close(); } catch (IOException ioe) {}
_log.log(Log.CRIT, "Unexpected error handling SAM connection", ee);
}
SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps);
if (handler == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM handler has not been instantiated");
try {
s.close();
} catch (IOException e) {}
return;
}
handler.setBridge(parent);
handler.startHandling();
} catch (SAMException e) {
if (_log.shouldLog(Log.ERROR))
_log.error("SAM error: " + e.getMessage(), e);
try {
String reply = "HELLO REPLY RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n";
s.write(ByteBuffer.wrap(reply.getBytes("ISO-8859-1")));
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("SAM Error sending error reply", ioe);
}
try { s.close(); } catch (IOException ioe) {}
} catch (Exception ee) {
try { s.close(); } catch (IOException ioe) {}
_log.log(Log.CRIT, "Unexpected error handling SAM connection", ee);
}
}
}
new I2PAppThread(new HelloHandler(s,this), "HelloHandler").start();
}
} catch (Exception e) {
if (_log.shouldLog(Log.ERROR))

View File

@ -30,7 +30,7 @@ public class SAMDatagramSession extends SAMMessageSession {
private final static Log _log = new Log(SAMDatagramSession.class);
public static int DGRAM_SIZE_MAX = 31*1024;
private SAMDatagramReceiver recv = null;
protected SAMDatagramReceiver recv = null;
private I2PDatagramMaker dgramMaker;
private I2PDatagramDissector dgramDissector = new I2PDatagramDissector();
@ -84,9 +84,10 @@ public class SAMDatagramSession extends SAMMessageSession {
public boolean sendBytes(String dest, byte[] data) throws DataFormatException {
if (data.length > DGRAM_SIZE_MAX)
throw new DataFormatException("Datagram size exceeded (" + data.length + ")");
byte[] dgram = dgramMaker.makeI2PDatagram(data);
byte[] dgram ;
synchronized (dgramMaker) {
dgram = dgramMaker.makeI2PDatagram(data);
}
return sendBytesThroughMessageSession(dest, dgram);
}

View File

@ -15,11 +15,13 @@ package net.i2p.sam;
*/
public class SAMException extends Exception {
static final long serialVersionUID = 1 ;
public SAMException() {
super();
super();
}
public SAMException(String s) {
super(s);
super(s);
}
}

View File

@ -9,9 +9,8 @@ package net.i2p.sam;
*/
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.util.Properties;
import net.i2p.util.I2PAppThread;
@ -32,8 +31,7 @@ public abstract class SAMHandler implements Runnable {
protected SAMBridge bridge = null;
private Object socketWLock = new Object(); // Guards writings on socket
private Socket socket = null;
private OutputStream socketOS = null; // Stream associated to socket
protected SocketChannel socket = null;
protected int verMajor = 0;
protected int verMinor = 0;
@ -53,10 +51,9 @@ public abstract class SAMHandler implements Runnable {
* @param i2cpProps properties to configure the I2CP connection (host, port, etc)
* @throws IOException
*/
protected SAMHandler(Socket s,
protected SAMHandler(SocketChannel s,
int verMajor, int verMinor, Properties i2cpProps) throws IOException {
socket = s;
socketOS = socket.getOutputStream();
this.verMajor = verMajor;
this.verMinor = verMinor;
@ -86,8 +83,8 @@ public abstract class SAMHandler implements Runnable {
* @return input stream
* @throws IOException
*/
protected final InputStream getClientSocketInputStream() throws IOException {
return socket.getInputStream();
protected final SocketChannel getClientSocket() {
return socket ;
}
/**
@ -98,13 +95,17 @@ public abstract class SAMHandler implements Runnable {
* @param data A byte array to be written
* @throws IOException
*/
protected final void writeBytes(byte[] data) throws IOException {
protected final void writeBytes(ByteBuffer data) throws IOException {
synchronized (socketWLock) {
socketOS.write(data);
socketOS.flush();
writeBytes(data, socket);
}
}
static public void writeBytes(ByteBuffer data, SocketChannel out) throws IOException {
while (data.hasRemaining()) out.write(data);
out.socket().getOutputStream().flush();
}
/**
* If you're crazy enough to write to the raw socket, grab the write lock
* with getWriteLock(), synchronize against it, and write to the getOut()
@ -112,7 +113,6 @@ public abstract class SAMHandler implements Runnable {
* @return socket Write lock object
*/
protected Object getWriteLock() { return socketWLock; }
protected OutputStream getOut() { return socketOS; }
/**
* Write a string to the handler's socket. This method must
@ -121,21 +121,25 @@ public abstract class SAMHandler implements Runnable {
*
* @param str A byte array to be written
*
* @return True is the string was successfully written, false otherwise
* @return True if the string was successfully written, false otherwise
*/
protected final boolean writeString(String str) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending the client: [" + str + "]");
try {
writeBytes(str.getBytes("ISO-8859-1"));
return writeString(str, socket);
}
public static boolean writeString(String str, SocketChannel out)
{
try {
writeBytes(ByteBuffer.wrap(str.getBytes("ISO-8859-1")), out);
} catch (IOException e) {
_log.debug("Caught IOException", e);
return false;
}
return true;
return true ;
}
/**
* Close the socket connected to the SAM client.
*
@ -178,8 +182,8 @@ public abstract class SAMHandler implements Runnable {
return ("SAM handler (class: " + this.getClass().getName()
+ "; SAM version: " + verMajor + "." + verMinor
+ "; client: "
+ this.socket.getInetAddress().toString() + ":"
+ this.socket.getPort() + ")");
+ this.socket.socket().getInetAddress().toString() + ":"
+ this.socket.socket().getPort() + ")");
}
public final void run() {

View File

@ -9,9 +9,9 @@ package net.i2p.sam;
*/
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.StringTokenizer;
@ -34,17 +34,17 @@ public class SAMHandlerFactory {
* @throws SAMException if the connection handshake (HELLO message) was malformed
* @return A SAM protocol handler, or null if the client closed before the handshake
*/
public static SAMHandler createSAMHandler(Socket s, Properties i2cpProps) throws SAMException {
public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps) throws SAMException {
String line;
StringTokenizer tok;
try {
line = DataHelper.readLine(s.getInputStream());
line = DataHelper.readLine(s.socket().getInputStream());
if (line == null) {
_log.debug("Connection closed by client");
return null;
}
tok = new StringTokenizer(line, " ");
tok = new StringTokenizer(line.trim(), " ");
} catch (IOException e) {
throw new SAMException("Error reading from socket: "
+ e.getMessage());
@ -84,14 +84,15 @@ public class SAMHandlerFactory {
}
String ver = chooseBestVersion(minVer, maxVer);
if (ver == null)
throw new SAMException("No version specified");
// Let's answer positively
try {
OutputStream out = s.getOutputStream();
out.write(("HELLO REPLY RESULT=OK VERSION="
+ ver + "\n").getBytes("ISO-8859-1"));
if (ver == null) {
s.write(ByteBuffer.wrap(("HELLO REPLY RESULT=NOVERSION\n").getBytes("ISO-8859-1")));
return null ;
}
// Let's answer positively
s.write(ByteBuffer.wrap(("HELLO REPLY RESULT=OK VERSION="
+ ver + "\n").getBytes("ISO-8859-1")));
} catch (UnsupportedEncodingException e) {
_log.error("Caught UnsupportedEncodingException ("
+ e.getMessage() + ")");
@ -115,6 +116,9 @@ public class SAMHandlerFactory {
case 2:
handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps);
break;
case 3:
handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps);
break;
default:
_log.error("BUG! Trying to initialize the wrong SAM version!");
throw new SAMException("BUG! (in handler instantiation)");
@ -128,6 +132,7 @@ public class SAMHandlerFactory {
/* Return the best version we can use, or null on failure */
private static String chooseBestVersion(String minVer, String maxVer) {
int minMajor = getMajor(minVer), minMinor = getMinor(minVer);
int maxMajor = getMajor(maxVer), maxMinor = getMinor(maxVer);
@ -143,6 +148,8 @@ public class SAMHandlerFactory {
float fmaxVer = (float) maxMajor + (float) maxMinor / 10 ;
if ( ( fminVer <= 3.0 ) && ( fmaxVer >= 3.0 ) ) return "3.0" ;
if ( ( fminVer <= 2.0 ) && ( fmaxVer >= 2.0 ) ) return "2.0" ;
if ( ( fminVer <= 1.0 ) && ( fmaxVer >= 1.0 ) ) return "1.0" ;

View File

@ -15,7 +15,8 @@ package net.i2p.sam;
* @author human
*/
public class SAMInvalidDirectionException extends Exception {
static final long serialVersionUID = 1 ;
public SAMInvalidDirectionException() {
super();
}

View File

@ -109,8 +109,7 @@ public abstract class SAMMessageSession {
* @throws DataFormatException
*/
protected boolean sendBytesThroughMessageSession(String dest, byte[] data) throws DataFormatException {
Destination d = new Destination();
d.fromBase64(dest);
Destination d = SAMUtils.getDest(dest);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Sending " + data.length + " bytes to " + dest);

View File

@ -26,7 +26,7 @@ public class SAMRawSession extends SAMMessageSession {
private final static Log _log = new Log(SAMRawSession.class);
public static final int RAW_SIZE_MAX = 32*1024;
private SAMRawReceiver recv = null;
protected SAMRawReceiver recv = null;
/**
* Create a new SAM RAW session.
*

View File

@ -9,6 +9,7 @@ package net.i2p.sam;
*/
import java.io.IOException;
import java.nio.ByteBuffer;
import net.i2p.data.Destination;
@ -60,7 +61,7 @@ public interface SAMStreamReceiver {
* @param len Number of bytes in data
* @throws IOException
*/
public void receiveStreamBytes(int id, byte data[], int len) throws IOException;
public void receiveStreamBytes(int id, ByteBuffer data) throws IOException;
/**
* Notify that a connection has been closed

View File

@ -13,6 +13,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
@ -51,15 +53,15 @@ public class SAMStreamSession {
protected SAMStreamReceiver recv = null;
private SAMStreamSessionServer server = null;
protected SAMStreamSessionServer server = null;
protected I2PSocketManager socketMgr = null;
private Object handlersMapLock = new Object();
/** stream id (Long) to SAMStreamSessionSocketReader */
private HashMap handlersMap = new HashMap();
private HashMap<Integer,SAMStreamSessionSocketReader> handlersMap = new HashMap<Integer,SAMStreamSessionSocketReader>();
/** stream id (Long) to StreamSender */
private HashMap sendersMap = new HashMap();
private HashMap<Integer,StreamSender> sendersMap = new HashMap<Integer,StreamSender>();
private Object idLock = new Object();
private int lastNegativeId = 0;
@ -76,6 +78,10 @@ public class SAMStreamSession {
public static String PROP_FORCE_FLUSH = "sam.forceFlush";
public static String DEFAULT_FORCE_FLUSH = "false";
public SAMStreamSession() {
}
/**
* Create a new SAM STREAM session.
*
@ -166,7 +172,7 @@ public class SAMStreamSession {
}
}
private class DisconnectListener implements I2PSocketManager.DisconnectListener {
protected class DisconnectListener implements I2PSocketManager.DisconnectListener {
public void sessionDisconnected() {
close();
}
@ -572,19 +578,20 @@ public class SAMStreamSession {
_log.debug("run() called for socket reader " + id);
int read = -1;
byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
ByteBuffer data = ByteBuffer.allocateDirect(SOCKET_HANDLER_BUF_SIZE);
try {
InputStream in = i2pSocket.getInputStream();
while (stillRunning) {
read = in.read(data);
data.clear();
read = Channels.newChannel(in).read(data);
if (read == -1) {
_log.debug("Handler " + id + ": connection closed");
break;
}
recv.receiveStreamBytes(id, data, read);
data.flip();
recv.receiveStreamBytes(id, data);
}
} catch (IOException e) {
_log.debug("Caught IOException", e);
@ -650,7 +657,7 @@ public class SAMStreamSession {
protected class v1StreamSender extends StreamSender
{
private List _data;
private List<ByteArray> _data;
private int _id;
private ByteCache _cache;
private OutputStream _out = null;
@ -660,7 +667,7 @@ public class SAMStreamSession {
public v1StreamSender ( I2PSocket s, int id ) throws IOException {
super ( s, id );
_data = new ArrayList(1);
_data = new ArrayList<ByteArray>(1);
_id = id;
_cache = ByteCache.getInstance(4, 32*1024);
_out = s.getOutputStream();

View File

@ -8,6 +8,7 @@ package net.i2p.sam;
*
*/
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Enumeration;
@ -19,8 +20,11 @@ import net.i2p.I2PException;
import net.i2p.client.I2PClient;
import net.i2p.client.I2PClientFactory;
import net.i2p.client.naming.NamingService;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.data.PrivateKey;
import net.i2p.data.SigningPrivateKey;
import net.i2p.util.Log;
/**
@ -73,6 +77,22 @@ public class SAMUtils {
return false;
}
}
public static class InvalidDestination extends Exception {
static final long serialVersionUID = 0x1 ;
}
public static void checkPrivateDestination(String dest) throws InvalidDestination {
ByteArrayInputStream destKeyStream = new ByteArrayInputStream(Base64.decode(dest));
try {
new Destination().readBytes(destKeyStream);
new PrivateKey().readBytes(destKeyStream);
new SigningPrivateKey().readBytes(destKeyStream);
} catch (Exception e) {
throw new InvalidDestination();
}
}
/**
* Resolved the specified hostname.
@ -101,6 +121,27 @@ public class SAMUtils {
return dest;
}
/**
* Resolve the destination from a key or a hostname
*
* @param s Hostname or key to be resolved
*
* @return the Destination for the specified hostname, or null if not found
*/
public static Destination getDest(String s) throws DataFormatException
{
Destination d = new Destination() ;
try {
d.fromBase64(s);
} catch (DataFormatException e) {
d = lookupHost(s, null);
if ( d==null ) {
throw e ;
}
}
return d ;
}
/**
* Parse SAM parameters, and put them into a Propetries object
*

View File

@ -12,12 +12,11 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.StringTokenizer;
@ -40,14 +39,14 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
private final static Log _log = new Log(SAMv1Handler.class);
private final static int IN_BUFSIZE = 2048;
protected SAMRawSession rawSession = null;
protected SAMDatagramSession datagramSession = null;
protected SAMStreamSession streamSession = null;
protected SAMDatagramSession getDatagramSession() {return datagramSession ;}
protected SAMRawSession getRawSession() {return rawSession ;}
private SAMRawSession rawSession = null;
private SAMDatagramSession datagramSession = null;
protected SAMStreamSession streamSession = null;
private long _id;
private static volatile long __id = 0;
protected long _id;
protected static volatile long __id = 0;
/**
* Create a new SAM version 1 handler. This constructor expects
@ -60,7 +59,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
* @throws SAMException
* @throws IOException
*/
public SAMv1Handler(Socket s, int verMajor, int verMinor) throws SAMException, IOException {
public SAMv1Handler(SocketChannel s, int verMajor, int verMinor) throws SAMException, IOException {
this(s, verMajor, verMinor, new Properties());
}
/**
@ -75,7 +74,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
* @throws SAMException
* @throws IOException
*/
public SAMv1Handler(Socket s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
super(s, verMajor, verMinor, i2cpProps);
_id = ++__id;
_log.debug("SAM version 1 handler instantiated");
@ -101,16 +100,13 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
_log.debug("SAM handling started");
try {
InputStream in = getClientSocketInputStream();
int b = -1;
while (true) {
if (shouldStop()) {
_log.debug("Stop request found");
break;
}
msg = DataHelper.readLine(in);
msg = DataHelper.readLine(getClientSocket().socket().getInputStream()).trim();
if (msg == null) {
_log.debug("Connection closed by client");
break;
@ -175,11 +171,11 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
}
if (rawSession != null) {
rawSession.close();
if (getRawSession() != null) {
getRawSession().close();
}
if (datagramSession != null) {
datagramSession.close();
if (getDatagramSession() != null) {
getDatagramSession().close();
}
if (streamSession != null) {
streamSession.close();
@ -188,13 +184,13 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
/* Parse and execute a SESSION message */
private boolean execSessionMessage(String opcode, Properties props) {
protected boolean execSessionMessage(String opcode, Properties props) {
String dest = "BUG!";
try{
if (opcode.equals("CREATE")) {
if ((rawSession != null) || (datagramSession != null)
if ((getRawSession() != null) || (getDatagramSession() != null)
|| (streamSession != null)) {
_log.debug("Trying to create a session, but one still exists");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n");
@ -293,7 +289,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
/* Parse and execute a DEST message*/
private boolean execDestMessage(String opcode, Properties props) {
protected boolean execDestMessage(String opcode, Properties props) {
if (opcode.equals("GENERATE")) {
if (props.size() > 0) {
@ -318,7 +314,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
/* Parse and execute a NAMING message */
private boolean execNamingMessage(String opcode, Properties props) {
protected boolean execNamingMessage(String opcode, Properties props) {
if (opcode.equals("LOOKUP")) {
if (props == null) {
_log.debug("No parameters specified in NAMING LOOKUP message");
@ -331,20 +327,23 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
return false;
}
Destination dest;
Destination dest = null ;
if (name.equals("ME")) {
if (rawSession != null) {
dest = rawSession.getDestination();
if (getRawSession() != null) {
dest = getRawSession().getDestination();
} else if (streamSession != null) {
dest = streamSession.getDestination();
} else if (datagramSession != null) {
dest = datagramSession.getDestination();
} else if (getDatagramSession() != null) {
dest = getDatagramSession().getDestination();
} else {
_log.debug("Lookup for SESSION destination, but session is null");
return false;
}
} else {
dest = SAMUtils.lookupHost(name, null);
try {
dest = SAMUtils.getDest(name);
} catch (DataFormatException e) {
}
}
if (dest == null) {
@ -364,8 +363,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
/* Parse and execute a DATAGRAM message */
private boolean execDatagramMessage(String opcode, Properties props) {
if (datagramSession == null) {
protected boolean execDatagramMessage(String opcode, Properties props) {
if (getDatagramSession() == null) {
_log.error("DATAGRAM message received, but no DATAGRAM session exists");
return false;
}
@ -403,7 +402,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
try {
DataInputStream in = new DataInputStream(getClientSocketInputStream());
DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
byte[] data = new byte[size];
in.readFully(data);
@ -435,8 +434,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
/* Parse and execute a RAW message */
private boolean execRawMessage(String opcode, Properties props) {
if (rawSession == null) {
protected boolean execRawMessage(String opcode, Properties props) {
if (getRawSession() == null) {
_log.error("RAW message received, but no RAW session exists");
return false;
}
@ -474,12 +473,12 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
try {
DataInputStream in = new DataInputStream(getClientSocketInputStream());
DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
byte[] data = new byte[size];
in.readFully(data);
if (!rawSession.sendBytes(dest, data)) {
if (!getRawSession().sendBytes(dest, data)) {
_log.error("RAW SEND failed");
return true;
}
@ -567,7 +566,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
try {
if (!streamSession.sendBytes(id, getClientSocketInputStream(), size)) { // data)) {
if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
if (_log.shouldLog(Log.WARN))
_log.warn("STREAM SEND [" + size + "] failed");
boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n");
@ -691,7 +690,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
// SAMRawReceiver implementation
public void receiveRawBytes(byte data[]) throws IOException {
if (rawSession == null) {
if (getRawSession() == null) {
_log.error("BUG! Received raw bytes, but session is null!");
throw new NullPointerException("BUG! RAW session is null!");
}
@ -701,17 +700,18 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
String msgText = "RAW RECEIVED SIZE=" + data.length + "\n";
msg.write(msgText.getBytes("ISO-8859-1"));
msg.write(data);
msg.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending to client: " + msgText);
writeBytes(msg.toByteArray());
writeBytes(ByteBuffer.wrap(msg.toByteArray()));
}
public void stopRawReceiving() {
_log.debug("stopRawReceiving() invoked");
if (rawSession == null) {
if (getRawSession() == null) {
_log.error("BUG! Got raw receiving stop, but session is null!");
throw new NullPointerException("BUG! RAW session is null!");
}
@ -726,7 +726,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
// SAMDatagramReceiver implementation
public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException {
if (datagramSession == null) {
if (getDatagramSession() == null) {
_log.error("BUG! Received datagram bytes, but session is null!");
throw new NullPointerException("BUG! DATAGRAM session is null!");
}
@ -740,14 +740,14 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending to client: " + msgText);
msg.write(data);
writeBytes(msg.toByteArray());
msg.flush();
writeBytes(ByteBuffer.wrap(msg.toByteArray()));
}
public void stopDatagramReceiving() {
_log.debug("stopDatagramReceiving() invoked");
if (datagramSession == null) {
if (getDatagramSession() == null) {
_log.error("BUG! Got datagram receiving stop, but session is null!");
throw new NullPointerException("BUG! DATAGRAM session is null!");
}
@ -830,29 +830,23 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
}
public void receiveStreamBytes(int id, byte data[], int len) throws IOException {
public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
if (streamSession == null) {
_log.error("Received stream bytes, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
}
String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + len + "\n";
String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + data.remaining() + "\n";
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending to client: " + msgText);
byte prefix[] = msgText.getBytes("ISO-8859-1");
ByteBuffer prefix = ByteBuffer.wrap(msgText.getBytes("ISO-8859-1"));
// dont waste so much memory
//ByteArrayOutputStream msg = new ByteArrayOutputStream();
//msg.write(msgText.getBytes("ISO-8859-1"));
//msg.write(data, 0, len);
// writeBytes(msg.toByteArray());
Object writeLock = getWriteLock();
OutputStream out = getOut();
synchronized (writeLock) {
out.write(prefix);
out.write(data, 0, len);
out.flush();
while (prefix.hasRemaining()) socket.write(prefix);
while (data.hasRemaining()) socket.write(data);
socket.socket().getOutputStream().flush();
}
}

View File

@ -9,7 +9,7 @@ package net.i2p.sam;
*/
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.Properties;
import net.i2p.data.DataFormatException;
@ -36,7 +36,7 @@ public class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDat
* @param verMajor SAM major version to manage (should be 2)
* @param verMinor SAM minor version to manage
*/
public SAMv2Handler ( Socket s, int verMajor, int verMinor ) throws SAMException, IOException
public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException
{
this ( s, verMajor, verMinor, new Properties() );
}
@ -52,7 +52,7 @@ public class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDat
* @param i2cpProps properties to configure the I2CP connection (host, port, etc)
*/
public SAMv2Handler ( Socket s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
{
super ( s, verMajor, verMinor, i2cpProps );
}

View File

@ -12,6 +12,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.ByteBuffer;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
@ -140,9 +142,6 @@ public class SAMv2StreamSession extends SAMStreamSession
public class StreamConnector implements Runnable
{
private Object runningLock = new Object();
private boolean stillRunning = true;
private int id;
private Destination dest ;
private I2PSocketOptions opts ;
@ -245,7 +244,7 @@ public class SAMv2StreamSession extends SAMStreamSession
protected class v2StreamSender extends StreamSender
{
private List _data;
private List<ByteArray> _data;
private int _dataSize;
private int _id;
private ByteCache _cache;
@ -257,7 +256,7 @@ public class SAMv2StreamSession extends SAMStreamSession
public v2StreamSender ( I2PSocket s, int id ) throws IOException
{
super ( s, id );
_data = new ArrayList ( 1 );
_data = new ArrayList<ByteArray> ( 1 );
_dataSize = 0;
_id = id;
_cache = ByteCache.getInstance ( 10, 32 * 1024 );
@ -511,7 +510,7 @@ public class SAMv2StreamSession extends SAMStreamSession
_log.debug ( "run() called for socket reader " + id );
int read = -1;
byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
ByteBuffer data = ByteBuffer.allocateDirect(SOCKET_HANDLER_BUF_SIZE);
try
{
@ -533,7 +532,8 @@ public class SAMv2StreamSession extends SAMStreamSession
break ;
}
read = in.read ( data );
data.clear();
read = Channels.newChannel(in).read ( data );
if ( read == -1 )
{
@ -542,8 +542,8 @@ public class SAMv2StreamSession extends SAMStreamSession
}
totalReceived += read ;
recv.receiveStreamBytes ( id, data, read );
data.flip();
recv.receiveStreamBytes ( id, data );
}
}
catch ( IOException e )

View File

@ -0,0 +1,90 @@
/**
* @author MKVore
*
*/
package net.i2p.sam;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Properties;
import net.i2p.client.I2PSessionException;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import java.net.InetSocketAddress;
import java.net.SocketAddress ;
import java.nio.ByteBuffer;
public class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Session, SAMDatagramReceiver {
private final static Log _log = new Log ( SAMv3DatagramSession.class );
SAMv3Handler handler = null ;
SAMv3Handler.DatagramServer server = null ;
String nick = null ;
SocketAddress clientAddress = null ;
public String getNick() { return nick; }
/**
* @param nick nickname of the session
* @param server DatagramServer used for communication with the client
* @throws IOException
* @throws DataFormatException
* @throws I2PSessionException
*/
public SAMv3DatagramSession(String nick)
throws IOException, DataFormatException, I2PSessionException {
super(SAMv3Handler.sSessionsHash.get(nick).getDest(),
SAMv3Handler.sSessionsHash.get(nick).getProps(),
null
);
this.nick = nick ;
this.recv = this ;
this.server = SAMv3Handler.DatagramServer.getInstance() ;
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if ( rec==null ) throw new InterruptedIOException() ;
this.handler = rec.getHandler();
Properties props = rec.getProps();
String portStr = props.getProperty("PORT") ;
if ( portStr==null ) {
_log.debug("receiver port not specified. Current socket will be used.");
}
else {
int port = Integer.parseInt(portStr);
String host = props.getProperty("HOST");
if ( host==null ) {
_log.debug("no host specified. Take from the client socket");
host = rec.getHandler().getClientIP();
}
this.clientAddress = new InetSocketAddress(host,port);
}
}
public void receiveDatagramBytes(Destination sender, byte[] data) throws IOException {
if (this.clientAddress==null) {
this.handler.receiveDatagramBytes(sender, data);
} else {
String msg = sender.toBase64()+"\n";
ByteBuffer msgBuf = ByteBuffer.allocate(msg.length()+data.length);
msgBuf.put(msg.getBytes("ISO-8859-1"));
msgBuf.put(data);
msgBuf.flip();
this.server.send(this.clientAddress, msgBuf);
}
}
public void stopDatagramReceiving() {
}
}

View File

@ -0,0 +1,762 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.NoRouteToHostException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.HashMap;
import java.util.StringTokenizer;
import net.i2p.I2PException;
import net.i2p.client.I2PSessionException;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.I2PAppThread;
/**
* Class able to handle a SAM version 3 client connection.
*
* @author mkvore
*/
public class SAMv3Handler extends SAMv1Handler
{
private final static Log _log = new Log ( SAMv3Handler.class );
protected SAMv3StreamSession streamSession = null ;
protected SAMv3RawSession rawSession = null ;
protected SAMv3DatagramSession datagramSession = null ;
protected SAMDatagramSession getDatagramSession() {
return datagramSession ;
}
protected SAMRawSession getRawSession() {
return rawSession ;
}
protected Session session = null ;
interface Session {
String getNick();
void close();
boolean sendBytes(String dest, byte[] data) throws DataFormatException;
}
/**
* Create a new SAM version 3 handler. This constructor expects
* that the SAM HELLO message has been still answered (and
* stripped) from the socket input stream.
*
* @param s Socket attached to a SAM client
* @param verMajor SAM major version to manage (should be 3)
* @param verMinor SAM minor version to manage
*/
public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException
{
this ( s, verMajor, verMinor, new Properties() );
}
/**
* Create a new SAM version 3 handler. This constructor expects
* that the SAM HELLO message has been still answered (and
* stripped) from the socket input stream.
*
* @param s Socket attached to a SAM client
* @param verMajor SAM major version to manage (should be 3)
* @param verMinor SAM minor version to manage
* @param i2cpProps properties to configure the I2CP connection (host, port, etc)
*/
public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
{
super ( s, verMajor, verMinor, i2cpProps );
_log.debug("SAM version 3 handler instantiated");
}
public boolean verifVersion()
{
return (verMajor == 3 && verMinor == 0) ;
}
static public class DatagramServer {
private static DatagramServer _instance = null ;
private static DatagramChannel server = null ;
public static DatagramServer getInstance() throws IOException {
return getInstance(new Properties());
}
public static DatagramServer getInstance(Properties props) throws IOException {
if (_instance==null) {
_instance = new DatagramServer(props);
}
return _instance ;
}
public DatagramServer(Properties props) throws IOException {
if (server==null) {
server = DatagramChannel.open();
}
String host = props.getProperty(SAMBridge.PROP_DATAGRAM_HOST, SAMBridge.DEFAULT_DATAGRAM_HOST);
String portStr = props.getProperty(SAMBridge.PROP_DATAGRAM_PORT, SAMBridge.DEFAULT_DATAGRAM_PORT);
int port ;
try {
port = Integer.parseInt(portStr);
} catch (NumberFormatException e) {
port = Integer.parseInt(SAMBridge.DEFAULT_DATAGRAM_PORT);
}
server.socket().bind(new InetSocketAddress(host, port));
new I2PAppThread(new Listener(server), "DatagramListener").start();
}
public void send(SocketAddress addr, ByteBuffer msg) throws IOException {
server.send(msg, addr);
}
class Listener implements Runnable {
DatagramChannel server = null;
public Listener(DatagramChannel server)
{
this.server = server ;
}
public void run()
{
ByteBuffer inBuf = ByteBuffer.allocateDirect(SAMRawSession.RAW_SIZE_MAX+1024);
while (!Thread.interrupted())
{
inBuf.clear();
try {
server.receive(inBuf);
} catch (IOException e) {
break ;
}
inBuf.flip();
ByteBuffer outBuf = ByteBuffer.wrap(new byte[inBuf.remaining()]);
outBuf.put(inBuf);
outBuf.flip();
new I2PAppThread(new MessageDispatcher(outBuf.array()), "MessageDispatcher").start();
}
}
}
}
public static class MessageDispatcher implements Runnable
{
ByteArrayInputStream is = null ;
public MessageDispatcher(byte[] buf)
{
this.is = new java.io.ByteArrayInputStream(buf) ;
}
public void run() {
String header = null ;
String nick ;
String dest ;
String version ;
try {
header = DataHelper.readLine(is).trim();
StringTokenizer tok = new StringTokenizer(header, " ");
if (tok.countTokens() != 3) {
// This is not a correct message, for sure
_log.debug("Error in message format");
return;
}
version = tok.nextToken();
if (!"3.0".equals(version)) return ;
nick = tok.nextToken();
dest = tok.nextToken();
byte[] data = new byte[is.available()];
is.read(data);
SessionRecord rec = sSessionsHash.get(nick);
if (rec!=null) {
rec.getHandler().session.sendBytes(dest,data);
}
} catch (Exception e) {}
}
}
public class SessionRecord
{
protected String m_dest ;
protected Properties m_props ;
protected ThreadGroup m_threadgroup ;
protected SAMv3Handler m_handler ;
public SessionRecord( String dest, Properties props, SAMv3Handler handler )
{
m_dest = new String(dest) ;
m_props = new Properties() ;
m_props.putAll(props);
m_threadgroup = null ;
m_handler = handler ;
}
public SessionRecord( SessionRecord in )
{
m_dest = in.getDest();
m_props = in.getProps();
m_threadgroup = in.getThreadGroup();
m_handler = in.getHandler();
}
synchronized public String getDest()
{
return new String(m_dest) ;
}
synchronized public Properties getProps()
{
Properties p = new Properties();
p.putAll(m_props);
return m_props;
}
synchronized public SAMv3Handler getHandler()
{
return m_handler ;
}
synchronized public ThreadGroup getThreadGroup()
{
return m_threadgroup ;
}
synchronized public void createThreadGroup(String name)
{
if (m_threadgroup == null)
m_threadgroup = new ThreadGroup(name);
}
}
public static class SessionsDB
{
static final long serialVersionUID = 0x1 ;
class ExistingId extends Exception {
static final long serialVersionUID = 0x1 ;
}
class ExistingDest extends Exception {
static final long serialVersionUID = 0x1 ;
}
HashMap<String, SessionRecord> map ;
public SessionsDB() {
map = new HashMap<String, SessionRecord>() ;
}
synchronized public boolean put( String nick, SessionRecord session ) throws ExistingId, ExistingDest
{
if ( map.containsKey(nick) ) {
throw new ExistingId();
}
for ( SessionRecord r : map.values() ) {
if (r.getDest().equals(session.getDest())) {
throw new ExistingDest();
}
}
if ( !map.containsKey(nick) ) {
session.createThreadGroup("SAM session "+nick);
map.put(nick, session) ;
return true ;
}
else
return false ;
}
synchronized public boolean del( String nick )
{
SessionRecord rec = map.get(nick);
if ( rec!=null ) {
map.remove(nick);
return true ;
}
else
return false ;
}
synchronized public SessionRecord get(String nick)
{
return map.get(nick);
}
synchronized public boolean containsKey( String nick )
{
return map.containsKey(nick);
}
}
public static SessionsDB sSessionsHash = new SessionsDB() ;
public String getClientIP()
{
return this.socket.socket().getInetAddress().getHostAddress();
}
boolean stolenSocket = false ;
public void stealSocket()
{
stolenSocket = true ;
this.stopHandling();
}
public void handle() {
String msg = null;
String domain = null;
String opcode = null;
boolean canContinue = false;
StringTokenizer tok;
Properties props;
this.thread.setName("SAMv3Handler " + _id);
_log.debug("SAM handling started");
try {
InputStream in = getClientSocket().socket().getInputStream();
while (true) {
if (shouldStop()) {
_log.debug("Stop request found");
break;
}
msg = DataHelper.readLine(in).trim();
if (msg == null) {
_log.debug("Connection closed by client");
break;
}
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("New message received: [" + msg + "]");
}
if(msg.equals("")) {
_log.debug("Ignoring newline");
continue;
}
tok = new StringTokenizer(msg, " ");
if (tok.countTokens() < 2) {
// This is not a correct message, for sure
_log.debug("Error in message format");
break;
}
domain = tok.nextToken();
opcode = tok.nextToken();
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Parsing (domain: \"" + domain
+ "\"; opcode: \"" + opcode + "\")");
}
props = SAMUtils.parseParams(tok);
if (domain.equals("STREAM")) {
canContinue = execStreamMessage(opcode, props);
} else if (domain.equals("SESSION")) {
if (i2cpProps != null)
props.putAll(i2cpProps); // make sure we've got the i2cp settings
canContinue = execSessionMessage(opcode, props);
} else if (domain.equals("DEST")) {
canContinue = execDestMessage(opcode, props);
} else if (domain.equals("NAMING")) {
canContinue = execNamingMessage(opcode, props);
} else {
_log.debug("Unrecognized message domain: \""
+ domain + "\"");
break;
}
if (!canContinue) {
break;
}
}
} catch (IOException e) {
_log.debug("Caught IOException ("
+ e.getMessage() + ") for message [" + msg + "]", e);
} catch (Exception e) {
_log.error("Unexpected exception for message [" + msg + "]", e);
} finally {
_log.debug("Stopping handler");
if (!this.stolenSocket)
{
try {
closeClientSocket();
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
}
}
die();
}
}
protected void die() {
SessionRecord rec = null ;
if (session!=null) {
session.close();
rec = sSessionsHash.get(session.getNick());
}
if (rec!=null) {
rec.getThreadGroup().interrupt() ;
while (rec.getThreadGroup().activeCount()>0)
try {
Thread.sleep(1000);
} catch ( InterruptedException e) {}
rec.getThreadGroup().destroy();
sSessionsHash.del(session.getNick());
}
}
/* Parse and execute a SESSION message */
@Override
protected boolean execSessionMessage(String opcode, Properties props) {
String dest = "BUG!";
String nick = null ;
boolean ok = false ;
try{
if (opcode.equals("CREATE")) {
if ((this.getRawSession()!= null) || (this.getDatagramSession() != null)
|| (streamSession != null)) {
_log.debug("Trying to create a session, but one still exists");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n");
}
if (props == null) {
_log.debug("No parameters specified in SESSION CREATE message");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n");
}
dest = props.getProperty("DESTINATION");
if (dest == null) {
_log.debug("SESSION DESTINATION parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n");
}
props.remove("DESTINATION");
if (dest.equals("TRANSIENT")) {
_log.debug("TRANSIENT destination requested");
ByteArrayOutputStream priv = new ByteArrayOutputStream(640);
SAMUtils.genRandomKey(priv, null);
dest = Base64.encode(priv.toByteArray());
} else {
_log.debug("Custom destination specified [" + dest + "]");
}
try {
SAMUtils.checkPrivateDestination(dest);
} catch ( SAMUtils.InvalidDestination e ) {
return writeString("SESSION STATUS RESULT=INVALID_KEY\n");
}
nick = props.getProperty("ID");
if (nick == null) {
_log.debug("SESSION ID parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n");
}
props.remove("ID");
String style = props.getProperty("STYLE");
if (style == null) {
_log.debug("SESSION STYLE parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n");
}
props.remove("STYLE");
// Record the session in the database sSessionsHash
Properties allProps = new Properties();
allProps.putAll(i2cpProps);
allProps.putAll(props);
try {
sSessionsHash.put( nick, new SessionRecord(dest, allProps, this) ) ;
} catch (SessionsDB.ExistingId e) {
_log.debug("SESSION ID parameter already in use");
return writeString("SESSION STATUS RESULT=DUPLICATED_ID\n");
} catch (SessionsDB.ExistingDest e) {
return writeString("SESSION STATUS RESULT=DUPLICATED_DEST\n");
}
// Create the session
if (style.equals("RAW")) {
DatagramServer.getInstance(i2cpProps);
rawSession = newSAMRawSession(nick);
this.session = rawSession ;
} else if (style.equals("DATAGRAM")) {
DatagramServer.getInstance(i2cpProps);
datagramSession = newSAMDatagramSession(nick);
this.session = datagramSession ;
} else if (style.equals("STREAM")) {
streamSession = newSAMStreamSession(nick);
this.session = streamSession ;
} else {
_log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n");
}
ok = true ;
return writeString("SESSION STATUS RESULT=OK DESTINATION="
+ dest + "\n");
} else {
_log.debug("Unrecognized SESSION message opcode: \""
+ opcode + "\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n");
}
} catch (DataFormatException e) {
_log.debug("Invalid destination specified");
return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
} catch (I2PSessionException e) {
_log.debug("I2P error when instantiating session", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
} catch (SAMException e) {
_log.error("Unexpected SAM error", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
} catch (IOException e) {
_log.error("Unexpected IOException", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
} finally {
// unregister the session if it has not been created
if ( !ok && nick!=null ) {
sSessionsHash.del(nick) ;
session = null ;
}
}
}
SAMv3StreamSession newSAMStreamSession(String login )
throws IOException, DataFormatException, SAMException
{
return new SAMv3StreamSession( login ) ;
}
SAMv3RawSession newSAMRawSession(String login )
throws IOException, DataFormatException, SAMException, I2PSessionException
{
return new SAMv3RawSession( login ) ;
}
SAMv3DatagramSession newSAMDatagramSession(String login )
throws IOException, DataFormatException, SAMException, I2PSessionException
{
return new SAMv3DatagramSession( login ) ;
}
/* Parse and execute a STREAM message */
protected boolean execStreamMessage ( String opcode, Properties props )
{
String nick = null ;
SessionRecord rec = null ;
if ( session != null )
{
_log.error ( "STREAM message received, but this session is a master session" );
try {
notifyStreamResult(true, "I2P_ERROR", "master session cannot be used for streams");
} catch (IOException e) {}
return false;
}
nick = props.getProperty("ID");
if (nick == null) {
_log.debug("SESSION ID parameter not specified");
try {
notifyStreamResult(true, "I2P_ERROR", "ID not specified");
} catch (IOException e) {}
return false ;
}
props.remove("ID");
rec = sSessionsHash.get(nick);
if ( rec==null ) {
_log.debug("STREAM SESSION ID does not exist");
try {
notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID does not exist");
} catch (IOException e) {}
return false ;
}
streamSession = rec.getHandler().streamSession ;
if (streamSession==null) {
_log.debug("specified ID is not a stream session");
try {
notifyStreamResult(true, "I2P_ERROR", "specified ID is not a STREAM session");
} catch (IOException e) {}
return false ;
}
if ( opcode.equals ( "CONNECT" ) )
{
return execStreamConnect ( props );
}
else if ( opcode.equals ( "ACCEPT" ) )
{
return execStreamAccept ( props );
}
else if ( opcode.equals ( "FORWARD") )
{
return execStreamForwardIncoming( props );
}
else
{
_log.debug ( "Unrecognized RAW message opcode: \""
+ opcode + "\"" );
try {
notifyStreamResult(true, "I2P_ERROR", "Unrecognized RAW message opcode: "+opcode );
} catch (IOException e) {}
return false;
}
}
protected boolean execStreamConnect( Properties props) {
try {
if (props == null) {
notifyStreamResult(true,"I2P_ERROR","No parameters specified in STREAM CONNECT message");
_log.debug("No parameters specified in STREAM CONNECT message");
return false;
}
boolean verbose = props.getProperty("SILENT","false").equals("false");
String dest = props.getProperty("DESTINATION");
if (dest == null) {
notifyStreamResult(verbose, "I2P_ERROR", "Destination not specified in RAW SEND message");
_log.debug("Destination not specified in RAW SEND message");
return false;
}
props.remove("DESTINATION");
try {
streamSession.connect( this, dest, props );
return true ;
} catch (DataFormatException e) {
_log.debug("Invalid destination in STREAM CONNECT message");
notifyStreamResult ( verbose, "INVALID_KEY", null );
} catch (ConnectException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
notifyStreamResult ( verbose, "CONNECTION_REFUSED", null );
} catch (NoRouteToHostException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
notifyStreamResult ( verbose, "CANT_REACH_PEER", null );
} catch (InterruptedIOException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
notifyStreamResult ( verbose, "TIMEOUT", null );
} catch (I2PException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
notifyStreamResult ( verbose, "I2P_ERROR", e.getMessage() );
}
} catch (IOException e) {
}
return false ;
}
protected boolean execStreamForwardIncoming( Properties props ) {
try {
try {
streamSession.startForwardingIncoming(props);
notifyStreamResult( true, "OK", null );
return false ;
} catch (SAMException e) {
_log.debug("Forwarding STREAM connections failed: " + e.getMessage());
notifyStreamResult ( true, "I2P_ERROR", "Forwarding failed : " + e.getMessage() );
}
} catch (IOException e) {
}
return false ;
}
protected boolean execStreamAccept( Properties props )
{
boolean verbose = props.getProperty( "SILENT", "false").equals("false");
try {
try {
notifyStreamResult(verbose, "OK", null);
streamSession.accept(this, verbose);
return true ;
} catch (InterruptedIOException e) {
_log.debug("STREAM ACCEPT failed: " + e.getMessage());
notifyStreamResult( verbose, "TIMEOUT", e.getMessage() );
} catch (I2PException e) {
_log.debug("STREAM ACCEPT failed: " + e.getMessage());
notifyStreamResult ( verbose, "I2P_ERROR", e.getMessage() );
} catch (SAMException e) {
_log.debug("STREAM ACCEPT failed: " + e.getMessage());
notifyStreamResult ( verbose, "ALREADY_ACCEPTING", null );
}
} catch (IOException e) {
}
return false ;
}
public void notifyStreamResult(boolean verbose, String result, String message) throws IOException
{
if (!verbose) return ;
String out = "STREAM STATUS RESULT="+result;
if (message!=null)
out = out + " MESSAGE=\"" + message + "\"";
out = out + '\n';
if ( !writeString ( out ) )
{
throw new IOException ( "Error notifying connection to SAM client" );
}
}
public void notifyStreamIncomingConnection(Destination d) throws IOException {
if (streamSession == null) {
_log.error("BUG! Received stream connection, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
}
if (!writeString(d.toBase64() + "\n")) {
throw new IOException("Error notifying connection to SAM client");
}
}
public static void notifyStreamIncomingConnection(SocketChannel client, Destination d) throws IOException {
if (!writeString(d.toBase64() + "\n", client)) {
throw new IOException("Error notifying connection to SAM client");
}
}
}

View File

@ -0,0 +1,88 @@
/**
*
*/
package net.i2p.sam;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Properties;
import net.i2p.client.I2PSessionException;
import net.i2p.data.DataFormatException;
import net.i2p.util.Log;
/**
* @author MKVore
*
*/
public class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SAMRawReceiver {
String nick = null ;
SAMv3Handler handler = null ;
SAMv3Handler.DatagramServer server ;
private final static Log _log = new Log ( SAMv3DatagramSession.class );
SocketAddress clientAddress = null ;
public String getNick() { return nick; }
/**
* @param nick nickname of the session
* @param server DatagramServer used for communication with the client
* @throws IOException
* @throws DataFormatException
* @throws I2PSessionException
*/
public SAMv3RawSession(String nick)
throws IOException, DataFormatException, I2PSessionException {
super(SAMv3Handler.sSessionsHash.get(nick).getDest(),
SAMv3Handler.sSessionsHash.get(nick).getProps(),
SAMv3Handler.sSessionsHash.get(nick).getHandler()
);
this.nick = nick ;
this.recv = this ;
this.server = SAMv3Handler.DatagramServer.getInstance() ;
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if ( rec==null ) throw new InterruptedIOException() ;
this.handler = rec.getHandler();
Properties props = rec.getProps();
String portStr = props.getProperty("PORT") ;
if ( portStr==null ) {
_log.debug("receiver port not specified. Current socket will be used.");
}
else {
int port = Integer.parseInt(portStr);
String host = props.getProperty("HOST");
if ( host==null ) {
_log.debug("no host specified. Take from the client socket");
host = rec.getHandler().getClientIP();
}
this.clientAddress = new InetSocketAddress(host,port);
}
}
public void receiveRawBytes(byte[] data) throws IOException {
if (this.clientAddress==null) {
this.handler.receiveRawBytes(data);
} else {
ByteBuffer msgBuf = ByteBuffer.allocate(data.length);
msgBuf.put(data);
msgBuf.flip();
this.server.send(this.clientAddress, msgBuf);
}
}
public void stopRawReceiving() {}
}

View File

@ -0,0 +1,386 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.Properties;
import net.i2p.I2PException;
import net.i2p.client.I2PClient;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManagerFactory;
import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer ;
import java.nio.channels.SocketChannel;
/**
* SAMv3 STREAM session class.
*
* @author mkvore
*/
public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Session
{
private final static Log _log = new Log ( SAMv3StreamSession.class );
protected final int BUFFER_SIZE = 1024 ;
protected Object socketServerLock = new Object();
protected I2PServerSocket socketServer = null;
protected String nick ;
public String getNick() {
return nick ;
}
/**
* Create a new SAM STREAM session.
*
* @param dest Base64-encoded destination (private key)
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
* @throws IOException
* @throws DataFormatException
* @throws SAMException
*/
public SAMv3StreamSession(String login)
throws IOException, DataFormatException, SAMException
{
initSAMStreamSession(login);
}
public static SAMv3Handler.SessionsDB getDB()
{
return SAMv3Handler.sSessionsHash ;
}
private void initSAMStreamSession(String login)
throws IOException, DataFormatException, SAMException {
SAMv3Handler.SessionRecord rec = getDB().get(login);
String dest = rec.getDest() ;
ByteArrayInputStream ba_dest = new ByteArrayInputStream(Base64.decode(dest));
this.recv = rec.getHandler();
_log.debug("SAM STREAM session instantiated");
Properties allprops = new Properties();
allprops.putAll(System.getProperties());
allprops.putAll(rec.getProps());
String i2cpHost = allprops.getProperty(I2PClient.PROP_TCP_HOST, "127.0.0.1");
int i2cpPort ;
String port = allprops.getProperty(I2PClient.PROP_TCP_PORT, "7654");
try {
i2cpPort = Integer.parseInt(port);
} catch (NumberFormatException nfe) {
throw new SAMException("Invalid I2CP port specified [" + port + "]");
}
_log.debug("Creating I2PSocketManager...");
socketMgr = I2PSocketManagerFactory.createManager(ba_dest,
i2cpHost,
i2cpPort,
allprops);
if (socketMgr == null) {
throw new SAMException("Error creating I2PSocketManager towards "+i2cpHost+":"+i2cpPort);
}
socketMgr.addDisconnectListener(new DisconnectListener());
this.nick = login ;
}
/**
* Connect the SAM STREAM session to the specified Destination
*
* @param id Unique id for the connection
* @param dest Base64-encoded Destination to connect to
* @param props Options to be used for connection
*
* @return true if successful
* @throws DataFormatException if the destination is not valid
* @throws ConnectException if the destination refuses connections
* @throws NoRouteToHostException if the destination can't be reached
* @throws InterruptedIOException if the connection timeouts
* @throws I2PException if there's another I2P-related error
* @throws IOException
*/
public void connect ( SAMv3Handler handler, String dest, Properties props ) throws I2PException, ConnectException, NoRouteToHostException, DataFormatException, InterruptedIOException, IOException {
boolean verbose = (props.getProperty("SILENT", "false").equals("false"));
Destination d = SAMUtils.getDest(dest);
I2PSocketOptions opts = socketMgr.buildOptions(props);
if (props.getProperty(I2PSocketOptions.PROP_CONNECT_TIMEOUT) == null)
opts.setConnectTimeout(60 * 1000);
_log.debug("Connecting new I2PSocket...");
// blocking connection (SAMv3)
I2PSocket i2ps = socketMgr.connect(d, opts);
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if ( rec==null ) throw new InterruptedIOException() ;
handler.notifyStreamResult(verbose, "OK", null) ;
handler.stealSocket() ;
ReadableByteChannel fromClient = handler.getClientSocket();
ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream());
WritableByteChannel toClient = handler.getClientSocket();
WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream());
(new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P), "SAMPipeClientToI2P"))).start();
(new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient), "SAMPipeClientToI2P"))).start();
}
/**
* Accept an incoming STREAM
*
* @param id Unique id for the connection
* @param dest Base64-encoded Destination to connect to
* @param props Options to be used for connection
*
* @return true if successful
* @throws DataFormatException if the destination is not valid
* @throws ConnectException if the destination refuses connections
* @throws NoRouteToHostException if the destination can't be reached
* @throws InterruptedIOException if the connection timeouts
* @throws I2PException if there's another I2P-related error
* @throws IOException
*/
public void accept(SAMv3Handler handler, boolean verbose)
throws I2PException, InterruptedIOException, IOException, SAMException {
synchronized( this.socketServerLock )
{
if (this.socketServer!=null) {
_log.debug("a socket server is already defined for this destination");
throw new SAMException("a socket server is already defined for this destination");
}
this.socketServer = this.socketMgr.getServerSocket();
}
I2PSocket i2ps;
i2ps = this.socketServer.accept();
synchronized( this.socketServerLock )
{
this.socketServer = null ;
}
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if ( rec==null ) throw new InterruptedIOException() ;
if (verbose)
handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ;
handler.stealSocket() ;
ReadableByteChannel fromClient = handler.getClientSocket();
ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream());
WritableByteChannel toClient = handler.getClientSocket();
WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream());
(new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P), "SAMPipeClientToI2P"))).start();
(new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient), "SAMPipeClientToI2P"))).start();
}
public void startForwardingIncoming( Properties props ) throws SAMException, InterruptedIOException
{
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
boolean verbose = props.getProperty("SILENT", "false").equals("false");
if ( rec==null ) throw new InterruptedIOException() ;
String portStr = props.getProperty("PORT") ;
if ( portStr==null ) {
_log.debug("receiver port not specified");
throw new SAMException("receiver port not specified");
}
int port = Integer.parseInt(portStr);
String host = props.getProperty("HOST");
if ( host==null ) {
_log.debug("no host specified. Take from the client socket");
host = rec.getHandler().getClientIP();
}
synchronized( this.socketServerLock )
{
if (this.socketServer!=null) {
_log.debug("a socket server is already defined for this destination");
throw new SAMException("a socket server is already defined for this destination");
}
this.socketServer = this.socketMgr.getServerSocket();
}
SocketForwarder forwarder = new SocketForwarder(host, port, this, verbose);
(new Thread(rec.getThreadGroup(), new I2PAppThread(forwarder, "SAMStreamForwarder"))).start();
}
public class SocketForwarder extends Thread
{
String host = null ;
int port = 0 ;
SAMv3StreamSession session;
boolean verbose;
SocketForwarder(String host, int port, SAMv3StreamSession session, boolean verbose) {
this.host = host ;
this.port = port ;
this.session = session ;
this.verbose = verbose ;
}
public void run()
{
while (session.socketServer!=null) {
I2PSocket i2ps = null ;
try {
session.socketServer.waitIncoming(0);
} catch (ConnectException e) {
_log.debug("ConnectException");
break ;
} catch (I2PException e) {
_log.debug("I2PServerSocket has been closed");
break ;
} catch (InterruptedException e) {
_log.debug("InterruptedException");
break ;
}
java.net.InetSocketAddress addr = new java.net.InetSocketAddress(host,port);
SocketChannel clientServerSock = null ;
try {
clientServerSock = SocketChannel.open(addr) ;
}
catch ( IOException e ) {
continue ;
}
try {
i2ps = session.socketServer.accept(1);
} catch (Exception e) {}
if (i2ps==null) {
try {
clientServerSock.close();
} catch (IOException ee) {}
continue ;
}
try {
if (this.verbose)
SAMv3Handler.notifyStreamIncomingConnection(
clientServerSock, i2ps.getPeerDestination());
ReadableByteChannel fromClient = clientServerSock ;
ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream());
WritableByteChannel toClient = clientServerSock ;
WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream());
new I2PAppThread(new Pipe(fromClient,toI2P), "SAMPipeClientToI2P").start();
new I2PAppThread(new Pipe(fromI2P,toClient), "SAMPipeClientToI2P").start();
} catch (IOException e) {
try {
clientServerSock.close();
} catch (IOException ee) {}
try {
i2ps.close();
} catch (IOException ee) {}
continue ;
}
}
}
}
public class Pipe extends Thread
{
ReadableByteChannel in ;
WritableByteChannel out ;
ByteBuffer buf ;
public Pipe(ReadableByteChannel in, WritableByteChannel out)
{
this.in = in ;
this.out = out ;
this.buf = ByteBuffer.allocate(BUFFER_SIZE) ;
}
public void run()
{
try {
while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) {
buf.flip();
out.write(buf);
buf.compact();
}
}
catch (IOException e)
{
this.interrupt();
}
try {
in.close();
}
catch (IOException e) {}
try {
buf.flip();
while (buf.hasRemaining())
out.write(buf);
}
catch (IOException e) {}
try {
out.close();
}
catch (IOException e) {}
}
}
/**
* Close the stream session
*/
@Override
public void close() {
socketMgr.destroySocketManager();
}
public boolean sendBytes(String s, byte[] b) throws DataFormatException
{
throw new DataFormatException(null);
}
}

View File

@ -12,17 +12,17 @@ import net.i2p.util.Log;
*
*/
public class SAMEventHandler extends SAMClientEventListenerImpl {
private I2PAppContext _context;
//private I2PAppContext _context;
private Log _log;
private Boolean _helloOk;
private Object _helloLock = new Object();
private Boolean _sessionCreateOk;
private Object _sessionCreateLock = new Object();
private Object _namingReplyLock = new Object();
private Map _namingReplies = new HashMap();
private Map<String,String> _namingReplies = new HashMap<String,String>();
public SAMEventHandler(I2PAppContext ctx) {
_context = ctx;
//_context = ctx;
_log = ctx.logManager().getLog(getClass());
}

View File

@ -31,10 +31,10 @@ public class SAMStreamSend {
private OutputStream _samOut;
private InputStream _samIn;
private SAMReader _reader;
private boolean _dead;
//private boolean _dead;
private SAMEventHandler _eventHandler;
/** Connection id (Integer) to peer (Flooder) */
private Map _remotePeers;
private Map<Integer, Sender> _remotePeers;
public static void main(String args[]) {
if (args.length < 4) {
@ -42,7 +42,7 @@ public class SAMStreamSend {
return;
}
I2PAppContext ctx = new I2PAppContext();
String files[] = new String[args.length - 3];
//String files[] = new String[args.length - 3];
SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]);
sender.startup();
}
@ -50,14 +50,14 @@ public class SAMStreamSend {
public SAMStreamSend(I2PAppContext ctx, String samHost, String samPort, String destFile, String dataFile) {
_context = ctx;
_log = ctx.logManager().getLog(SAMStreamSend.class);
_dead = false;
//_dead = false;
_samHost = samHost;
_samPort = samPort;
_destFile = destFile;
_dataFile = dataFile;
_conOptions = "";
_eventHandler = new SendEventHandler(_context);
_remotePeers = new HashMap();
_remotePeers = new HashMap<Integer,Sender>();
}
public void startup() {
@ -207,7 +207,6 @@ public class SAMStreamSend {
_started = _context.clock().now();
_context.statManager().addRateData("send." + _connectionId + ".started", 1, 0);
byte data[] = new byte[1024];
long value = 0;
long lastSend = _context.clock().now();
while (!_closed) {
try {

View File

@ -31,10 +31,10 @@ public class SAMStreamSink {
private OutputStream _samOut;
private InputStream _samIn;
private SAMReader _reader;
private boolean _dead;
//private boolean _dead;
private SAMEventHandler _eventHandler;
/** Connection id (Integer) to peer (Flooder) */
private Map _remotePeers;
private Map<Integer, Sink> _remotePeers;
public static void main(String args[]) {
if (args.length < 4) {
@ -49,14 +49,14 @@ public class SAMStreamSink {
public SAMStreamSink(I2PAppContext ctx, String samHost, String samPort, String destFile, String sinkDir) {
_context = ctx;
_log = ctx.logManager().getLog(SAMStreamSink.class);
_dead = false;
//_dead = false;
_samHost = samHost;
_samPort = samPort;
_destFile = destFile;
_sinkDir = sinkDir;
_conOptions = "";
_eventHandler = new SinkEventHandler(_context);
_remotePeers = new HashMap();
_remotePeers = new HashMap<Integer,Sink>();
}
public void startup() {
@ -70,7 +70,8 @@ public class SAMStreamSink {
String ourDest = handshake();
_log.debug("Handshake complete. we are " + ourDest);
if (ourDest != null) {
boolean written = writeDest(ourDest);
//boolean written =
writeDest(ourDest);
_log.debug("Dest written");
}
}

View File

@ -2,8 +2,6 @@ package net.i2p.client.streaming;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
@ -23,6 +21,7 @@ class ConnectionHandler {
private Log _log;
private ConnectionManager _manager;
private LinkedBlockingQueue<Packet> _synQueue;
private Object _synSignal;
private boolean _active;
private int _acceptTimeout;
@ -41,7 +40,8 @@ class ConnectionHandler {
_context = context;
_log = context.logManager().getLog(ConnectionHandler.class);
_manager = mgr;
_synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
_synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE);
_synSignal= new Object();
_active = false;
_acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
}
@ -81,6 +81,10 @@ class ConnectionHandler {
boolean success = _synQueue.offer(packet); // fail immediately if full
if (success) {
SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout);
// advertise the new syn packet to threads that could be waiting
// (by calling waitSyn(long)
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
synchronized (this._synSignal) {this._synSignal.notifyAll();}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping new SYN request, as the queue is full");
@ -89,6 +93,33 @@ class ConnectionHandler {
}
}
/**
* Wait until some SYN packet is available
* @param ms max amount of time to wait for a connection (if negative or null,
* wait indefinitely)
* @throws InterruptedException
*/
public void waitSyn( long ms ) throws InterruptedException {
synchronized (this._synSignal)
{
long now = this._context.clock().now() ;
long expiration = now + ms ;
while ( expiration > now || ms<=0 ) {
// check if there is a SYN packet in the queue
for ( Packet p : this._synQueue ) {
if ( p.isFlagSet(Packet.FLAG_SYNCHRONIZE) ) return ;
}
// wait until a SYN is signaled
if ( ms == 0) {
this._synSignal.wait();
} else {
this._synSignal.wait(expiration-now);
now = this._context.clock().now();
}
}
}
}
/**
* Receive an incoming connection (built from a received SYN)
* Non-SYN packets with a zero SendStreamID may also be queued here so

View File

@ -1,6 +1,8 @@
package net.i2p.client.streaming;
import java.net.SocketTimeoutException;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
/**
@ -45,4 +47,43 @@ public class I2PServerSocketFull implements I2PServerSocket {
public I2PSocketManager getManager() {
return _socketManager;
}
/**
* accept(timeout) waits timeout ms for a socket connecting. If a socket is
* not available during the timeout, return null. accept(0) behaves like accept()
*
* @param timeout in ms
*
* @return a connected I2PSocket, or null
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
*/
public I2PSocket accept(long timeout) throws I2PException {
long reset_timeout = this.getSoTimeout();
try {
this.setSoTimeout(timeout);
return this.accept();
} catch (SocketTimeoutException e) {
return null ;
} finally {
this.setSoTimeout(reset_timeout);
}
}
/**
* block until a SYN packet is detected or the timeout is reached. If timeout is 0,
* block until a SYN packet is detected.
*
* @param timeoutMs
* @throws InterruptedException
* @throws I2PException
*/
public void waitIncoming(long timeoutMs) throws I2PException, InterruptedException {
if (this._socketManager.getConnectionManager().getSession().isClosed())
throw new I2PException("Session is closed");
this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs);
}
}