refactor fetcher

This commit is contained in:
2020-07-23 14:00:21 -04:00
parent 41ea363b7f
commit cb1d9b4f99
4 changed files with 159 additions and 165 deletions

View File

@ -1,162 +1,27 @@
package fetcher package fetcher
import ( import (
"bytes"
"errors"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/kpetku/libsyndie/archive/client" "github.com/kpetku/libsyndie/archive/client"
"github.com/kpetku/libsyndie/syndieutil"
) )
const upperBoundLimit = 10000 const upperBoundLimit = 10000
// Fetcher hold things // Fetcher contains various options for a Syndie fetch operation
type Fetcher struct { type Fetcher struct {
remote string // remote can be a URL or file remoteLocation string // remoteLocation is a URL pointing to an archive server
localPath string // localPath is where to store the results on the local filesystem localLocation string // localLocation is where to store the results on the local filesystem
timeout int // timeout in second timeout int // timeout in seconds
delay int // delay between individual fetches in miliseconds delay int // random delay of up to "delay" miliseconds between individual http requests
client *client.Client Client *client.Client
} }
// New creates a new instance of Fetcher. // New creates a new instance of Fetcher
func New(remote, path string, timeout, delay int) *Fetcher { func New(remote, path string, timeout, delay int) *Fetcher {
return &Fetcher{ return &Fetcher{
remote: remote, remoteLocation: remote,
localPath: path, localLocation: path,
timeout: timeout, timeout: timeout,
delay: delay, delay: delay,
client: &client.Client{}, Client: &client.Client{},
} }
} }
// GetIndex reaches out to an endpoint over http and builds a list of urls.
//
// When f.remote is an http(s) URL, "<remote>/shared-index.dat" is downloaded
// and parsed into f.client. Anything else is treated as a local file or
// directory of Syndie messages and imported straight from disk; in that case
// f.client is left untouched.
//
// It returns the first error that prevents the index from being built or the
// local location from being read. Failures to import individual files inside
// a local directory are logged and do not abort the walk.
func (f *Fetcher) GetIndex() error {
	// url.ParseRequestURI also accepts absolute filesystem paths such as
	// "/var/archive", so the scheme has to be checked before going remote.
	if u, err := url.ParseRequestURI(f.remote); err == nil && (u.Scheme == "http" || u.Scheme == "https") {
		req, err := http.NewRequest("GET", strings.TrimRight(f.remote, "/")+"/shared-index.dat", nil)
		if err != nil {
			return err
		}
		req.Header.Add("User-Agent", "syndied")
		c := &http.Client{
			Timeout: time.Second * time.Duration(f.timeout),
		}
		resp, err := c.Do(req)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		// An error page must not be parsed as if it were an index.
		if resp.StatusCode != http.StatusOK {
			return errors.New("unexpected response fetching shared-index.dat: " + resp.Status)
		}
		// Parse into a fresh client and only publish it once parsing succeeded.
		idx := client.New()
		if err := idx.Parse(resp.Body); err != nil {
			return err
		}
		f.client = idx
		log.Printf("numAltURIs: %d", f.client.NumAltURIs)
		log.Printf("NumChannels: %d", int(f.client.NumChannels))
		log.Printf("Number of messages: %d", len(f.client.Urls))
		return nil
	}

	fi, err := os.Stat(f.remote)
	if err != nil {
		return err
	}
	if !fi.IsDir() {
		return ImportFile(f.remote)
	}
	entries, err := ioutil.ReadDir(f.remote)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		p := f.remote + "/" + entry.Name()
		if entry.IsDir() {
			// FetchFromDisk joins sub-directory names onto its argument
			// without a separator, so hand it a path ending in "/".
			FetchFromDisk(p + "/")
			continue
		}
		if err := ImportFile(p); err != nil {
			log.Printf("Unable to import %s: %s", p, err)
		}
	}
	return nil
}
// Fetch actually fetches all URLs from a remote endpoint into the specified path.
//
// It builds the index with GetIndex, then downloads every listed URL relative
// to f.remote. Each response with status 200 is unmarshalled, written beneath
// f.localPath ("<channel>/meta.syndie" for meta messages,
// "<channel>/<messageID>.syndie" for posts) and imported with ImportFile.
// A random pause of up to f.delay milliseconds follows every request.
//
// It returns an error when the index cannot be built, when the index is empty
// or holds upperBoundLimit or more entries, or when an HTTP request fails.
// Problems with an individual message (unparsable, unwritable, not importable)
// are logged and the loop moves on to the next URL.
func (f *Fetcher) Fetch() error {
	if err := f.GetIndex(); err != nil {
		return err
	}
	total := len(f.client.Urls)
	if total == 0 {
		return errors.New("no URLs to fetch")
	}
	if total >= upperBoundLimit {
		return errors.New("too many URLs to fetch")
	}

	// One client is shared by every request instead of building one per URL.
	c := &http.Client{
		Timeout: time.Second * time.Duration(f.timeout),
	}

	// get downloads a single URL. Keeping it in its own function makes the
	// deferred Close run after every request rather than piling up until
	// Fetch returns.
	get := func(target string) ([]byte, int, error) {
		req, err := http.NewRequest("GET", target, nil)
		if err != nil {
			return nil, 0, err
		}
		req.Header.Add("User-Agent", "syndied")
		resp, err := c.Do(req)
		if err != nil {
			// resp is nil on failure, so there is no body to close here.
			return nil, 0, err
		}
		defer resp.Body.Close()
		buf, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return nil, 0, err
		}
		return buf, resp.StatusCode, nil
	}

	// save validates one downloaded message, writes it to disk and imports it.
	save := func(buf []byte, source string, x int) {
		// Validate the message and take the PostURI messageID from it
		outer := syndieutil.New()
		if _, err := outer.Unmarshal(bytes.NewReader(buf)); err != nil {
			log.Printf("Error unmarshalling outer: %s", err)
			return
		}
		var dir, name string
		switch outer.MessageType {
		case "meta":
			chanHash, err := syndieutil.ChanHash(outer.Identity)
			if err != nil {
				log.Printf("Error parsing chanhash: %s", err)
				return
			}
			dir, name = chanHash, "meta.syndie"
		case "post":
			dir, name = outer.TargetChannel, strconv.Itoa(outer.PostURI.MessageID)+".syndie"
		default:
			return
		}
		// The channel name comes from the downloaded message, so refuse
		// anything that could point outside of localPath.
		if dir == "" || dir == "." || dir == ".." || strings.ContainsAny(dir, "/\\") {
			log.Printf("Refusing to store %s: unsafe channel name %q", source, dir)
			return
		}
		destDir := f.localPath + "/" + dir
		if err := os.MkdirAll(destDir, 0744); err != nil {
			log.Printf("Unable to create channel directory: %s", err)
			return
		}
		dest := destDir + "/" + name
		if err := ioutil.WriteFile(dest, buf, 0644); err != nil {
			log.Printf("Unable to write post to disk: %s", err)
			return
		}
		if err := ImportFile(dest); err != nil {
			log.Printf("Unable to import %s: %s", outer.MessageType, err)
		}
		log.Printf("Fetched %s with %d bytes, number: %d/%d", source, len(buf), x, total)
	}

	base := strings.TrimRight(f.remote, "/") + "/"
	for x, name := range f.client.Urls {
		target := base + name
		log.Printf("Fetching' %s", target)
		buf, status, err := get(target)
		if err != nil {
			return err
		}
		if status == http.StatusOK {
			save(buf, target, x)
		}
		// rand.Intn panics when its argument is not positive.
		if f.delay > 0 {
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(f.delay)))
		}
	}
	return nil
}

