diff --git a/internal/database/feeds.sql.go b/internal/database/feeds.sql.go index a689f0d..99ad487 100644 --- a/internal/database/feeds.sql.go +++ b/internal/database/feeds.sql.go @@ -13,7 +13,7 @@ import ( const createFeed = `-- name: CreateFeed :one INSERT INTO feeds (created_at, updated_at, name, url, user_id) VALUES (?, ?, ?, ?, ?) -RETURNING id, created_at, updated_at, name, url, user_id +RETURNING id, created_at, updated_at, name, url, user_id, last_fetched_at ` type CreateFeedParams struct { @@ -40,12 +40,13 @@ func (q *Queries) CreateFeed(ctx context.Context, arg CreateFeedParams) (Feed, e &i.Name, &i.Url, &i.UserID, + &i.LastFetchedAt, ) return i, err } const getFeeds = `-- name: GetFeeds :many -SELECT id, created_at, updated_at, name, url, user_id FROM feeds +SELECT id, created_at, updated_at, name, url, user_id, last_fetched_at FROM feeds ` func (q *Queries) GetFeeds(ctx context.Context) ([]Feed, error) { @@ -64,6 +65,7 @@ func (q *Queries) GetFeeds(ctx context.Context) ([]Feed, error) { &i.Name, &i.Url, &i.UserID, + &i.LastFetchedAt, ); err != nil { return nil, err } @@ -79,7 +81,7 @@ func (q *Queries) GetFeeds(ctx context.Context) ([]Feed, error) { } const getFeedsByUser = `-- name: GetFeedsByUser :many -SELECT id, created_at, updated_at, name, url, user_id FROM feeds WHERE user_id = ? +SELECT id, created_at, updated_at, name, url, user_id, last_fetched_at FROM feeds WHERE user_id = ? 
` func (q *Queries) GetFeedsByUser(ctx context.Context, userID int64) ([]Feed, error) { @@ -98,6 +100,7 @@ func (q *Queries) GetFeedsByUser(ctx context.Context, userID int64) ([]Feed, err &i.Name, &i.Url, &i.UserID, + &i.LastFetchedAt, ); err != nil { return nil, err } @@ -111,3 +114,63 @@ func (q *Queries) GetFeedsByUser(ctx context.Context, userID int64) ([]Feed, err } return items, nil } + +const getNextFeedToFetch = `-- name: GetNextFeedToFetch :many +SELECT id, created_at, updated_at, name, url, user_id, last_fetched_at FROM feeds +ORDER BY last_fetched_at ASC NULLS FIRST +LIMIT ? +` + +func (q *Queries) GetNextFeedToFetch(ctx context.Context, limit int64) ([]Feed, error) { + rows, err := q.db.QueryContext(ctx, getNextFeedToFetch, limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Feed + for rows.Next() { + var i Feed + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.Name, + &i.Url, + &i.UserID, + &i.LastFetchedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const markFeedAsFetched = `-- name: MarkFeedAsFetched :one +UPDATE feeds +SET last_fetched_at = date('now'), +updated_at = date('now') +WHERE id = ? 
+RETURNING id, created_at, updated_at, name, url, user_id, last_fetched_at +` + +func (q *Queries) MarkFeedAsFetched(ctx context.Context, id int64) (Feed, error) { + row := q.db.QueryRowContext(ctx, markFeedAsFetched, id) + var i Feed + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.Name, + &i.Url, + &i.UserID, + &i.LastFetchedAt, + ) + return i, err +} diff --git a/internal/database/models.go b/internal/database/models.go index f610d2b..73face5 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -10,12 +10,13 @@ import ( ) type Feed struct { - ID int64 - CreatedAt time.Time - UpdatedAt time.Time - Name string - Url string - UserID int64 + ID int64 + CreatedAt time.Time + UpdatedAt time.Time + Name string + Url string + UserID int64 + LastFetchedAt sql.NullTime } type FeedFollow struct { diff --git a/main.go b/main.go index 5b4a45b..4ef22ca 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "os" + "time" "git.rpuzonas.com/rpuzonas/go-rss-aggregator/internal/database" "github.com/go-chi/chi" @@ -59,7 +60,8 @@ func main() { } defer conn.Close() - apiCfg := apiConfig{ DB: database.New(conn) } + db := database.New(conn) + apiCfg := apiConfig{ DB: db } router := chi.NewRouter() router.Use(cors.Handler(cors.Options{ @@ -78,6 +80,8 @@ func main() { Addr: ":" + portStr, }; + go startScraping(db, 10, time.Minute) + fmt.Printf("Listening on port: %s\n", portStr) err = srv.ListenAndServe(); if err != nil { diff --git a/rss.go b/rss.go new file mode 100644 index 0000000..d5612ca --- /dev/null +++ b/rss.go @@ -0,0 +1,48 @@ +package main + +import ( + "encoding/xml" + "io" + "net/http" + "time" +) + +type RSSFeed struct { + Channel struct { + Title string `xml:"title"` + Link string `xml:"link"` + Description string `xml:"description"` + Language string `xml:"language"` + Items []RSSItem `xml:"item"` + } `xml:"channel"`; +} + +type RSSItem struct { + Title string `xml:"title"` + Link string `xml:"link"` + 
Description string `xml:"description"` + PubDate string `xml:"pubDate"` +} + +func urlToFeed(url string) (RSSFeed, error) { + httpClient := http.Client{ Timeout: 10 * time.Second } + + resp, err := httpClient.Get(url) + if err != nil { + return RSSFeed{}, err + } + defer resp.Body.Close() + + dat, err := io.ReadAll(resp.Body) + if err != nil { + return RSSFeed{}, err + } + + rssFeed := RSSFeed{} + err = xml.Unmarshal(dat, &rssFeed) + if err != nil { + return RSSFeed{}, err + } + + return rssFeed, nil +} diff --git a/scraper.go b/scraper.go new file mode 100644 index 0000000..9f733d4 --- /dev/null +++ b/scraper.go @@ -0,0 +1,55 @@ +package main + +import ( + "context" + "log" + "sync" + "time" + + "git.rpuzonas.com/rpuzonas/go-rss-aggregator/internal/database" +) + +func startScraping( + db *database.Queries, + concurrency int, + timeBetweenRequests time.Duration, +) { + log.Printf("Scraping on %v goroutines every %s duration", concurrency, timeBetweenRequests) + + ticker := time.NewTicker(timeBetweenRequests) + for ; ; <-ticker.C { + feeds, err := db.GetNextFeedToFetch(context.Background(), int64(concurrency)) + if err != nil { + log.Println("Error fetching feeds:", err) + continue + } + + wg := sync.WaitGroup{} + for _, feed := range feeds { + wg.Add(1) + go scrapeFeed(&wg, db, feed) + } + wg.Wait() + } +} + +func scrapeFeed(wg *sync.WaitGroup, db *database.Queries, feed database.Feed) { + defer wg.Done() + + _, err := db.MarkFeedAsFetched(context.Background(), feed.ID) + if err != nil { + log.Println("Error marking feed as fetched:", err) + return + } + + rssFeed, err := urlToFeed(feed.Url) + if err != nil { + log.Println("Error fetching feed:", err) + return + } + + for _, item := range rssFeed.Channel.Items { + log.Println("Found post:", item.Title, "on feed", feed.Name) + } + log.Printf("Feed %s collected, %v posts found", feed.Name, len(rssFeed.Channel.Items)) +} diff --git a/sql/queries/feeds.sql b/sql/queries/feeds.sql index 47c6931..9f0455b 100644 --- 
a/sql/queries/feeds.sql +++ b/sql/queries/feeds.sql @@ -8,3 +8,15 @@ SELECT * FROM feeds WHERE user_id = ?; -- name: GetFeeds :many SELECT * FROM feeds; + +-- name: GetNextFeedToFetch :many +SELECT * FROM feeds +ORDER BY last_fetched_at ASC NULLS FIRST +LIMIT ?; + +-- name: MarkFeedAsFetched :one +UPDATE feeds +SET last_fetched_at = date('now'), +updated_at = date('now') +WHERE id = ? +RETURNING *; diff --git a/sql/schema/005_feeds_lastfetchedat.sql b/sql/schema/005_feeds_lastfetchedat.sql new file mode 100644 index 0000000..bd4c2b9 --- /dev/null +++ b/sql/schema/005_feeds_lastfetchedat.sql @@ -0,0 +1,5 @@ +-- +goose Up +ALTER TABLE feeds ADD COLUMN last_fetched_at DATE; + +-- +goose Down +ALTER TABLE feeds DROP COLUMN last_fetched_at;