diff --git a/lib/bootstrap/bootstrap.go b/lib/bootstrap/bootstrap.go index 0c60af8..68cd1e2 100644 --- a/lib/bootstrap/bootstrap.go +++ b/lib/bootstrap/bootstrap.go @@ -1,6 +1,10 @@ package bootstrap -import "github.com/go-i2p/go-i2p/lib/common/router_info" +import ( + "context" + + "github.com/go-i2p/go-i2p/lib/common/router_info" +) // interface defining a way to bootstrap into the i2p network type Bootstrap interface { @@ -9,5 +13,5 @@ type Bootstrap interface { // if n is 0 then try obtaining as many router infos as possible // returns nil and error if we cannot fetch ANY router infos // returns a channel that yields 1 slice of router infos containing n or fewer router infos, caller must close channel after use - GetPeers(n int) (chan []router_info.RouterInfo, error) + GetPeers(ctx context.Context, n int) ([]router_info.RouterInfo, error) } diff --git a/lib/bootstrap/reseed_bootstrap.go b/lib/bootstrap/reseed_bootstrap.go new file mode 100644 index 0000000..bb22f0b --- /dev/null +++ b/lib/bootstrap/reseed_bootstrap.go @@ -0,0 +1,77 @@ +package bootstrap + +import ( + "context" + + "github.com/go-i2p/go-i2p/lib/common/router_info" + "github.com/go-i2p/go-i2p/lib/config" + "github.com/go-i2p/go-i2p/lib/netdb/reseed" + "github.com/go-i2p/logger" + "github.com/sirupsen/logrus" + + "github.com/samber/oops" +) + +var log = logger.GetGoI2PLogger() + +// ReseedBootstrap implements the Bootstrap interface using HTTP reseeding +type ReseedBootstrap struct { + // Configuration containing reseed servers + config *config.BootstrapConfig +} + +// NewReseedBootstrap creates a new reseeder with the provided configuration +func NewReseedBootstrap(config *config.BootstrapConfig) *ReseedBootstrap { + return &ReseedBootstrap{ + config: config, + } +} + +// GetPeers implements the Bootstrap interface by obtaining RouterInfos +// from configured reseed servers +func (rb *ReseedBootstrap) GetPeers(ctx context.Context, n int) ([]router_info.RouterInfo, error) { + var allRouterInfos []router_info.RouterInfo + var lastErr error + + // Try each reseed server until we get enough routerInfos or exhaust all servers + for _, server := range rb.config.ReseedServers { + // Check if context is canceled before making request + if ctx.Err() != nil { + return nil, oops.Errorf("reseed canceled: %v", ctx.Err()) + } + + log.WithField("server", server.Url).Debug("Attempting to reseed from server") + + // Use the existing Reseed implementation with a timeout context + reseeder := reseed.NewReseed() + + // Perform the actual reseeding operation synchronously + serverRIs, err := reseeder.SingleReseed(server.Url) + + if err != nil { + log.WithError(err).WithField("server", server.Url).Warn("Reseed attempt failed") + lastErr = oops.Errorf("reseed from %s failed: %v", server.Url, err) + continue + } + + // Add the retrieved RouterInfos to our collection + allRouterInfos = append(allRouterInfos, serverRIs...) + log.WithFields(logrus.Fields{ + "server": server.Url, + "count": len(serverRIs), + "total": len(allRouterInfos), + }).Info("Successfully obtained router infos from reseed server") + + // Check if we have enough RouterInfos + if n > 0 && len(allRouterInfos) >= n { + break + } + } + + // If we couldn't get any RouterInfos from any server, return the last error + if len(allRouterInfos) == 0 && lastErr != nil { + return nil, oops.Errorf("all reseed attempts failed: %w", lastErr) + } + + return allRouterInfos, nil +} diff --git a/lib/netdb/reseed/new.go b/lib/netdb/reseed/new.go new file mode 100644 index 0000000..928a491 --- /dev/null +++ b/lib/netdb/reseed/new.go @@ -0,0 +1,17 @@ +package reseed + +import "net" + +const ( + DefaultDialTimeout = 5 * 1000 // 5 seconds + DefaultKeepAlive = 5 * 1000 // 5 seconds +) + +func NewReseed() *Reseed { + return &Reseed{ + Dialer: net.Dialer{ + Timeout: DefaultDialTimeout, + KeepAlive: DefaultKeepAlive, + }, + } +} diff --git a/lib/netdb/std.go b/lib/netdb/std.go index 5b70db9..d4ac43f 100644 --- a/lib/netdb/std.go +++ b/lib/netdb/std.go @@ -2,6 +2,7 @@ package netdb import ( "bytes" + "context" "fmt" "io" "os" @@ -17,6 +18,7 @@ import ( "github.com/go-i2p/go-i2p/lib/common/base64" common "github.com/go-i2p/go-i2p/lib/common/data" "github.com/go-i2p/go-i2p/lib/common/router_info" + "github.com/go-i2p/go-i2p/lib/netdb/reseed" "github.com/go-i2p/go-i2p/lib/util" ) @@ -278,8 +280,11 @@ func (db *StdNetDB) Reseed(b bootstrap.Bootstrap, minRouters int) (err error) { } log.Warn("NetDB size below minimum, reseed required") + ctx, cancel := context.WithTimeout(context.Background(), reseed.DefaultDialTimeout) + defer cancel() + // Get peers from the bootstrap provider - peersChan, err := b.GetPeers(0) // Get as many peers as possible + peersChan, err := b.GetPeers(ctx, 0) // Get as many peers as possible if err != nil { log.WithError(err).Error("Failed to get peers from bootstrap provider") return fmt.Errorf("bootstrap failed: %w", err) @@ -287,16 +292,14 @@ func (db *StdNetDB) Reseed(b bootstrap.Bootstrap, minRouters int) (err error) { // Process the received peers count := 0 - for peers := range peersChan { - for _, ri := range peers { - hash := ri.IdentHash() - if _, exists := db.RouterInfos[hash]; !exists { - log.WithField("hash", hash).Debug("Adding new RouterInfo from reseed") - db.RouterInfos[hash] = Entry{ - RouterInfo: &ri, - } - count++ + for _, ri := range peersChan { + hash := ri.IdentHash() + if _, exists := db.RouterInfos[hash]; !exists { + log.WithField("hash", hash).Debug("Adding new RouterInfo from reseed") + db.RouterInfos[hash] = Entry{ + RouterInfo: &ri, } + count++ } } diff --git a/lib/router/router.go b/lib/router/router.go index 954e51d..59ac786 100644 --- a/lib/router/router.go +++ b/lib/router/router.go @@ -5,6 +5,7 @@ import ( "strconv" "time" + "github.com/go-i2p/go-i2p/lib/bootstrap" "github.com/go-i2p/go-i2p/lib/common/base32" "github.com/go-i2p/go-i2p/lib/transport/ntcp" @@ -164,6 +165,18 @@ func (r *Router) mainloop() { } else { log.Warn("Unable to determine NetDB size") } + if r.ndb.Size() < r.cfg.Bootstrap.LowPeerThreshold { + log.Info("NetDB below threshold, initiating reseed") + + // Create a bootstrap instance + bootstrapper := bootstrap.NewReseedBootstrap(r.cfg.Bootstrap) + + // Reseed the network database + if err := r.ndb.Reseed(bootstrapper, r.cfg.Bootstrap.LowPeerThreshold); err != nil { + log.WithError(err).Warn("Initial reseed failed, continuing with limited NetDB") + // Continue anyway, we might have some peers + } + } if e == nil { // netdb ready log.WithFields(logrus.Fields{ diff --git a/lib/transport/noise/session.go b/lib/transport/noise/session.go index cd26589..48cd0fb 100644 --- a/lib/transport/noise/session.go +++ b/lib/transport/noise/session.go @@ -3,10 +3,9 @@ package noise import ( "net" "sync" + "sync/atomic" "time" - "github.com/sirupsen/logrus" - cb "github.com/emirpasic/gods/queues/circularbuffer" "github.com/flynn/noise" "github.com/samber/oops" @@ -67,33 +66,31 @@ var ( func (s *NoiseSession) LocalAddr() net.Addr { localAddr := s.Conn.LocalAddr() log.WithField("local_addr", localAddr.String()).Debug("Getting LocalAddr") - return s.Conn.LocalAddr() + return localAddr } func (s *NoiseSession) Close() error { log.Debug("Closing NoiseSession") + + // Set the closed flag for atomic interlocking with Write + atomic.StoreInt32(&s.activeCall, 1) + + // Clear the queues s.SendQueue.Clear() s.RecvQueue.Clear() log.Debug("SendQueue and RecvQueue cleared") - return nil -} -func (c *NoiseSession) processCallback(publicKey []byte, payload []byte) error { - log.WithFields(logrus.Fields{ - "public_key_length": len(publicKey), - "payload_length": len(payload), - }).Debug("Processing callback") + // Close the underlying TCP connection + var err error + if s.Conn != nil { + err = s.Conn.Close() + if err != nil { + log.WithError(err).Warn("Error closing underlying connection") + } else { + log.Debug("Underlying connection closed successfully") + } + } - if c.VerifyCallback == nil { - log.Debug("VerifyCallback is nil, skipping verification") - return nil - } - err := c.VerifyCallback(publicKey, payload) - if err != nil { - log.WithError(err).Error("VerifyCallback failed") - } else { - log.Debug("VerifyCallback succeeded") - } return err } @@ -111,38 +108,23 @@ func (s *NoiseSession) peerStaticKey() ([32]byte, error) { return [32]byte{}, oops.Errorf("Remote static key error") } -func (s *NoiseSession) peerStaticIV() ([16]byte, error) { - for _, addr := range s.RouterInfo.RouterAddresses() { - transportStyle, err := addr.TransportStyle().Data() - if err != nil { - continue - } - if transportStyle == NOISE_PROTOCOL_NAME { - return addr.InitializationVector() - } - } - return [16]byte{}, oops.Errorf("Remote static IV error") -} - -// newBlock allocates a new packet, from hc's free list if possible. -func newBlock() []byte { - // return make([]byte, MaxPayloadSize) - block := make([]byte, MaxPayloadSize) - log.WithField("block_size", MaxPayloadSize).Debug("Created new block") - return block -} - type VerifyCallbackFunc func(publicKey []byte, data []byte) error func NewNoiseTransportSession(ri router_info.RouterInfo) (transport.TransportSession, error) { log.WithField("router_info", ri.String()).Debug("Creating new NoiseTransportSession") - // socket, err := DialNoise("noise", ri) - for _, addr := range ri.RouterAddresses() { + + addresses := ri.RouterAddresses() + for i, addr := range addresses { log.WithField("address", string(addr.Bytes())).Debug("Attempting to dial") socket, err := net.Dial("tcp", string(addr.Bytes())) if err != nil { log.WithError(err).Error("Failed to dial address") - return nil, err + // Only return error if this is the last address to try + if i == len(addresses)-1 { + log.Error("Failed to create NoiseTransportSession, all addresses failed") + return nil, oops.Errorf("Transport constructor error") + } + continue } session := &NoiseSession{ SendQueue: cb.New(1024), @@ -153,8 +135,10 @@ func NewNoiseTransportSession(ri router_info.RouterInfo) (transport.TransportSes log.WithField("local_addr", socket.LocalAddr().String()).Debug("NoiseTransportSession created successfully") return session, nil } - log.Error("Failed to create NoiseTransportSession, all addresses failed") - return nil, oops.Errorf("Transport constructor error") + + // If we get here, it means there were no addresses to try + log.Error("No addresses available to create NoiseTransportSession") + return nil, oops.Errorf("No router addresses available") } func NewNoiseSession(ri router_info.RouterInfo) (*NoiseSession, error) {