View File

@ -12,8 +12,9 @@ import (
"github.com/kpetku/syndie-core/data" "github.com/kpetku/syndie-core/data"
) )
func ImportFile(name string) error { // LocalFile opens a file from the path location and imports it into the database
dat, err := ioutil.ReadFile(name) func (f *Fetcher) LocalFile(location string) error {
dat, err := ioutil.ReadFile(location)
if err != nil { if err != nil {
return err return err
} }
@ -48,7 +49,7 @@ func ImportFile(name string) error {
if cerr != nil { if cerr != nil {
log.Printf("error in WriteChannel: %s", cerr) log.Printf("error in WriteChannel: %s", cerr)
} }
log.Printf("wrote metadata for file: %s", name) log.Printf("wrote metadata for file: %s", location)
return nil return nil
} }
if outer.MessageType == "post" { if outer.MessageType == "post" {
@ -103,21 +104,26 @@ func ImportFile(name string) error {
return nil return nil
} }
func FetchFromDisk(path string) { // LocalDir recursively walks directories of Syndie messages from the path location and imports them into the database
fi, err := os.Stat(path) func (f *Fetcher) LocalDir(location string) error {
fi, err := os.Stat(location)
if err != nil { if err != nil {
log.Printf("Error fetchin': %s", err.Error()) return err
} }
if fi.IsDir() { if fi.IsDir() {
fetchChannelList, _ := ioutil.ReadDir(path) fetchChannelList, err := ioutil.ReadDir(location)
if err != nil {
return err
}
for _, c := range fetchChannelList { for _, c := range fetchChannelList {
if c.IsDir() { if c.IsDir() {
FetchFromDisk(path + c.Name()) err = f.LocalFile(location + c.Name())
} else { } else {
ImportFile(path + "/" + c.Name()) err = f.LocalFile(location + "/" + c.Name())
} }
} }
} else { } else {
ImportFile(path) f.LocalFile(location)
} }
return err
} }

123
fetcher/remote.go Normal file
View File

@ -0,0 +1,123 @@
package fetcher
import (
"bytes"
"errors"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/kpetku/libsyndie/archive/client"
"github.com/kpetku/libsyndie/syndieutil"
)
// RemoteFetch performs a remote HTTP fetch from "remoteLocation".
//
// The message index is built with buildIndex first. When that fails,
// remoteLocation is handed to LocalDir as a local archive instead: if the
// local import succeeds RemoteFetch returns nil, otherwise the local error is
// logged and the original index error is returned.
//
// With an index in hand, every listed URL is downloaded relative to
// remoteLocation, stored beneath localLocation and imported through
// LocalFile. A random pause of up to "delay" milliseconds follows each
// request.
//
// It returns an error when the index is empty or holds upperBoundLimit or
// more entries, or when an HTTP request fails. Problems with an individual
// message are logged and do not stop the run.
func (f *Fetcher) RemoteFetch() error {
	if err := f.buildIndex(); err != nil {
		if lerr := f.LocalDir(f.remoteLocation); lerr != nil {
			log.Printf("Unable to import %s as a local archive: %s", f.remoteLocation, lerr)
			return err
		}
		return nil
	}
	total := len(f.Client.Urls)
	if total == 0 {
		return errors.New("no URLs to fetch")
	}
	if total >= upperBoundLimit {
		return errors.New("too many URLs to fetch")
	}

	// One client is shared by every request instead of building one per URL.
	c := &http.Client{
		Timeout: time.Second * time.Duration(f.timeout),
	}
	base := strings.TrimRight(f.remoteLocation, "/") + "/"
	for x, name := range f.Client.Urls {
		target := base + name
		log.Printf("Fetching' %s", target)
		buf, status, err := f.fetchBody(c, target)
		if err != nil {
			return err
		}
		if status == http.StatusOK {
			f.storeMessage(buf, target, x, total)
		}
		// rand.Intn panics when its argument is not positive.
		if f.delay > 0 {
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(f.delay)))
		}
	}
	return nil
}

