refactor metalistener

This commit is contained in:
eyedeekay
2025-05-21 15:21:43 -04:00
parent 273f4a33fb
commit 58490c00e7
3 changed files with 65 additions and 56 deletions

View File

@ -27,7 +27,7 @@ func main() {
if err := metaListener.AddListener("tcp", tcpListener); err != nil {
log.Fatalf("Failed to add TCP listener: %v", err)
}
log.Println("Added TCP listener on 127.0.0.1:8080")
log.Println("Added TCP listener on 127.0.0.1:8082")
// Create and add a Unix socket listener (on Unix systems)
socketPath := "/tmp/example.sock"
@ -42,6 +42,7 @@ func main() {
log.Println("Added Unix socket listener on", socketPath)
}
}
log.Println("Starting http server...")
// Create a simple HTTP server using the meta listener
server := &http.Server{
@ -49,6 +50,7 @@ func main() {
fmt.Fprintf(w, "Hello from MetaListener! You connected via: %s\n", r.Proto)
}),
}
log.Println("Server is ready to accept connections...")
// Handle server shutdown gracefully
stop := make(chan os.Signal, 1)

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"net"
"sync"
"time"
@ -14,8 +15,6 @@ var (
ErrListenerClosed = errors.New("listener is closed")
// ErrNoListeners is returned when the meta listener has no active listeners
ErrNoListeners = errors.New("no active listeners")
// ErrInternalListenerFailure is returned when an internal listener fails
ErrInternalListenerFailure = errors.New("Internal listener error, shutting down metalistener for restart")
)
// MetaListener implements the net.Listener interface and manages multiple
@ -27,8 +26,6 @@ type MetaListener struct {
listenerWg sync.WaitGroup
// connCh is used to receive connections from all managed listeners
connCh chan ConnResult
// errCh is used to receive errors from all managed listeners
errCh chan error
// closeCh signals all goroutines to stop
closeCh chan struct{}
// isClosed indicates whether the meta listener has been closed
@ -47,8 +44,7 @@ type ConnResult struct {
func NewMetaListener() *MetaListener {
return &MetaListener{
listeners: make(map[string]net.Listener),
connCh: make(chan ConnResult),
errCh: make(chan error, 1), // Buffered to prevent blocking
connCh: make(chan ConnResult, 100), // Larger buffer for high connection volume
closeCh: make(chan struct{}),
}
}
@ -102,58 +98,61 @@ func (ml *MetaListener) RemoveListener(id string) error {
// handleListener runs in a separate goroutine for each added listener
// and forwards accepted connections to the connCh channel.
func (ml *MetaListener) handleListener(id string, listener net.Listener) {
defer ml.listenerWg.Done()
defer func() {
log.Printf("Listener goroutine for %s exiting", id)
ml.listenerWg.Done()
}()
for {
conn, err := listener.Accept()
// First check if the MetaListener is closed
select {
case <-ml.closeCh:
// Meta listener is being closed, exit
log.Printf("MetaListener closed, stopping %s listener", id)
return
default:
// Continue processing
}
// Set a deadline for Accept to prevent blocking indefinitely
if deadline, ok := listener.(interface{ SetDeadline(time.Time) error }); ok {
deadline.SetDeadline(time.Now().Add(1 * time.Second))
}
conn, err := listener.Accept()
if err != nil {
// Check if this is a temporary error
// Check if this is a timeout error (which we expect due to our deadline)
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue
}
// Check if this is any other temporary error
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
// For temporary errors, wait a bit and try again
log.Printf("Temporary error in %s listener: %v, retrying in 100ms", id, err)
time.Sleep(100 * time.Millisecond)
continue
}
// For non-temporary errors, check if listener was closed
ml.mu.RLock()
_, stillExists := ml.listeners[id]
ml.mu.RUnlock()
if stillExists {
// Create a combined error with both the standard message and original error details
combinedErr := fmt.Errorf("%w: listener %s error - %v",
ErrInternalListenerFailure, id, err)
// Send the combined error to notify Accept() calls
select {
case ml.errCh <- combinedErr:
default:
// Don't block if no one is reading errors
}
// Then close all listeners
go ml.Close()
}
log.Printf("Permanent error in %s listener: %v, stopping", id, err)
ml.mu.Lock()
delete(ml.listeners, id)
ml.mu.Unlock()
return
}
// Send the accepted connection to the connection channel
// If we reach here, we have a valid connection
log.Printf("Listener %s accepted connection from %s", id, conn.RemoteAddr())
// Try to forward the connection, but don't block indefinitely
select {
case ml.connCh <- ConnResult{Conn: conn, src: id}:
// Connection forwarded successfully
log.Printf("Connection from %s successfully forwarded via %s", conn.RemoteAddr(), id)
case <-ml.closeCh:
// If we're closing and got a connection, close it
log.Printf("MetaListener closing while forwarding connection, closing connection")
conn.Close()
return
case <-time.After(5 * time.Second):
// If we can't forward within 5 seconds, something is seriously wrong
log.Printf("WARNING: Connection forwarding timed out, closing connection from %s", conn.RemoteAddr())
conn.Close()
}
}
}
@ -161,26 +160,31 @@ func (ml *MetaListener) handleListener(id string, listener net.Listener) {
// Accept implements the net.Listener Accept method.
// It waits for and returns the next connection from any of the managed listeners.
func (ml *MetaListener) Accept() (net.Conn, error) {
ml.mu.RLock()
if ml.isClosed {
ml.mu.RUnlock()
return nil, ErrListenerClosed
}
for {
ml.mu.RLock()
if ml.isClosed {
ml.mu.RUnlock()
return nil, ErrListenerClosed
}
if len(ml.listeners) == 0 {
if len(ml.listeners) == 0 {
ml.mu.RUnlock()
return nil, ErrNoListeners
}
ml.mu.RUnlock()
return nil, ErrNoListeners
}
ml.mu.RUnlock()
// Wait for either a connection, an error, or close signal
select {
case result := <-ml.connCh:
return result.Conn, nil
case err := <-ml.errCh:
return nil, err
case <-ml.closeCh:
return nil, ErrListenerClosed
// Wait for either a connection or close signal
select {
case result, ok := <-ml.connCh:
if !ok {
return nil, ErrListenerClosed
}
log.Printf("Accept returning connection from %s via %s",
result.RemoteAddr(), result.src)
return result.Conn, nil
case <-ml.closeCh:
return nil, ErrListenerClosed
}
}
}
@ -194,6 +198,7 @@ func (ml *MetaListener) Close() error {
return nil
}
log.Printf("Closing MetaListener with %d listeners", len(ml.listeners))
ml.isClosed = true
// Signal all goroutines to stop
@ -203,7 +208,8 @@ func (ml *MetaListener) Close() error {
var errs []error
for id, listener := range ml.listeners {
if err := listener.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close listener %s: %w", id, err))
log.Printf("Error closing %s listener: %v", id, err)
errs = append(errs, err)
}
}
@ -211,6 +217,7 @@ func (ml *MetaListener) Close() error {
// Wait for all listener goroutines to exit
ml.listenerWg.Wait()
log.Printf("All listener goroutines have exited")
// Return combined errors if any
if len(errs) > 0 {

View File

@ -85,7 +85,7 @@ func (ml *Mirror) Accept() (net.Conn, error) {
return nil, err
}
// If the handshake is successful, get the underlying connection
conn = tlsConn.NetConn()
//conn = tlsConn.NetConn()
}
host := map[string]string{