Files
go-i2cp/client.go
2019-07-13 05:43:10 -04:00

660 lines
20 KiB
Go

package go_i2cp
import (
"bytes"
"compress/gzip"
"compress/zlib"
"encoding/binary"
"fmt"
"io"
"os"
"strings"
"sync"
)
const I2CP_CLIENT_VERSION = "0.9.33"
const TAG = CLIENT
const I2CP_PROTOCOL_INIT uint8 = 0x2a
const I2CP_MESSAGE_SIZE = 0xffff
const I2CP_MAX_SESSIONS = 0xffff
const I2CP_MAX_SESSIONS_PER_CLIENT = 32
const I2CP_MSG_ANY uint8 = 0
const I2CP_MSG_BANDWIDTH_LIMITS uint8 = 23
const I2CP_MSG_CREATE_LEASE_SET uint8 = 4
const I2CP_MSG_CREATE_SESSION uint8 = 1
const I2CP_MSG_DEST_LOOKUP uint8 = 34
const I2CP_MSG_DEST_REPLY uint8 = 35
const I2CP_MSG_DESTROY_SESSION uint8 = 3
const I2CP_MSG_DISCONNECT uint8 = 30
const I2CP_MSG_GET_BANDWIDTH_LIMITS uint8 = 8
const I2CP_MSG_GET_DATE uint8 = 32
const I2CP_MSG_HOST_LOOKUP uint8 = 38
const I2CP_MSG_HOST_REPLY uint8 = 39
const I2CP_MSG_MESSAGE_STATUS uint8 = 22
const I2CP_MSG_PAYLOAD_MESSAGE uint8 = 31
const I2CP_MSG_REQUEST_LEASESET uint8 = 21
const I2CP_MSG_REQUEST_VARIABLE_LEASESET uint8 = 37
const I2CP_MSG_SEND_MESSAGE uint8 = 5
const I2CP_MSG_SESSION_STATUS uint8 = 20
const I2CP_MSG_SET_DATE uint8 = 33
/* Router capabilities */
const ROUTER_CAN_HOST_LOOKUP uint32 = 1
type ClientProperty int
const (
CLIENT_PROP_ROUTER_ADDRESS ClientProperty = iota
CLIENT_PROP_ROUTER_PORT
CLIENT_PROP_ROUTER_USE_TLS
CLIENT_PROP_USERNAME
CLIENT_PROP_PASSWORD
NR_OF_I2CP_CLIENT_PROPERTIES
)
const (
PROTOCOL_STREAMING = 6
PROTOCOL_DATAGRAM = 17
PROTOCOL_RAW_DATAGRAM = 18
)
const (
HOST_LOOKUP_TYPE_HASH = iota
HOST_LOOKUP_TYPE_HOST = iota
)
var defaultProperties = map[string]string{
"i2cp.password": "",
"i2cp.username": "",
"i2cp.closeIdleTime": "",
"i2cp.closeOnIdle": "",
"i2cp.encryptLeaseSet": "",
"i2cp.fastReceive": "",
"i2cp.gzip": "",
"i2cp.leaseSetKey": "",
"i2cp.leaseSetPrivateKey": "",
"i2cp.leaseSetSigningPrivateKey": "",
"i2cp.messageReliability": "",
"i2cp.reduceIdleTime": "",
"i2cp.reduceOnIdle": "",
"i2cp.reduceQuantity": "",
"i2cp.SSL": "",
"i2cp.tcp.host": "127.0.0.1",
"i2cp.tcp.port": "7654",
}
type ClientCallBacks struct {
opaque *interface{}
onDisconnect func(*Client, string, *interface{})
onLog func(*Client, LoggerTags, string)
}
type LookupEntry struct {
address string
session *Session
}
type RouterInfo struct {
date uint64
version Version
capabilities uint32
}
type Client struct {
logger *LoggerCallbacks // TODO idk wat this is for
callbacks *ClientCallBacks
properties map[string]string
tcp Tcp
outputStream *Stream
messageStream *Stream
router RouterInfo
outputQueue []*Stream
sessions map[uint16]*Session
n_sessions int
lookup map[string]uint32
lookupReq map[uint32]LookupEntry
lock sync.Mutex
connected bool
currentSession *Session // *opaque in the C lib
lookupRequestId uint32
}
var defaultConfigFile = "/.i2cp.conf"
// NewClient creates a new i2p client with the specified callbacks
func NewClient(callbacks *ClientCallBacks) (c *Client) {
c = new(Client)
c.callbacks = callbacks
LogInit(nil, ERROR)
c.outputStream = NewStream(make([]byte, 0, I2CP_MESSAGE_SIZE))
c.messageStream = NewStream(make([]byte, 0, I2CP_MESSAGE_SIZE))
c.setDefaultProperties()
c.lookup = make(map[string]uint32, 1000)
c.lookupReq = make(map[uint32]LookupEntry, 1000)
c.sessions = make(map[uint16]*Session)
c.outputQueue = make([]*Stream, 0)
c.tcp.Init()
return
}
func (c *Client) setDefaultProperties() {
c.properties = defaultProperties
home := os.Getenv("I2CP_HOME")
if len(home) == 0 {
home = ""
}
conf := os.Getenv("GO_I2CP_CONF")
if len(conf) == 0 {
conf = defaultConfigFile
}
config := home + conf
Debug(CLIENT, "Loading config file %s", config)
ParseConfig(config, c.SetProperty)
}
func (c *Client) sendMessage(typ uint8, stream *Stream, queue bool) (err error) {
send := NewStream(make([]byte, 0, stream.Len()+4+1))
err = send.WriteUint32(uint32(stream.Len()))
err = send.WriteByte(typ)
lenb := stream.Len()
_ = lenb
_, err = send.Write(stream.Bytes())
lenc := send.Len()
_ = lenc
if queue {
Debug(PROTOCOL, "Putting %d bytes message on the output queue.", send.Len())
c.lock.Lock()
c.outputQueue = append(c.outputQueue, send)
c.lock.Unlock()
} else {
_, err = c.tcp.Send(send)
}
return
}
func (c *Client) recvMessage(typ uint8, stream *Stream, dispatch bool) (err error) {
length := uint32(0)
msgType := uint8(0)
var i int
firstFive := NewStream(make([]byte, 5))
i, err = c.tcp.Receive(firstFive)
if i == 0 {
c.callbacks.onDisconnect(c, "Didn't receive anything", nil)
}
length, err = firstFive.ReadUint32()
msgType, err = firstFive.ReadByte()
if (typ == I2CP_MSG_SET_DATE) && (length > 0xffff) {
Fatal(PROTOCOL, "Unexpected response, check that your router SSL settings match the ~/.i2cp.conf configuration")
}
if length > 0xffff && typ != I2CP_MSG_DISCONNECT {
Fatal(PROTOCOL, "unexpected message length, length > 0xffff")
}
if (typ != 0) && (msgType != typ) {
Error(PROTOCOL, "expected message type %d, received %d", typ, msgType)
}
// receive rest
i, err = c.tcp.Receive(stream)
if dispatch {
c.onMessage(msgType, stream)
}
return
}
func (c *Client) onMessage(msgType uint8, stream *Stream) {
switch msgType {
case I2CP_MSG_SET_DATE:
c.onMsgSetDate(stream)
case I2CP_MSG_DISCONNECT:
c.onMsgDisconnect(stream)
case I2CP_MSG_PAYLOAD_MESSAGE:
c.onMsgPayload(stream)
case I2CP_MSG_MESSAGE_STATUS:
c.onMsgStatus(stream)
case I2CP_MSG_DEST_REPLY:
c.onMsgDestReply(stream)
case I2CP_MSG_BANDWIDTH_LIMITS:
c.onMsgBandwithLimit(stream)
case I2CP_MSG_SESSION_STATUS:
c.onMsgSessionStatus(stream)
case I2CP_MSG_REQUEST_VARIABLE_LEASESET:
c.onMsgReqVariableLease(stream)
case I2CP_MSG_HOST_REPLY:
c.onMsgHostReply(stream)
default:
Info(TAG, "%s", "recieved unhandled i2cp message.")
}
}
func (c *Client) onMsgSetDate(stream *Stream) {
Debug(TAG|PROTOCOL, "Received SetDate message.")
var err error
c.router.date, err = stream.ReadUint64()
var verLength uint8
verLength, err = stream.ReadByte()
version := make([]byte, verLength)
_, err = stream.Read(version)
c.router.version = parseVersion(string(version))
Debug(TAG|PROTOCOL, "Router version %s, date %d", string(version), c.router.date)
if err != nil {
Error(TAG|PROTOCOL, "Could not read SetDate correctly data")
}
if c.router.version.compare(Version{major: 0, minor: 9, micro: 10, qualifier: 0}) >= 0 {
c.router.capabilities |= ROUTER_CAN_HOST_LOOKUP
}
}
func (c *Client) onMsgDisconnect(stream *Stream) {
var err error
Debug(TAG|PROTOCOL, "Received Disconnect message")
//size, err = stream.ReadByte()
strbuf := make([]byte, stream.Len())
lens := stream.Len()
_ = lens
_, err = stream.Read(strbuf)
Debug(TAG|PROTOCOL, "Received Disconnect message with reason %s", string(strbuf))
if err != nil {
Error(TAG|PROTOCOL, "Could not read msgDisconnect correctly data")
}
if c.callbacks != nil && c.callbacks.onDisconnect != nil {
c.callbacks.onDisconnect(c, string(strbuf), nil)
}
}
func (c *Client) onMsgPayload(stream *Stream) {
var gzipHeader = [3]byte{0x1f, 0x8b, 0x08}
var testHeader [3]byte
var protocol uint8
var sessionId, srcPort, destPort uint16
var messageId, payloadSize uint32
var err error
var ret int
Debug(TAG|PROTOCOL, "Received PayloadMessage message")
sessionId, err = stream.ReadUint16()
messageId, err = stream.ReadUint32()
_ = messageId // currently unused
session, ok := c.sessions[sessionId]
if !ok {
Fatal(TAG|FATAL, "Session id %d does not match any of our currently initiated sessions by %p", sessionId, c)
}
payloadSize, err = stream.ReadUint32()
_ = payloadSize // currently unused
// validate gzip header
var msgStream = bytes.NewBuffer(stream.Bytes())
_, err = stream.Read(testHeader[:])
if testHeader != gzipHeader {
Warning(TAG, "Payload validation failed, skipping payload")
return
}
var payload = bytes.NewBuffer(make([]byte, 0xffff))
var decompress io.ReadCloser
decompress, err = zlib.NewReader(msgStream)
io.Copy(payload, decompress)
decompress.Close()
if payload.Len() > 0 {
// finish reading header
// skip gzip flags
_, err = stream.ReadByte()
srcPort, err = stream.ReadUint16()
destPort, err = stream.ReadUint16()
_, err = stream.ReadByte()
protocol, err = stream.ReadByte()
session.dispatchMessage(protocol, srcPort, destPort, &Stream{payload})
}
_ = err // currently unused
_ = ret // currently unused
}
func (c *Client) onMsgStatus(stream *Stream) {
var status uint8
var sessionId uint16
var messageId, size, nonce uint32
var err error
Debug(TAG|PROTOCOL, "Received MessageStatus message")
sessionId, err = stream.ReadUint16()
messageId, err = stream.ReadUint32()
status, err = stream.ReadByte()
size, err = stream.ReadUint32()
nonce, err = stream.ReadUint32()
_ = err // currently unused
Debug(TAG|PROTOCOL, "Message status; session id %d, message id %d, status %d, size %d, nonce %d", sessionId, messageId, status, size, nonce)
}
func (c *Client) onMsgDestReply(stream *Stream) {
var b32 string
var destination *Destination
var lup LookupEntry
var err error
var requestId uint32
Debug(TAG|PROTOCOL, "Received DestReply message.")
if stream.Len() != 32 {
destination, err = NewDestinationFromMessage(stream)
if err != nil {
Fatal(TAG|FATAL, "Failed to construct destination from stream")
}
b32 = destination.b32
} else {
bits := GetCryptoInstance().EncodeStream(CODEC_BASE32, stream)
b32 = string(bits.Bytes()) + ".b32.i2p"
Debug(TAG, "Could not resolve destination")
}
requestId = c.lookup[b32]
delete(c.lookup, b32)
lup = c.lookupReq[requestId]
delete(c.lookupReq, requestId)
if lup == (LookupEntry{}) {
Warning(TAG, "No sesssion for destination lookup of address '%s'", b32)
} else {
lup.session.dispatchDestination(requestId, b32, destination)
}
}
func (c *Client) onMsgBandwithLimit(stream *Stream) {
Debug(TAG|PROTOCOL, "Received BandwidthLimits message.")
}
func (c *Client) onMsgSessionStatus(stream *Stream) {
var sess *Session
var sessionID uint16
var sessionStatus uint8
var err error
Debug(TAG|PROTOCOL, "Received SessionStatus message.")
sessionID, err = stream.ReadUint16()
sessionStatus, err = stream.ReadByte()
_ = err // currently unused
if SessionStatus(sessionStatus) == I2CP_SESSION_STATUS_CREATED {
if c.currentSession == nil {
Error(TAG, "Received session status created without waiting for it %p", c)
return
}
c.currentSession.id = sessionID
c.sessions[sessionID] = c.currentSession
c.currentSession = nil
}
sess = c.sessions[sessionID]
if sess == nil {
Fatal(TAG|FATAL, "Session with id %d doesn't exists in client instance %p.", sessionID, c)
} else {
sess.dispatchStatus(SessionStatus(sessionStatus))
}
}
func (c *Client) onMsgReqVariableLease(stream *Stream) {
var sessionId uint16
var tunnels uint8
var sess *Session
var leases []*Lease
var err error
Debug(TAG|PROTOCOL, "Received RequestVariableLeaseSet message.")
sessionId, err = stream.ReadUint16()
tunnels, err = stream.ReadByte()
sess = c.sessions[sessionId]
if sess == nil {
Fatal(TAG|FATAL, "Session with id %d doesn't exist in client instance %p.", sessionId, c)
}
leases = make([]*Lease, tunnels)
for i := uint8(0); i < tunnels; i++ {
leases[i], err = NewLeaseFromStream(stream)
}
c.msgCreateLeaseSet(sess, tunnels, leases, true)
_ = err // currently unused
}
func (c *Client) onMsgHostReply(stream *Stream) {
var result uint8
var sessionId uint16
var requestId uint32
var sess *Session
var dest *Destination
var lup LookupEntry
var err error
Debug(TAG|PROTOCOL, "Received HostReply message.")
sessionId, err = stream.ReadUint16()
requestId, err = stream.ReadUint32()
result, err = stream.ReadByte()
if result == 0 {
dest, err = NewDestinationFromMessage(stream)
if err != nil {
Fatal(TAG|FATAL, "Failed to construct destination from stream.")
}
}
sess = c.sessions[sessionId]
if sess == nil {
Fatal(TAG|FATAL, "Session with id %d doesn't exist in client instance %p.", sessionId, c)
}
lup = c.lookupReq[requestId]
delete(c.lookupReq, requestId)
sess.dispatchDestination(requestId, lup.address, dest)
_ = err // currently unused
}
func (c *Client) msgCreateLeaseSet(session *Session, tunnels uint8, leases []*Lease, queue bool) {
var err error
var nullbytes [256]byte
var leaseSet *Stream
var config *SessionConfig
var dest *Destination
var sgk *SignatureKeyPair
Debug(TAG|PROTOCOL, "Sending CreateLeaseSetMessage")
leaseSet = NewStream(make([]byte, 4096))
config = session.config
dest = config.destination
sgk = &dest.sgk
// memset 0 nullbytes
for i := 0; i < len(nullbytes); i++ {
nullbytes[i] = 0
}
// construct the message
c.messageStream.WriteUint16(session.id)
c.messageStream.Write(nullbytes[:20])
c.messageStream.Write(nullbytes[:256])
//Build leaseset stream and sign it
dest.WriteToMessage(leaseSet)
leaseSet.Write(nullbytes[:256])
GetCryptoInstance().WritePublicSignatureToStream(sgk, leaseSet)
leaseSet.WriteByte(tunnels)
for i := uint8(0); i < tunnels; i++ {
leases[i].WriteToMessage(leaseSet)
}
GetCryptoInstance().SignStream(sgk, leaseSet)
c.messageStream.Write(leaseSet.Bytes())
if err = c.sendMessage(I2CP_MSG_CREATE_LEASE_SET, c.messageStream, queue); err != nil {
Error(TAG, "Error while sending CreateLeaseSet")
}
}
func (c *Client) msgGetDate(queue bool) {
var err error
Debug(TAG|PROTOCOL, "Sending GetDateMessage")
c.messageStream.Reset()
c.messageStream.WriteLenPrefixedString(I2CP_CLIENT_VERSION)
if len(c.properties["i2cp.username"]) > 0 {
authInfo := map[string]string{
"i2cp.username": c.properties["i2cp.username"],
"i2cp.password": c.properties["i2cp.password"],
}
c.messageStream.WriteMapping(authInfo)
}
if err = c.sendMessage(I2CP_MSG_GET_DATE, c.messageStream, queue); err != nil {
Error(TAG, "Error while sending GetDateMessage")
}
}
func (c *Client) msgCreateSession(config *SessionConfig, queue bool) error {
var err error
Debug(TAG|PROTOCOL, "Sending CreateSessionMessage")
c.messageStream.Reset()
config.writeToMessage(c.messageStream)
if err = c.sendMessage(I2CP_MSG_CREATE_SESSION, c.messageStream, queue); err != nil {
Error(TAG, "Error while sending CreateSessionMessage.")
return err
}
return err
}
func (c *Client) msgDestLookup(hash []byte, queue bool) {
Debug(TAG|PROTOCOL, "Sending DestLookupMessage.")
c.messageStream.Reset()
c.messageStream.Write(hash)
if err := c.sendMessage(I2CP_MSG_DEST_LOOKUP, c.messageStream, queue); err != nil {
Error(TAG, "Error while sending DestLookupMessage.")
}
}
func (c *Client) msgHostLookup(sess *Session, requestId, timeout uint32, typ uint8, data []byte, queue bool) {
var sessionId uint16
Debug(TAG|PROTOCOL, "Sending HostLookupMessage.")
c.messageStream.Reset()
sessionId = sess.id
c.messageStream.WriteUint16(sessionId)
c.messageStream.WriteUint32(requestId)
c.messageStream.WriteUint32(timeout)
c.messageStream.WriteByte(typ)
if typ == HOST_LOOKUP_TYPE_HASH {
c.messageStream.Write(data)
}
if err := c.sendMessage(I2CP_MSG_HOST_LOOKUP, c.messageStream, queue); err != nil {
Error(TAG, "Error while sending HostLookupMessage")
}
}
func (c *Client) msgGetBandwidthLimits(queue bool) {
Debug(TAG|PROTOCOL, "Sending GetBandwidthLimitsMessage.")
c.messageStream.Reset()
if err := c.sendMessage(I2CP_MSG_GET_BANDWIDTH_LIMITS, c.messageStream, queue); err != nil {
Error(TAG, "Error while sending GetBandwidthLimitsMessage")
}
}
func (c *Client) msgDestroySession(sess *Session, queue bool) {
Debug(TAG|PROTOCOL, "Sending DestroySessionMessage")
c.messageStream.Reset()
c.messageStream.WriteUint16(sess.id)
if err := c.sendMessage(I2CP_MSG_DESTROY_SESSION, c.messageStream, queue); err != nil {
Error(TAG, "Error while sending DestroySessionMessage")
}
}
func (c *Client) msgSendMessage(sess *Session, dest *Destination, protocol uint8, srcPort, destPort uint16, payload *Stream, nonce uint32, queue bool) {
Debug(TAG|PROTOCOL, "Sending SendMessageMessage")
out := bytes.NewBuffer(make([]byte, 0xffff))
c.messageStream.Reset()
c.messageStream.WriteUint16(sess.id)
dest.WriteToMessage(c.messageStream)
compress := gzip.NewWriter(out)
compress.Write(payload.Bytes())
compress.Close()
header := out.Bytes()[:10]
binary.LittleEndian.PutUint16(header[4:6], srcPort)
binary.LittleEndian.PutUint16(header[6:8], destPort)
header[9] = protocol
c.messageStream.WriteUint32(uint32(out.Len()))
c.messageStream.Write(out.Bytes())
c.messageStream.WriteUint32(nonce)
if err := c.sendMessage(I2CP_MSG_SEND_MESSAGE, c.messageStream, queue); err != nil {
Error(TAG, "Error while sending SendMessageMessage")
}
}
func (c *Client) Connect() error {
Info(0, "Client connecting to i2cp at %s:%s", c.properties["i2cp.tcp.host"], c.properties["i2cp.tcp.host"])
err := c.tcp.Connect()
if err != nil {
//panic(err)
return err
}
c.outputStream.Reset()
c.outputStream.WriteByte(I2CP_PROTOCOL_INIT)
_, err = c.tcp.Send(c.outputStream)
if err != nil {
return err
}
Debug(PROTOCOL, "Sending protocol byte message")
c.msgGetDate(false)
err = c.recvMessage(I2CP_MSG_SET_DATE, c.messageStream, true)
if err != nil {
return err
}
return nil
}
func (c *Client) CreateSession(sess *Session) error {
if c.n_sessions == I2CP_MAX_SESSIONS_PER_CLIENT {
Warning(TAG, "Maximum number of session per client connection reached.")
return fmt.Errorf("%d %s", TAG, "Maximum number of session per client connection reached.")
}
sess.config.SetProperty(SESSION_CONFIG_PROP_I2CP_FAST_RECEIVE, "true")
sess.config.SetProperty(SESSION_CONFIG_PROP_I2CP_MESSAGE_RELIABILITY, "none")
err := c.msgCreateSession(sess.config, false)
if err != nil {
return err
}
c.currentSession = sess
c.recvMessage(I2CP_MSG_ANY, c.messageStream, true)
return nil
}
func (c *Client) ProcessIO() error {
c.lock.Lock()
for _, stream := range c.outputQueue {
Debug(TAG|PROTOCOL, "Sending %d bytes message", stream.Len())
ret, err := c.tcp.Send(stream)
if ret < 0 {
c.lock.Unlock()
return err
}
if ret == 0 {
break
}
}
c.lock.Unlock()
var err error
for c.tcp.CanRead() {
if err = c.recvMessage(I2CP_MSG_ANY, c.messageStream, true); err != nil {
return err
}
}
return err
}
func (c *Client) DestinationLookup(session *Session, address string) (requestId uint32) {
var out *Stream
var lup LookupEntry
b32Len := 52 + 8
defaultTimeout := uint32(30000)
routerCanHostLookup := (c.router.capabilities & ROUTER_CAN_HOST_LOOKUP) == ROUTER_CAN_HOST_LOOKUP
if !routerCanHostLookup && len(address) != b32Len {
Warning(TAG, "Address '%s' is not a b32 address %d.", address, len(address))
return
}
in := NewStream(make([]byte, 512))
if len(address) == b32Len {
Debug(TAG, "Lookup of b32 address detected, decode and use hash for faster lookup.")
host := address[:strings.Index(address, ".")]
in.Write([]byte(host))
out, _ = GetCryptoInstance().DecodeStream(CODEC_BASE32, in)
if out.Len() == 0 {
Warning(TAG, "Failed to decode hash of address '%s'", address)
}
}
lup = LookupEntry{address: address, session: session}
c.lookupRequestId += 1
requestId = c.lookupRequestId
c.lookupReq[requestId] = lup
if routerCanHostLookup {
if out == nil || out.Len() == 0 {
c.msgHostLookup(session, requestId, defaultTimeout, HOST_LOOKUP_TYPE_HOST, []byte(address), true)
} else {
c.msgHostLookup(session, requestId, defaultTimeout, HOST_LOOKUP_TYPE_HASH, out.Bytes(), true)
}
} else {
c.lookup[address] = requestId
c.msgDestLookup(out.Bytes(), true)
}
return requestId
}
func (c *Client) Disconnect() {
Info(TAG, "Disconnection client %p", c)
c.tcp.Disconnect()
}
func (c *Client) SetProperty(name, value string) {
if _, ok := c.properties[name]; ok {
c.properties[name] = value
switch name {
case "i2cp.tcp.host":
c.tcp.SetProperty(TCP_PROP_ADDRESS, c.properties[name])
case "i2cp.tcp.port":
c.tcp.SetProperty(TCP_PROP_PORT, c.properties[name])
case "i2cp.SSL":
c.tcp.SetProperty(TCP_PROP_USE_TLS, c.properties[name])
}
}
}
func (c *Client) IsConnected() bool {
return c.tcp.IsConnected()
}