// fetchBody downloads target with c and returns the response body together
// with the HTTP status code. The body is closed before it returns, so nothing
// stays open across loop iterations in RemoteFetch.
func (f *Fetcher) fetchBody(c *http.Client, target string) ([]byte, int, error) {
	req, err := http.NewRequest("GET", target, nil)
	if err != nil {
		return nil, 0, err
	}
	req.Header.Add("User-Agent", "syndie-core")
	resp, err := c.Do(req)
	if err != nil {
		// resp is nil on failure, so there is no body to close here.
		return nil, 0, err
	}
	defer resp.Body.Close()
	buf, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, 0, err
	}
	return buf, resp.StatusCode, nil
}

// storeMessage validates one downloaded message, writes it beneath
// localLocation and imports it with LocalFile. source, x and total are only
// used for logging. Every failure is logged and ends handling of this message.
func (f *Fetcher) storeMessage(buf []byte, source string, x, total int) {
	// Validate the message and take the PostURI messageID from it
	outer := syndieutil.New()
	if _, err := outer.Unmarshal(bytes.NewReader(buf)); err != nil {
		log.Printf("Error unmarshalling outer: %s", err)
		return
	}
	var dir, name string
	switch outer.MessageType {
	case "meta":
		chanHash, err := syndieutil.ChanHash(outer.Identity)
		if err != nil {
			log.Printf("Error parsing chanhash: %s", err)
			return
		}
		dir, name = chanHash, "meta.syndie"
	case "post":
		dir, name = outer.TargetChannel, strconv.Itoa(outer.PostURI.MessageID)+".syndie"
	default:
		return
	}
	// The channel name comes from the downloaded message, so refuse anything
	// that could point outside of localLocation.
	if dir == "" || dir == "." || dir == ".." || strings.ContainsAny(dir, "/\\") {
		log.Printf("Refusing to store %s: unsafe channel name %q", source, dir)
		return
	}
	destDir := f.localLocation + "/" + dir
	if err := os.MkdirAll(destDir, 0744); err != nil {
		log.Printf("Unable to create channel directory: %s", err)
		return
	}
	dest := destDir + "/" + name
	if err := ioutil.WriteFile(dest, buf, 0644); err != nil {
		log.Printf("Unable to write post to disk: %s", err)
		return
	}
	if err := f.LocalFile(dest); err != nil {
		log.Printf("Unable to import %s: %s", outer.MessageType, err)
	}
	log.Printf("Fetched %s with %d bytes, number: %d/%d", source, len(buf), x, total)
}
// buildIndex reaches out to an endpoint over http and builds a list of urls.
//
// It downloads "<remoteLocation>/shared-index.dat" and parses it into a new
// client, which replaces f.Client only after parsing succeeded. An error is
// returned when remoteLocation is not an http(s) URL, when the request fails,
// when the server answers with anything other than 200 OK, or when the index
// cannot be parsed; f.Client is left unchanged in all of those cases.
func (f *Fetcher) buildIndex() error {
	u, err := url.ParseRequestURI(f.remoteLocation)
	if err != nil {
		return err
	}
	// ParseRequestURI also accepts absolute filesystem paths; reject those
	// (and any other scheme) here instead of failing later in the transport.
	if u.Scheme != "http" && u.Scheme != "https" {
		return errors.New("remote location is not an http(s) URL: " + f.remoteLocation)
	}
	req, err := http.NewRequest("GET", strings.TrimRight(f.remoteLocation, "/")+"/shared-index.dat", nil)
	if err != nil {
		return err
	}
	// Same User-Agent as the message requests issued by RemoteFetch.
	req.Header.Add("User-Agent", "syndie-core")
	c := &http.Client{
		Timeout: time.Second * time.Duration(f.timeout),
	}
	resp, err := c.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// An error page must not be parsed as if it were an index.
	if resp.StatusCode != http.StatusOK {
		return errors.New("unexpected response fetching shared-index.dat: " + resp.Status)
	}
	idx := client.New()
	if err := idx.Parse(resp.Body); err != nil {
		return err
	}
	f.Client = idx
	return nil
}

