add rss feed scraper

parent e18ab0aa6e
commit bf17977dd8
@@ -13,7 +13,7 @@ import (
 const createFeed = `-- name: CreateFeed :one
 INSERT INTO feeds (created_at, updated_at, name, url, user_id)
 VALUES (?, ?, ?, ?, ?)
-RETURNING id, created_at, updated_at, name, url, user_id
+RETURNING id, created_at, updated_at, name, url, user_id, last_fetched_at
 `
 
 type CreateFeedParams struct {
@@ -40,12 +40,13 @@ func (q *Queries) CreateFeed(ctx context.Context, arg CreateFeedParams) (Feed, e
         &i.Name,
         &i.Url,
         &i.UserID,
+        &i.LastFetchedAt,
     )
     return i, err
 }
 
 const getFeeds = `-- name: GetFeeds :many
-SELECT id, created_at, updated_at, name, url, user_id FROM feeds
+SELECT id, created_at, updated_at, name, url, user_id, last_fetched_at FROM feeds
 `
 
 func (q *Queries) GetFeeds(ctx context.Context) ([]Feed, error) {
@@ -64,6 +65,7 @@ func (q *Queries) GetFeeds(ctx context.Context) ([]Feed, error) {
             &i.Name,
             &i.Url,
             &i.UserID,
+            &i.LastFetchedAt,
         ); err != nil {
             return nil, err
         }
@@ -79,7 +81,7 @@ func (q *Queries) GetFeeds(ctx context.Context) ([]Feed, error) {
 }
 
 const getFeedsByUser = `-- name: GetFeedsByUser :many
-SELECT id, created_at, updated_at, name, url, user_id FROM feeds WHERE user_id = ?
+SELECT id, created_at, updated_at, name, url, user_id, last_fetched_at FROM feeds WHERE user_id = ?
 `
 
 func (q *Queries) GetFeedsByUser(ctx context.Context, userID int64) ([]Feed, error) {
@@ -98,6 +100,7 @@ func (q *Queries) GetFeedsByUser(ctx context.Context, userID int64) ([]Feed, err
             &i.Name,
             &i.Url,
             &i.UserID,
+            &i.LastFetchedAt,
         ); err != nil {
             return nil, err
         }
@@ -111,3 +114,63 @@ func (q *Queries) GetFeedsByUser(ctx context.Context, userID int64) ([]Feed, err
     }
     return items, nil
 }
+
+const getNextFeedToFetch = `-- name: GetNextFeedToFetch :many
+SELECT id, created_at, updated_at, name, url, user_id, last_fetched_at FROM feeds
+ORDER BY last_fetched_at ASC NULLS FIRST
+LIMIT ?
+`
+
+func (q *Queries) GetNextFeedToFetch(ctx context.Context, limit int64) ([]Feed, error) {
+    rows, err := q.db.QueryContext(ctx, getNextFeedToFetch, limit)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+    var items []Feed
+    for rows.Next() {
+        var i Feed
+        if err := rows.Scan(
+            &i.ID,
+            &i.CreatedAt,
+            &i.UpdatedAt,
+            &i.Name,
+            &i.Url,
+            &i.UserID,
+            &i.LastFetchedAt,
+        ); err != nil {
+            return nil, err
+        }
+        items = append(items, i)
+    }
+    if err := rows.Close(); err != nil {
+        return nil, err
+    }
+    if err := rows.Err(); err != nil {
+        return nil, err
+    }
+    return items, nil
+}
+
+const markFeedAsFetched = `-- name: MarkFeedAsFetched :one
+UPDATE feeds
+SET last_fetched_at = date('now'),
+    updated_at = date('now')
+WHERE id = ?
+RETURNING id, created_at, updated_at, name, url, user_id, last_fetched_at
+`
+
+func (q *Queries) MarkFeedAsFetched(ctx context.Context, id int64) (Feed, error) {
+    row := q.db.QueryRowContext(ctx, markFeedAsFetched, id)
+    var i Feed
+    err := row.Scan(
+        &i.ID,
+        &i.CreatedAt,
+        &i.UpdatedAt,
+        &i.Name,
+        &i.Url,
+        &i.UserID,
+        &i.LastFetchedAt,
+    )
+    return i, err
+}
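Note: the file above appears to be sqlc-generated query code (the `-- name: ... :one` annotations are sqlc's convention); if so, it is not meant to be edited by hand but regenerated from the query file further down with `sqlc generate`.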
@@ -16,6 +16,7 @@ type Feed struct {
     Name          string
     Url           string
     UserID        int64
+    LastFetchedAt sql.NullTime
 }
 
 type FeedFollow struct {
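Because the new column is nullable, it maps to sql.NullTime, and LastFetchedAt stays invalid until the scraper first marks the feed. A minimal stand-alone sketch of reading the field safely (the variable is a stand-in for a scanned Feed's field):

package main

import (
    "database/sql"
    "fmt"
    "time"
)

func main() {
    // Stand-in for feed.LastFetchedAt on a freshly created feed.
    var lastFetchedAt sql.NullTime

    // Always check Valid before touching Time; a NULL column scans
    // as Valid == false with a zero Time.
    if lastFetchedAt.Valid {
        fmt.Println("last fetched:", lastFetchedAt.Time.Format(time.RFC3339))
    } else {
        fmt.Println("feed has never been fetched")
    }
}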
main.go (6 changes)

@@ -6,6 +6,7 @@ import (
     "log"
     "net/http"
     "os"
+    "time"
 
     "git.rpuzonas.com/rpuzonas/go-rss-aggregator/internal/database"
     "github.com/go-chi/chi"
@@ -59,7 +60,8 @@ func main() {
     }
     defer conn.Close()
 
-    apiCfg := apiConfig{ DB: database.New(conn) }
+    db := database.New(conn)
+    apiCfg := apiConfig{ DB: db }
 
     router := chi.NewRouter()
     router.Use(cors.Handler(cors.Options{
@@ -78,6 +80,8 @@ func main() {
         Addr: ":" + portStr,
     };
 
+    go startScraping(db, 10, time.Minute)
+
     fmt.Printf("Listening on port: %s\n", portStr)
     err = srv.ListenAndServe();
     if err != nil {
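The scraper is launched with `go` so it runs in the background and never blocks ListenAndServe; here 10 is the number of feeds fetched per pass (one goroutine each) and time.Minute is the interval between passes.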
rss.go (new file, 48 lines)

@@ -0,0 +1,48 @@
+package main
+
+import (
+    "encoding/xml"
+    "io"
+    "net/http"
+    "time"
+)
+
+type RSSFeed struct {
+    Channel struct {
+        Title       string    `xml:"title"`
+        Link        string    `xml:"link"`
+        Description string    `xml:"description"`
+        Language    string    `xml:"language"`
+        Items       []RSSItem `xml:"item"`
+    } `xml:"channel"`
+}
+
+type RSSItem struct {
+    Title       string `xml:"title"`
+    Link        string `xml:"link"`
+    Description string `xml:"description"`
+    PubDate     string `xml:"pubDate"`
+}
+
+func urlToFeed(url string) (RSSFeed, error) {
+    httpClient := http.Client{ Timeout: 10 * time.Second }
+
+    resp, err := httpClient.Get(url)
+    if err != nil {
+        return RSSFeed{}, err
+    }
+    defer resp.Body.Close()
+
+    dat, err := io.ReadAll(resp.Body)
+    if err != nil {
+        return RSSFeed{}, err
+    }
+
+    rssFeed := RSSFeed{}
+    err = xml.Unmarshal(dat, &rssFeed)
+    if err != nil {
+        return RSSFeed{}, err
+    }
+
+    return rssFeed, nil
+}
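A hedged usage sketch of urlToFeed: the helper below is not part of the commit, and the URL passed to it is a placeholder; it only assumes it sits next to rss.go in the same package.

package main

import "log"

// printFeedTitles is an illustrative helper (not in the commit);
// it assumes urlToFeed from rss.go is available in this package.
func printFeedTitles(url string) {
    feed, err := urlToFeed(url)
    if err != nil {
        log.Println("fetch failed:", err)
        return
    }
    for _, item := range feed.Channel.Items {
        log.Println("found:", item.Title, "->", item.Link)
    }
}

Called as, e.g., printFeedTitles("https://example.com/index.xml").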
scraper.go (new file, 55 lines)

@@ -0,0 +1,55 @@
+package main
+
+import (
+    "context"
+    "log"
+    "sync"
+    "time"
+
+    "git.rpuzonas.com/rpuzonas/go-rss-aggregator/internal/database"
+)
+
+func startScraping(
+    db *database.Queries,
+    concurrency int,
+    timeBetweenRequests time.Duration,
+) {
+    log.Printf("Scraping on %v goroutines every %s duration", concurrency, timeBetweenRequests)
+
+    ticker := time.NewTicker(timeBetweenRequests)
+    for ; ; <-ticker.C {
+        feeds, err := db.GetNextFeedToFetch(context.Background(), int64(concurrency))
+        if err != nil {
+            log.Println("Error fetching feeds:", err)
+            continue
+        }
+
+        wg := sync.WaitGroup{}
+        for _, feed := range feeds {
+            wg.Add(1)
+            go scrapeFeed(&wg, db, feed)
+        }
+        wg.Wait()
+    }
+}
+
+func scrapeFeed(wg *sync.WaitGroup, db *database.Queries, feed database.Feed) {
+    defer wg.Done()
+
+    _, err := db.MarkFeedAsFetched(context.Background(), feed.ID)
+    if err != nil {
+        log.Println("Error marking feed as fetched:", err)
+        return
+    }
+
+    rssFeed, err := urlToFeed(feed.Url)
+    if err != nil {
+        log.Println("Error fetching feed:", err)
+        return
+    }
+
+    for _, item := range rssFeed.Channel.Items {
+        log.Println("Found post:", item.Title, "on feed", feed.Name)
+    }
+    log.Printf("Feed %s collected, %v posts found", feed.Name, len(rssFeed.Channel.Items))
+}
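One detail worth noting: `for ; ; <-ticker.C` has empty init and post clauses, so the loop body runs once immediately and only then blocks on the ticker between passes. A minimal stand-alone sketch of the same pattern:

package main

import (
    "log"
    "time"
)

func main() {
    // Body executes immediately, then once per tick; contrast with
    // `for range ticker.C`, which would wait a full interval first.
    ticker := time.NewTicker(2 * time.Second)
    for ; ; <-ticker.C {
        log.Println("scrape pass")
    }
}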
@@ -8,3 +8,15 @@ SELECT * FROM feeds WHERE user_id = ?;
 
 -- name: GetFeeds :many
 SELECT * FROM feeds;
+
+-- name: GetNextFeedToFetch :many
+SELECT * FROM feeds
+ORDER BY last_fetched_at ASC NULLS FIRST
+LIMIT ?;
+
+-- name: MarkFeedAsFetched :one
+UPDATE feeds
+SET last_fetched_at = date('now'),
+    updated_at = date('now')
+WHERE id = ?
+RETURNING *;
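`ORDER BY last_fetched_at ASC NULLS FIRST` puts never-fetched feeds (still NULL) at the front of the queue, followed by the stalest ones; note that SQLite only accepts the NULLS FIRST/LAST syntax from version 3.30.0 on. Also, `date('now')` stores only a calendar date, so two fetches on the same day are indistinguishable in the ordering; `datetime('now')` would be the finer-grained alternative if that matters.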
sql/schema/005_feeds_lastfetchedat.sql (new file, 5 lines)

@@ -0,0 +1,5 @@
+-- +goose Up
+ALTER TABLE feeds ADD COLUMN last_fetched_at DATE;
+
+-- +goose Down
+ALTER TABLE feeds DROP COLUMN last_fetched_at;
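The Down migration relies on `ALTER TABLE ... DROP COLUMN`, which SQLite only supports from 3.35.0 on; on older SQLite the rollback would need the usual copy-table-and-rename workaround.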