Files
sam3/stream.go

309 lines
9.9 KiB
Go
Raw Normal View History

2015-10-15 17:21:11 -04:00
package sam3
import (
"bufio"
"bytes"
"context"
2015-10-15 17:21:11 -04:00
"errors"
"github.com/sirupsen/logrus"
2016-02-10 18:23:41 -05:00
"io"
2015-10-15 17:21:11 -04:00
"net"
"strings"
"time"
2015-10-15 17:21:11 -04:00
2024-11-09 11:54:54 -05:00
"github.com/go-i2p/i2pkeys"
)
2015-10-15 17:21:11 -04:00
// Represents a streaming session.
type StreamSession struct {
samAddr string // address to the sam bridge (ipv4:port)
id string // tunnel name
conn net.Conn // connection to sam
keys i2pkeys.I2PKeys // i2p destination keys
Timeout time.Duration
Deadline time.Time
2019-03-26 22:22:00 -04:00
sigType string
from string
to string
2019-02-13 23:58:24 -05:00
}
2024-11-02 23:26:10 -04:00
// Read fills buf from the session's underlying SAM connection and reports
// the byte count and error exactly as that connection returned them.
func (s *StreamSession) Read(buf []byte) (int, error) {
	count, err := s.conn.Read(buf)
	return count, err
}
// Write hands data to the session's underlying SAM connection and reports
// the byte count and error exactly as that connection returned them.
func (s *StreamSession) Write(data []byte) (int, error) {
	count, err := s.conn.Write(data)
	return count, err
}
// SetDeadline logs t at debug level and forwards it to SetDeadline on the
// session's underlying SAM connection.
func (s *StreamSession) SetDeadline(t time.Time) error {
	entry := log.WithField("deadline", t)
	entry.Debug("Setting deadline for StreamSession")

	err := s.conn.SetDeadline(t)
	return err
}
// SetReadDeadline logs t at debug level and forwards it to SetReadDeadline
// on the session's underlying SAM connection.
func (s *StreamSession) SetReadDeadline(t time.Time) error {
	entry := log.WithField("readDeadline", t)
	entry.Debug("Setting read deadline for StreamSession")

	err := s.conn.SetReadDeadline(t)
	return err
}
// SetWriteDeadline logs t at debug level and forwards it to SetWriteDeadline
// on the session's underlying SAM connection.
func (s *StreamSession) SetWriteDeadline(t time.Time) error {
	entry := log.WithField("writeDeadline", t)
	entry.Debug("Setting write deadline for StreamSession")

	err := s.conn.SetWriteDeadline(t)
	return err
}
2024-11-02 21:57:11 -04:00
// From returns the session's "from" value, which DialI2P sends as FROM_PORT.
func (s *StreamSession) From() string {
	fromPort := s.from
	return fromPort
}
2024-11-02 21:57:11 -04:00
// To returns the session's "to" value, which DialI2P sends as TO_PORT.
func (s *StreamSession) To() string {
	toPort := s.to
	return toPort
}
2024-11-02 21:57:11 -04:00
func (s *StreamSession) SignatureType() string {
return s.sigType
2015-10-15 17:21:11 -04:00
}
// Returns the local tunnel name of the I2P tunnel used for the stream session
2024-11-02 21:57:11 -04:00
func (s *StreamSession) ID() string {
return s.id
2015-10-15 17:21:11 -04:00
}
2024-11-02 21:57:11 -04:00
func (s *StreamSession) Close() error {
log.WithField("id", s.id).Debug("Closing StreamSession")
return s.conn.Close()
2015-12-14 17:13:45 -05:00
}
2015-10-15 17:21:11 -04:00
// Returns the I2P destination (the address) of the stream session
2024-11-02 21:57:11 -04:00
func (s *StreamSession) Addr() i2pkeys.I2PAddr {
return s.keys.Addr()
2015-10-15 17:21:11 -04:00
}
2024-11-02 21:57:11 -04:00
func (s *StreamSession) LocalAddr() net.Addr {
return s.keys.Addr()
2021-04-15 20:51:07 -04:00
}
2015-10-15 17:21:11 -04:00
// Returns the keys associated with the stream session
2024-11-02 21:57:11 -04:00
func (s *StreamSession) Keys() i2pkeys.I2PKeys {
return s.keys
2015-10-15 17:21:11 -04:00
}
2016-02-10 18:23:41 -05:00
// Creates a new StreamSession with the I2CP- and streaminglib options as
2015-10-15 17:21:11 -04:00
// specified. See the I2P documentation for a full list of options.
func (sam *SAM) NewStreamSession(id string, keys i2pkeys.I2PKeys, options []string) (*StreamSession, error) {
log.WithFields(logrus.Fields{"id": id, "options": options}).Debug("Creating new StreamSession")
2015-10-15 17:21:11 -04:00
conn, err := sam.newGenericSession("STREAM", id, keys, options, []string{})
if err != nil {
return nil, err
}
2024-10-16 17:21:44 -04:00
log.WithField("id", id).Debug("Created new StreamSession")
2019-04-10 00:47:41 -04:00
return &StreamSession{sam.Config.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), Sig_NONE, "0", "0"}, nil
2019-02-13 23:58:24 -05:00
}
// Creates a new StreamSession with the I2CP- and streaminglib options as
// specified. See the I2P documentation for a full list of options.
func (sam *SAM) NewStreamSessionWithSignature(id string, keys i2pkeys.I2PKeys, options []string, sigType string) (*StreamSession, error) {
log.WithFields(logrus.Fields{"id": id, "options": options, "sigType": sigType}).Debug("Creating new StreamSession with signature")
2019-02-13 23:58:24 -05:00
conn, err := sam.newGenericSessionWithSignature("STREAM", id, keys, sigType, options, []string{})
if err != nil {
return nil, err
}
2024-10-16 17:21:44 -04:00
log.WithFields(logrus.Fields{"id": id, "sigType": sigType}).Debug("Created new StreamSession with signature")
2019-04-10 00:47:41 -04:00
return &StreamSession{sam.Config.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), sigType, "0", "0"}, nil
}
// Creates a new StreamSession with the I2CP- and streaminglib options as
// specified. See the I2P documentation for a full list of options.
func (sam *SAM) NewStreamSessionWithSignatureAndPorts(id, from, to string, keys i2pkeys.I2PKeys, options []string, sigType string) (*StreamSession, error) {
log.WithFields(logrus.Fields{"id": id, "from": from, "to": to, "options": options, "sigType": sigType}).Debug("Creating new StreamSession with signature and ports")
conn, err := sam.newGenericSessionWithSignatureAndPorts("STREAM", id, from, to, keys, sigType, options, []string{})
if err != nil {
return nil, err
}
2024-10-16 17:21:44 -04:00
log.WithFields(logrus.Fields{"id": id, "from": from, "to": to, "sigType": sigType}).Debug("Created new StreamSession with signature and ports")
2019-04-10 00:47:41 -04:00
return &StreamSession{sam.Config.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), sigType, from, to}, nil
2015-10-15 17:21:11 -04:00
}
2024-01-09 13:40:42 -05:00
// lookup name, convenience function
func (s *StreamSession) Lookup(name string) (i2pkeys.I2PAddr, error) {
log.WithField("name", name).Debug("Looking up address")
2016-02-10 18:23:41 -05:00
sam, err := NewSAM(s.samAddr)
if err == nil {
addr, err := sam.Lookup(name)
defer sam.Close()
if err != nil {
log.WithError(err).Error("Lookup failed")
} else {
log.WithField("addr", addr).Debug("Lookup successful")
}
2016-02-10 18:23:41 -05:00
return addr, err
}
log.WithError(err).Error("Failed to create SAM instance for lookup")
return i2pkeys.I2PAddr(""), err
2015-12-24 12:59:57 -05:00
}
2018-11-27 21:36:33 -05:00
// DialContext dials addr through DialContextI2P and returns the result as a
// net.Conn. The network argument n is only logged and passed through.
//
// On failure the returned net.Conn is a true nil interface. Returning the
// (*SAMConn, error) pair directly would wrap a nil *SAMConn in a non-nil
// net.Conn, so callers testing `conn != nil` would be misled.
func (s *StreamSession) DialContext(ctx context.Context, n, addr string) (net.Conn, error) {
	log.WithFields(logrus.Fields{"network": n, "addr": addr}).Debug("DialContext called")

	conn, err := s.DialContextI2P(ctx, n, addr)
	if err != nil {
		return nil, err
	}
	return conn, nil
}
2018-11-27 21:46:02 -05:00
// context-aware dialer, eventually...
2018-11-27 21:56:34 -05:00
func (s *StreamSession) DialContextI2P(ctx context.Context, n, addr string) (*SAMConn, error) {
log.WithFields(logrus.Fields{"network": n, "addr": addr}).Debug("DialContextI2P called")
if ctx == nil {
log.Panic("nil context")
panic("nil context")
}
deadline := s.deadline(ctx, time.Now())
if !deadline.IsZero() {
if d, ok := ctx.Deadline(); !ok || deadline.Before(d) {
subCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
ctx = subCtx
}
}
i2paddr, err := i2pkeys.NewI2PAddrFromString(addr)
2018-11-27 21:56:58 -05:00
if err != nil {
log.WithError(err).Error("Failed to create I2P address from string")
2018-11-27 21:56:58 -05:00
return nil, err
}
2018-11-27 21:56:34 -05:00
return s.DialI2P(i2paddr)
2018-11-27 21:46:02 -05:00
}
/*
func (s *StreamSession) Cancel() chan *StreamSession {
ch := make(chan *StreamSession)
ch <- s
return ch
}*/
// minNonzeroTime returns the earlier of a and b, treating a zero time as
// "not set". If a is zero, b is returned (which may itself be zero). If the
// two are equal, b is returned.
func minNonzeroTime(a, b time.Time) time.Time {
	switch {
	case a.IsZero():
		return b
	case b.IsZero(), a.Before(b):
		return a
	default:
		return b
	}
}
// deadline returns the earliest of:
//   - now+Timeout (only when s.Timeout is non-zero)
//   - s.Deadline
//   - the context's deadline
//
// Or zero, if none of Timeout, Deadline, or context's deadline is set.
func (s *StreamSession) deadline(ctx context.Context, now time.Time) (earliest time.Time) {
	if s.Timeout != 0 { // including negative, for historical reasons
		earliest = now.Add(s.Timeout)
	}

	ctxDeadline, hasDeadline := ctx.Deadline()
	if hasDeadline {
		earliest = minNonzeroTime(earliest, ctxDeadline)
	}

	earliest = minNonzeroTime(earliest, s.Deadline)
	return earliest
}
2015-12-14 13:58:45 -05:00
// implement net.Dialer
func (s *StreamSession) Dial(n, addr string) (c net.Conn, err error) {
log.WithFields(logrus.Fields{"network": n, "addr": addr}).Debug("Dial called")
2015-12-14 13:58:45 -05:00
var i2paddr i2pkeys.I2PAddr
2016-02-10 18:23:41 -05:00
var host string
host, _, err = SplitHostPort(addr)
//log.Println("Dialing:", host)
if err = IgnorePortError(err); err == nil {
2016-02-10 18:23:41 -05:00
// check for name
if strings.HasSuffix(host, ".b32.i2p") || strings.HasSuffix(host, ".i2p") {
// name lookup
2016-02-10 19:16:00 -05:00
i2paddr, err = s.Lookup(host)
log.WithFields(logrus.Fields{"host": host, "i2paddr": i2paddr}).Debug("Looked up I2P address")
2016-02-10 18:23:41 -05:00
} else {
// probably a destination
i2paddr, err = i2pkeys.NewI2PAddrFromBytes([]byte(host))
//i2paddr = i2pkeys.I2PAddr(host)
//log.Println("Destination:", i2paddr, err)
log.WithFields(logrus.Fields{"host": host, "i2paddr": i2paddr}).Debug("Created I2P address from bytes")
2016-02-10 18:23:41 -05:00
}
if err == nil {
return s.DialI2P(i2paddr)
}
}
log.WithError(err).Error("Dial failed")
2016-02-10 18:23:41 -05:00
return
2015-12-14 13:58:45 -05:00
}
2015-10-15 17:21:11 -04:00
// Dials to an I2P destination and returns a SAMConn, which implements a net.Conn.
func (s *StreamSession) DialI2P(addr i2pkeys.I2PAddr) (*SAMConn, error) {
log.WithField("addr", addr).Debug("DialI2P called")
2015-10-15 17:21:11 -04:00
sam, err := NewSAM(s.samAddr)
if err != nil {
log.WithError(err).Error("Failed to create new SAM instance")
2015-10-15 17:21:11 -04:00
return nil, err
}
conn := sam.conn
2019-10-22 02:19:01 -04:00
_, err = conn.Write([]byte("STREAM CONNECT ID=" + s.id + " FROM_PORT=" + s.from + " TO_PORT=" + s.to + " DESTINATION=" + addr.Base64() + " SILENT=false\n"))
2015-10-15 17:21:11 -04:00
if err != nil {
log.WithError(err).Error("Failed to write STREAM CONNECT command")
2016-02-05 16:34:03 -05:00
conn.Close()
2015-10-15 17:21:11 -04:00
return nil, err
}
buf := make([]byte, 4096)
n, err := conn.Read(buf)
2016-02-10 17:36:15 -05:00
if err != nil && err != io.EOF {
log.WithError(err).Error("Failed to write STREAM CONNECT command")
2016-02-05 16:34:03 -05:00
conn.Close()
2015-10-15 17:21:11 -04:00
return nil, err
}
scanner := bufio.NewScanner(bytes.NewReader(buf[:n]))
scanner.Split(bufio.ScanWords)
for scanner.Scan() {
switch scanner.Text() {
2016-02-10 18:23:41 -05:00
case "STREAM":
2015-10-15 17:21:11 -04:00
continue
2016-02-10 18:23:41 -05:00
case "STATUS":
2015-10-15 17:21:11 -04:00
continue
2016-02-10 18:23:41 -05:00
case "RESULT=OK":
log.Debug("Successfully connected to I2P destination")
return &SAMConn{s.keys.Addr(), addr, conn}, nil
2016-02-10 18:23:41 -05:00
case "RESULT=CANT_REACH_PEER":
log.Error("Can't reach peer")
2016-02-10 18:23:41 -05:00
conn.Close()
2015-10-15 17:21:11 -04:00
return nil, errors.New("Can not reach peer")
2016-02-10 18:23:41 -05:00
case "RESULT=I2P_ERROR":
log.Error("I2P internal error")
2016-02-10 18:23:41 -05:00
conn.Close()
2015-10-15 17:21:11 -04:00
return nil, errors.New("I2P internal error")
2016-02-10 18:23:41 -05:00
case "RESULT=INVALID_KEY":
log.Error("Invalid key - Stream Session")
2016-02-10 18:23:41 -05:00
conn.Close()
return nil, errors.New("Invalid key - Stream Session")
2016-02-10 18:23:41 -05:00
case "RESULT=INVALID_ID":
log.Error("Invalid tunnel ID")
2016-02-10 18:23:41 -05:00
conn.Close()
2015-10-15 17:21:11 -04:00
return nil, errors.New("Invalid tunnel ID")
2016-02-10 18:23:41 -05:00
case "RESULT=TIMEOUT":
log.Error("Connection timeout")
2016-02-10 18:23:41 -05:00
conn.Close()
2015-10-15 17:21:11 -04:00
return nil, errors.New("Timeout")
2016-02-10 18:23:41 -05:00
default:
log.WithField("error", scanner.Text()).Error("Unknown error")
2016-02-10 18:23:41 -05:00
conn.Close()
2015-10-15 17:21:11 -04:00
return nil, errors.New("Unknown error: " + scanner.Text() + " : " + string(buf[:n]))
}
}
log.Panic("Unexpected end of StreamSession.DialI2P()")
2015-10-15 17:21:11 -04:00
panic("sam3 go library error in StreamSession.DialI2P()")
}
// create a new stream listener to accept inbound connections
2015-10-15 17:21:11 -04:00
func (s *StreamSession) Listen() (*StreamListener, error) {
log.WithFields(logrus.Fields{"id": s.id, "laddr": s.keys.Addr()}).Debug("Creating new StreamListener")
2016-02-10 18:23:41 -05:00
return &StreamListener{
session: s,
id: s.id,
laddr: s.keys.Addr(),
}, nil
2015-10-15 17:21:11 -04:00
}