16
main.go
View File

@ -23,13 +23,6 @@ func main() {
flag.Parse() flag.Parse()
f := fetcher.New(*fetchURL, *fetchPath, *fetchTimeout, *fetchDelay)
ferr := f.Fetch()
if ferr != nil {
log.Printf("Error indexing: %s", ferr)
}
derr := data.OpenDB(usr.HomeDir + "/.syndie/db/bolt.db") derr := data.OpenDB(usr.HomeDir + "/.syndie/db/bolt.db")
if derr != nil { if derr != nil {
log.Fatal(err) log.Fatal(err)
@ -45,10 +38,17 @@ func main() {
log.Printf("err: %s", err) log.Printf("err: %s", err)
} }
f := fetcher.New(*fetchURL, *fetchPath, *fetchTimeout, *fetchDelay)
ferr := f.RemoteFetch()
if ferr != nil {
log.Printf("Error indexing: %s", ferr)
}
go gateway.New() go gateway.New()
time.Sleep(time.Second * 60) time.Sleep(time.Second * 60)
log.Printf("Importing messages from incoming folder to http://localhost:9090/recentmessages") log.Printf("Importing messages from incoming folder to http://localhost:9090/recentmessages")
fetcher.FetchFromDisk(usr.HomeDir + "/.syndie/incoming/") f.LocalDir(usr.HomeDir + "/.syndie/incoming/")
log.Printf("Sleeping for 5 minutes then exiting") log.Printf("Sleeping for 5 minutes then exiting")
time.Sleep(time.Minute * 5) time.Sleep(time.Minute * 5)
} }