diff --git a/fetcher/fetcher.go b/fetcher/fetcher.go index d7c5ef5..c3c3943 100644 --- a/fetcher/fetcher.go +++ b/fetcher/fetcher.go @@ -1,162 +1,27 @@ package fetcher import ( - "bytes" - "errors" - "io/ioutil" - "log" - "math/rand" - "net/http" - "net/url" - "os" - "strconv" - "strings" - "time" - "github.com/kpetku/libsyndie/archive/client" - "github.com/kpetku/libsyndie/syndieutil" ) const upperBoundLimit = 10000 -// Fetcher hold things +// Fetcher contains verious options for a Syndie fetch operation type Fetcher struct { - remote string // remote can be a URL or file - localPath string // localPath is where to store the results on the local filesystem - timeout int // timeout in second - delay int // delay between individual fetches in miliseconds - client *client.Client + remoteLocation string // remoteLocation is a URL pointing to an archive server + localLocation string // localLocation is where to store the results on the local filesystem + timeout int // timeout in seconds + delay int // random delay of up to "delay" miliseconds between individual http requests + Client *client.Client } -// New creates a new instance of Fetcher. +// New creates a new instance of Fetcher func New(remote, path string, timeout, delay int) *Fetcher { return &Fetcher{ - remote: remote, - localPath: path, - timeout: timeout, - delay: delay, - client: &client.Client{}, + remoteLocation: remote, + localLocation: path, + timeout: timeout, + delay: delay, + Client: &client.Client{}, } } - -// GetIndex reaches out to an endpoint over http and builds a list of urls. -func (f *Fetcher) GetIndex() error { - _, err := url.ParseRequestURI(f.remote) - if err == nil { - req, err := http.NewRequest("GET", strings.TrimRight(f.remote, "/")+"/shared-index.dat", nil) - if err != nil { - return err - } - req.Header.Add("User-Agent", "syndied") - var c = &http.Client{ - Timeout: time.Second * time.Duration(f.timeout), - } - - resp, err := c.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - f.client = client.New() - f.client.Parse(resp.Body) - - log.Printf("numAltURIs: %d", f.client.NumAltURIs) - log.Printf("NumChannels: %d", int(f.client.NumChannels)) - log.Printf("Number of messages: %d", len(f.client.Urls)) - return nil - } - fi, err := os.Stat(f.remote) - if err != nil { - return err - } - if fi.IsDir() { - fetchChannelList, _ := ioutil.ReadDir(f.remote) - for _, c := range fetchChannelList { - if c.IsDir() { - FetchFromDisk(f.remote + "/" + c.Name()) - } else { - ImportFile(f.remote + "/" + c.Name()) - } - } - } else { - ImportFile(f.remote) - } - return nil -} - -// Fetch actually fetches all URLs from a remote endpoint into the specified path -func (f *Fetcher) Fetch() error { - f.GetIndex() - if f.client.Urls == nil { - return errors.New("no URLs to fetch") - } - if len(f.client.Urls) >= upperBoundLimit { - return errors.New("too many URLs to fetch") - } - for x, url := range f.client.Urls { - url = strings.TrimRight(f.remote, "/") + "/" + url - - log.Printf("Fetching' %s", url) - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return err - } - req.Header.Add("User-Agent", "syndied") - var c = &http.Client{ - Timeout: time.Second * time.Duration(f.timeout), - } - resp, err := c.Do(req) - if err != nil { - resp.Body.Close() - return err - } - defer resp.Body.Close() - buf, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - if resp.StatusCode == http.StatusOK { - // Validate the message and take the PostURI messageID from it - outer := syndieutil.New() - _, err := outer.Unmarshal(bytes.NewReader(buf)) - if err != nil { - log.Printf("Error unmarshalling outer: %s", err) - } - if outer.MessageType == "meta" { - chanHash, err := syndieutil.ChanHash(outer.Identity) - if err != nil { - log.Printf("Error parsing chanhash: %s", err) - } - log.Printf("Fetched META %s with %d bytes, number: %d/%d", url, len(buf), x, len(f.client.Urls)) - if _, err := os.Stat(f.localPath + "/" + chanHash + "/"); os.IsNotExist(err) { - os.Mkdir(f.localPath+"/"+chanHash+"/", 0744) - } - dest := f.localPath + "/" + chanHash + "/" + "meta.syndie" - werr := ioutil.WriteFile(dest, buf, 0644) - if werr != nil { - log.Printf("Unable to write post to disk: %s", werr.Error()) - } - ierr := ImportFile(dest) - if ierr != nil { - log.Printf("Unable to import meta: %s", ierr.Error()) - } - log.Printf("Fetched %s with %d bytes, number: %d/%d", url, len(buf), x, len(f.client.Urls)) - } - if outer.MessageType == "post" { - dest := f.localPath + "/" + outer.TargetChannel + "/" + strconv.Itoa(outer.PostURI.MessageID) + ".syndie" - werr := ioutil.WriteFile(dest, buf, 0644) - if werr != nil { - log.Printf("Unable to write post to disk: %s", werr.Error()) - } - ierr := ImportFile(dest) - if ierr != nil { - log.Printf("Unable to import post: %s", ierr.Error()) - } - log.Printf("Fetched %s with %d bytes, number: %d/%d", url, len(buf), x, len(f.client.Urls)) - } - } - time.Sleep(time.Millisecond * time.Duration(rand.Intn(f.delay))) - } - return nil -} diff --git a/fetcher/file.go b/fetcher/local.go similarity index 81% rename from fetcher/file.go rename to fetcher/local.go index 412b698..c9cd2d3 100644 --- a/fetcher/file.go +++ b/fetcher/local.go @@ -12,8 +12,9 @@ import ( "github.com/kpetku/syndie-core/data" ) -func ImportFile(name string) error { - dat, err := ioutil.ReadFile(name) +// LocalFile opens a file from the path location and imports it into the database +func (f *Fetcher) LocalFile(location string) error { + dat, err := ioutil.ReadFile(location) if err != nil { return err } @@ -48,7 +49,7 @@ func ImportFile(name string) error { if cerr != nil { log.Printf("error in WriteChannel: %s", cerr) } - log.Printf("wrote metadata for file: %s", name) + log.Printf("wrote metadata for file: %s", location) return nil } if outer.MessageType == "post" { @@ -103,21 +104,26 @@ func ImportFile(name string) error { return nil } -func FetchFromDisk(path string) { - fi, err := os.Stat(path) +// LocalDir recursively walks directories of Syndie messages from the path location and imports them into the database +func (f *Fetcher) LocalDir(location string) error { + fi, err := os.Stat(location) if err != nil { - log.Printf("Error fetchin': %s", err.Error()) + return err } if fi.IsDir() { - fetchChannelList, _ := ioutil.ReadDir(path) + fetchChannelList, err := ioutil.ReadDir(location) + if err != nil { + return err + } for _, c := range fetchChannelList { if c.IsDir() { - FetchFromDisk(path + c.Name()) + err = f.LocalFile(location + c.Name()) } else { - ImportFile(path + "/" + c.Name()) + err = f.LocalFile(location + "/" + c.Name()) } } } else { - ImportFile(path) + f.LocalFile(location) } + return err } diff --git a/fetcher/remote.go b/fetcher/remote.go new file mode 100644 index 0000000..c51bcaf --- /dev/null +++ b/fetcher/remote.go @@ -0,0 +1,123 @@ +package fetcher + +import ( + "bytes" + "errors" + "io/ioutil" + "log" + "math/rand" + "net/http" + "net/url" + "os" + "strconv" + "strings" + "time" + + "github.com/kpetku/libsyndie/archive/client" + "github.com/kpetku/libsyndie/syndieutil" +) + +// RemoteFetch performs a remote HTTP fetch from "remoteLocation" +func (f *Fetcher) RemoteFetch() error { + err := f.buildIndex() + if err != nil { + f.LocalDir(f.remoteLocation) + } + if f.Client.Urls == nil { + return errors.New("no URLs to fetch") + } + if len(f.Client.Urls) >= upperBoundLimit { + return errors.New("too many URLs to fetch") + } + for x, url := range f.Client.Urls { + url = strings.TrimRight(f.remoteLocation, "/") + "/" + url + + log.Printf("Fetching' %s", url) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return err + } + req.Header.Add("User-Agent", "syndie-core") + var c = &http.Client{ + Timeout: time.Second * time.Duration(f.timeout), + } + resp, err := c.Do(req) + if err != nil { + resp.Body.Close() + return err + } + defer resp.Body.Close() + buf, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode == http.StatusOK { + // Validate the message and take the PostURI messageID from it + outer := syndieutil.New() + _, err := outer.Unmarshal(bytes.NewReader(buf)) + if err != nil { + log.Printf("Error unmarshalling outer: %s", err) + } + if outer.MessageType == "meta" { + chanHash, err := syndieutil.ChanHash(outer.Identity) + if err != nil { + log.Printf("Error parsing chanhash: %s", err) + } + log.Printf("Fetched META %s with %d bytes, number: %d/%d", url, len(buf), x, len(f.Client.Urls)) + if _, err := os.Stat(f.localLocation + "/" + chanHash + "/"); os.IsNotExist(err) { + os.Mkdir(f.localLocation+"/"+chanHash+"/", 0744) + } + dest := f.localLocation + "/" + chanHash + "/" + "meta.syndie" + werr := ioutil.WriteFile(dest, buf, 0644) + if werr != nil { + log.Printf("Unable to write post to disk: %s", werr.Error()) + } + err = f.LocalFile(dest) + if err != nil { + log.Printf("Unable to import meta: %s", err.Error()) + } + log.Printf("Fetched %s with %d bytes, number: %d/%d", url, len(buf), x, len(f.Client.Urls)) + } + if outer.MessageType == "post" { + dest := f.localLocation + "/" + outer.TargetChannel + "/" + strconv.Itoa(outer.PostURI.MessageID) + ".syndie" + werr := ioutil.WriteFile(dest, buf, 0644) + if werr != nil { + log.Printf("Unable to write post to disk: %s", werr.Error()) + } + ierr := f.LocalFile(dest) + if ierr != nil { + log.Printf("Unable to import post: %s", ierr.Error()) + } + log.Printf("Fetched %s with %d bytes, number: %d/%d", url, len(buf), x, len(f.Client.Urls)) + } + } + time.Sleep(time.Millisecond * time.Duration(rand.Intn(f.delay))) + } + return nil +} + +// buildIndex reaches out to an endpoint over http and builds a list of urls +func (f *Fetcher) buildIndex() error { + // Try to build the index of a remote archive over HTTP + _, err := url.ParseRequestURI(f.remoteLocation) + if err == nil { + req, err := http.NewRequest("GET", strings.TrimRight(f.remoteLocation, "/")+"/shared-index.dat", nil) + if err != nil { + return err + } + req.Header.Add("User-Agent", "syndied") + var c = &http.Client{ + Timeout: time.Second * time.Duration(f.timeout), + } + + resp, err := c.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + f.Client = client.New() + return f.Client.Parse(resp.Body) + } + return err +} diff --git a/main.go b/main.go index bcb5a97..37c4e2e 100644 --- a/main.go +++ b/main.go @@ -23,13 +23,6 @@ func main() { flag.Parse() - f := fetcher.New(*fetchURL, *fetchPath, *fetchTimeout, *fetchDelay) - - ferr := f.Fetch() - if ferr != nil { - log.Printf("Error indexing: %s", ferr) - } - derr := data.OpenDB(usr.HomeDir + "/.syndie/db/bolt.db") if derr != nil { log.Fatal(err) @@ -45,10 +38,17 @@ func main() { log.Printf("err: %s", err) } + f := fetcher.New(*fetchURL, *fetchPath, *fetchTimeout, *fetchDelay) + + ferr := f.RemoteFetch() + if ferr != nil { + log.Printf("Error indexing: %s", ferr) + } + go gateway.New() time.Sleep(time.Second * 60) log.Printf("Importing messages from incoming folder to http://localhost:9090/recentmessages") - fetcher.FetchFromDisk(usr.HomeDir + "/.syndie/incoming/") + f.LocalDir(usr.HomeDir + "/.syndie/incoming/") log.Printf("Sleeping for 5 minutes then exiting") time.Sleep(time.Minute * 5) }