work on reseeding

This commit is contained in:
eyedeekay
2025-05-15 20:20:46 -04:00
parent 7d51d83b40
commit ac7e369b6a
6 changed files with 156 additions and 58 deletions

View File

@ -1,6 +1,10 @@
package bootstrap package bootstrap
import "github.com/go-i2p/go-i2p/lib/common/router_info" import (
"context"
"github.com/go-i2p/go-i2p/lib/common/router_info"
)
// interface defining a way to bootstrap into the i2p network // interface defining a way to bootstrap into the i2p network
type Bootstrap interface { type Bootstrap interface {
@ -9,5 +13,5 @@ type Bootstrap interface {
// if n is 0 then try obtaining as many router infos as possible // if n is 0 then try obtaining as many router infos as possible
// returns nil and error if we cannot fetch ANY router infos // returns nil and error if we cannot fetch ANY router infos
// returns a channel that yields 1 slice of router infos containing n or fewer router infos, caller must close channel after use // returns a channel that yields 1 slice of router infos containing n or fewer router infos, caller must close channel after use
GetPeers(n int) (chan []router_info.RouterInfo, error) GetPeers(ctx context.Context, n int) ([]router_info.RouterInfo, error)
} }

View File

@ -0,0 +1,77 @@
package bootstrap
import (
"context"
"github.com/go-i2p/go-i2p/lib/common/router_info"
"github.com/go-i2p/go-i2p/lib/config"
"github.com/go-i2p/go-i2p/lib/netdb/reseed"
"github.com/go-i2p/logger"
"github.com/sirupsen/logrus"
"github.com/samber/oops"
)
var log = logger.GetGoI2PLogger()
// ReseedBootstrap implements the Bootstrap interface using HTTP reseeding
type ReseedBootstrap struct {
// Configuration containing reseed servers
config *config.BootstrapConfig
}
// NewReseedBootstrap creates a new reseeder with the provided configuration
func NewReseedBootstrap(config *config.BootstrapConfig) *ReseedBootstrap {
return &ReseedBootstrap{
config: config,
}
}
// GetPeers implements the Bootstrap interface by obtaining RouterInfos
// from configured reseed servers
func (rb *ReseedBootstrap) GetPeers(ctx context.Context, n int) ([]router_info.RouterInfo, error) {
var allRouterInfos []router_info.RouterInfo
var lastErr error
// Try each reseed server until we get enough routerInfos or exhaust all servers
for _, server := range rb.config.ReseedServers {
// Check if context is canceled before making request
if ctx.Err() != nil {
return nil, oops.Errorf("reseed canceled: %v", ctx.Err())
}
log.WithField("server", server.Url).Debug("Attempting to reseed from server")
// Use the existing Reseed implementation with a timeout context
reseeder := reseed.NewReseed()
// Perform the actual reseeding operation synchronously
serverRIs, err := reseeder.SingleReseed(server.Url)
if err != nil {
log.WithError(err).WithField("server", server.Url).Warn("Reseed attempt failed")
lastErr = oops.Errorf("reseed from %s failed: %v", server.Url, err)
continue
}
// Add the retrieved RouterInfos to our collection
allRouterInfos = append(allRouterInfos, serverRIs...)
log.WithFields(logrus.Fields{
"server": server.Url,
"count": len(serverRIs),
"total": len(allRouterInfos),
}).Info("Successfully obtained router infos from reseed server")
// Check if we have enough RouterInfos
if n > 0 && len(allRouterInfos) >= n {
break
}
}
// If we couldn't get any RouterInfos from any server, return the last error
if len(allRouterInfos) == 0 && lastErr != nil {
return nil, oops.Errorf("all reseed attempts failed: %w", lastErr)
}
return allRouterInfos, nil
}

17
lib/netdb/reseed/new.go Normal file
View File

@ -0,0 +1,17 @@
package reseed
import "net"
const (
DefaultDialTimeout = 5 * 1000 // 5 seconds
DefaultKeepAlive = 5 * 1000 // 5 seconds
)
func NewReseed() *Reseed {
return &Reseed{
Dialer: net.Dialer{
Timeout: DefaultDialTimeout,
KeepAlive: DefaultKeepAlive,
},
}
}

View File

@ -2,6 +2,7 @@ package netdb
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -17,6 +18,7 @@ import (
"github.com/go-i2p/go-i2p/lib/common/base64" "github.com/go-i2p/go-i2p/lib/common/base64"
common "github.com/go-i2p/go-i2p/lib/common/data" common "github.com/go-i2p/go-i2p/lib/common/data"
"github.com/go-i2p/go-i2p/lib/common/router_info" "github.com/go-i2p/go-i2p/lib/common/router_info"
"github.com/go-i2p/go-i2p/lib/netdb/reseed"
"github.com/go-i2p/go-i2p/lib/util" "github.com/go-i2p/go-i2p/lib/util"
) )
@ -278,8 +280,11 @@ func (db *StdNetDB) Reseed(b bootstrap.Bootstrap, minRouters int) (err error) {
} }
log.Warn("NetDB size below minimum, reseed required") log.Warn("NetDB size below minimum, reseed required")
ctx, cancel := context.WithTimeout(context.Background(), reseed.DefaultDialTimeout)
defer cancel()
// Get peers from the bootstrap provider // Get peers from the bootstrap provider
peersChan, err := b.GetPeers(0) // Get as many peers as possible peersChan, err := b.GetPeers(ctx, 0) // Get as many peers as possible
if err != nil { if err != nil {
log.WithError(err).Error("Failed to get peers from bootstrap provider") log.WithError(err).Error("Failed to get peers from bootstrap provider")
return fmt.Errorf("bootstrap failed: %w", err) return fmt.Errorf("bootstrap failed: %w", err)
@ -287,16 +292,14 @@ func (db *StdNetDB) Reseed(b bootstrap.Bootstrap, minRouters int) (err error) {
// Process the received peers // Process the received peers
count := 0 count := 0
for peers := range peersChan { for _, ri := range peersChan {
for _, ri := range peers { hash := ri.IdentHash()
hash := ri.IdentHash() if _, exists := db.RouterInfos[hash]; !exists {
if _, exists := db.RouterInfos[hash]; !exists { log.WithField("hash", hash).Debug("Adding new RouterInfo from reseed")
log.WithField("hash", hash).Debug("Adding new RouterInfo from reseed") db.RouterInfos[hash] = Entry{
db.RouterInfos[hash] = Entry{ RouterInfo: &ri,
RouterInfo: &ri,
}
count++
} }
count++
} }
} }

View File

@ -5,6 +5,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/go-i2p/go-i2p/lib/bootstrap"
"github.com/go-i2p/go-i2p/lib/common/base32" "github.com/go-i2p/go-i2p/lib/common/base32"
"github.com/go-i2p/go-i2p/lib/transport/ntcp" "github.com/go-i2p/go-i2p/lib/transport/ntcp"
@ -164,6 +165,18 @@ func (r *Router) mainloop() {
} else { } else {
log.Warn("Unable to determine NetDB size") log.Warn("Unable to determine NetDB size")
} }
if r.ndb.Size() < r.cfg.Bootstrap.LowPeerThreshold {
log.Info("NetDB below threshold, initiating reseed")
// Create a bootstrap instance
bootstrapper := bootstrap.NewReseedBootstrap(r.cfg.Bootstrap)
// Reseed the network database
if err := r.ndb.Reseed(bootstrapper, r.cfg.Bootstrap.LowPeerThreshold); err != nil {
log.WithError(err).Warn("Initial reseed failed, continuing with limited NetDB")
// Continue anyway, we might have some peers
}
}
if e == nil { if e == nil {
// netdb ready // netdb ready
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{

View File

@ -3,10 +3,9 @@ package noise
import ( import (
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/sirupsen/logrus"
cb "github.com/emirpasic/gods/queues/circularbuffer" cb "github.com/emirpasic/gods/queues/circularbuffer"
"github.com/flynn/noise" "github.com/flynn/noise"
"github.com/samber/oops" "github.com/samber/oops"
@ -67,33 +66,31 @@ var (
func (s *NoiseSession) LocalAddr() net.Addr { func (s *NoiseSession) LocalAddr() net.Addr {
localAddr := s.Conn.LocalAddr() localAddr := s.Conn.LocalAddr()
log.WithField("local_addr", localAddr.String()).Debug("Getting LocalAddr") log.WithField("local_addr", localAddr.String()).Debug("Getting LocalAddr")
return s.Conn.LocalAddr() return localAddr
} }
func (s *NoiseSession) Close() error { func (s *NoiseSession) Close() error {
log.Debug("Closing NoiseSession") log.Debug("Closing NoiseSession")
// Set the closed flag for atomic interlocking with Write
atomic.StoreInt32(&s.activeCall, 1)
// Clear the queues
s.SendQueue.Clear() s.SendQueue.Clear()
s.RecvQueue.Clear() s.RecvQueue.Clear()
log.Debug("SendQueue and RecvQueue cleared") log.Debug("SendQueue and RecvQueue cleared")
return nil
}
func (c *NoiseSession) processCallback(publicKey []byte, payload []byte) error { // Close the underlying TCP connection
log.WithFields(logrus.Fields{ var err error
"public_key_length": len(publicKey), if s.Conn != nil {
"payload_length": len(payload), err = s.Conn.Close()
}).Debug("Processing callback") if err != nil {
log.WithError(err).Warn("Error closing underlying connection")
} else {
log.Debug("Underlying connection closed successfully")
}
}
if c.VerifyCallback == nil {
log.Debug("VerifyCallback is nil, skipping verification")
return nil
}
err := c.VerifyCallback(publicKey, payload)
if err != nil {
log.WithError(err).Error("VerifyCallback failed")
} else {
log.Debug("VerifyCallback succeeded")
}
return err return err
} }
@ -111,38 +108,23 @@ func (s *NoiseSession) peerStaticKey() ([32]byte, error) {
return [32]byte{}, oops.Errorf("Remote static key error") return [32]byte{}, oops.Errorf("Remote static key error")
} }
func (s *NoiseSession) peerStaticIV() ([16]byte, error) {
for _, addr := range s.RouterInfo.RouterAddresses() {
transportStyle, err := addr.TransportStyle().Data()
if err != nil {
continue
}
if transportStyle == NOISE_PROTOCOL_NAME {
return addr.InitializationVector()
}
}
return [16]byte{}, oops.Errorf("Remote static IV error")
}
// newBlock allocates a new packet, from hc's free list if possible.
func newBlock() []byte {
// return make([]byte, MaxPayloadSize)
block := make([]byte, MaxPayloadSize)
log.WithField("block_size", MaxPayloadSize).Debug("Created new block")
return block
}
type VerifyCallbackFunc func(publicKey []byte, data []byte) error type VerifyCallbackFunc func(publicKey []byte, data []byte) error
func NewNoiseTransportSession(ri router_info.RouterInfo) (transport.TransportSession, error) { func NewNoiseTransportSession(ri router_info.RouterInfo) (transport.TransportSession, error) {
log.WithField("router_info", ri.String()).Debug("Creating new NoiseTransportSession") log.WithField("router_info", ri.String()).Debug("Creating new NoiseTransportSession")
// socket, err := DialNoise("noise", ri)
for _, addr := range ri.RouterAddresses() { addresses := ri.RouterAddresses()
for i, addr := range addresses {
log.WithField("address", string(addr.Bytes())).Debug("Attempting to dial") log.WithField("address", string(addr.Bytes())).Debug("Attempting to dial")
socket, err := net.Dial("tcp", string(addr.Bytes())) socket, err := net.Dial("tcp", string(addr.Bytes()))
if err != nil { if err != nil {
log.WithError(err).Error("Failed to dial address") log.WithError(err).Error("Failed to dial address")
return nil, err // Only return error if this is the last address to try
if i == len(addresses)-1 {
log.Error("Failed to create NoiseTransportSession, all addresses failed")
return nil, oops.Errorf("Transport constructor error")
}
continue
} }
session := &NoiseSession{ session := &NoiseSession{
SendQueue: cb.New(1024), SendQueue: cb.New(1024),
@ -153,8 +135,10 @@ func NewNoiseTransportSession(ri router_info.RouterInfo) (transport.TransportSes
log.WithField("local_addr", socket.LocalAddr().String()).Debug("NoiseTransportSession created successfully") log.WithField("local_addr", socket.LocalAddr().String()).Debug("NoiseTransportSession created successfully")
return session, nil return session, nil
} }
log.Error("Failed to create NoiseTransportSession, all addresses failed")
return nil, oops.Errorf("Transport constructor error") // If we get here, it means there were no addresses to try
log.Error("No addresses available to create NoiseTransportSession")
return nil, oops.Errorf("No router addresses available")
} }
func NewNoiseSession(ri router_info.RouterInfo) (*NoiseSession, error) { func NewNoiseSession(ri router_info.RouterInfo) (*NoiseSession, error) {