Add proper YouTube archiving via YT-DLP #126

Merged
merged 15 commits into from Sep 12, 2024

Changes from 2 commits
4 changes: 4 additions & 0 deletions cmd/get.go
@@ -84,6 +84,10 @@ func getCMDsFlags(getCmd *cobra.Command) {
getCmd.PersistentFlags().String("es-password", "", "ElasticSearch password to use for indexing crawl logs.")
getCmd.PersistentFlags().String("es-index-prefix", "zeno", "ElasticSearch index prefix to use for indexing crawl logs. Default is : `zeno`, without `-`")

// Dependencies flags
getCmd.PersistentFlags().Bool("no-ytdlp", false, "Disable yt-dlp usage for video extraction.")
getCmd.PersistentFlags().String("ytdlp-path", "", "Path to yt-dlp binary.")

// Alias support
// As cobra doesn't support aliases natively (couldn't find a way to do it), we have to do it manually
// This is a workaround to allow users to use `--hops` instead of `--max-hops` for example
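Usage note (illustrative invocations, assuming the usual `zeno get url` entrypoint): `zeno get url <URL> --no-ytdlp` disables video extraction entirely, while `zeno get url <URL> --ytdlp-path /usr/local/bin/yt-dlp` points Zeno at a specific binary instead of searching the PATH.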
4 changes: 4 additions & 0 deletions config/config.go
@@ -76,6 +76,10 @@ type Config struct {
NoStdoutLogging bool `mapstructure:"no-stdout-log"`
NoHandover bool `mapstructure:"no-handover"`
NoBatchWriteWAL bool `mapstructure:"ultrasafe-queue"`

// Dependencies
NoYTDLP bool `mapstructure:"no-ytdlp"`
YTDLPPath string `mapstructure:"ytdlp-path"`
}

var (
108 changes: 108 additions & 0 deletions internal/pkg/crawl/assets.go
@@ -1,16 +1,124 @@
package crawl

import (
"io"
"net/http"
"net/url"
"regexp"
"strings"
"sync/atomic"

"github.com/PuerkitoBio/goquery"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/cloudflarestream"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/remeh/sizedwaitgroup"
)

func (c *Crawl) captureAsset(item *queue.Item, cookies []*http.Cookie) error {
var resp *http.Response

// Prepare GET request
req, err := http.NewRequest("GET", utils.URLToString(item.URL), nil)
if err != nil {
return err
}

req.Header.Set("Referer", utils.URLToString(item.ParentURL))
req.Header.Set("User-Agent", c.UserAgent)

// Apply cookies obtained from the original URL captured
for i := range cookies {
req.AddCookie(cookies[i])
}

resp, err = c.executeGET(item, req, false)
if err != nil && err.Error() == "URL from redirection has already been seen" {
return nil
} else if err != nil {
return err
}
defer resp.Body.Close()

// Read the entire body so the WARC module can write the complete record
io.Copy(io.Discard, resp.Body)

return nil
}

func (c *Crawl) captureAssets(item *queue.Item, assets []*url.URL, cookies []*http.Cookie) {
// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(int64(len(assets)))
swg := sizedwaitgroup.New(int(c.MaxConcurrentAssets))
excluded := false

for _, asset := range assets {
// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(-1)

// Make sure we do not over-archive by re-archiving the original URL
if utils.URLToString(item.URL) == utils.URLToString(asset) {
continue
}

// We ban googlevideo.com URLs because they are heavily rate limited by default, and
// we don't want the crawler to spend an inappropriate amount of time archiving them
if strings.Contains(item.URL.Host, "googlevideo.com") {
continue
}

// If the URL matches any excluded string, we ignore it
for _, excludedString := range c.ExcludedStrings {
if strings.Contains(utils.URLToString(asset), excludedString) {
excluded = true
break
}
}

if excluded {
excluded = false
continue
}

swg.Add()
c.URIsPerSecond.Incr(1)

go func(asset *url.URL, swg *sizedwaitgroup.SizedWaitGroup) {
defer swg.Done()

// Create the asset's item
newAsset, err := queue.NewItem(asset, item.URL, "asset", item.Hop, "", false)
if err != nil {
c.Log.WithFields(c.genLogFields(err, asset, map[string]interface{}{
"parentHop": item.Hop,
"parentUrl": utils.URLToString(item.URL),
"type": "asset",
})).Error("error while creating asset item")
return
}

// Capture the asset
err = c.captureAsset(newAsset, cookies)
if err != nil {
c.Log.WithFields(c.genLogFields(err, &asset, map[string]interface{}{
"parentHop": item.Hop,
"parentUrl": utils.URLToString(item.URL),
"type": "asset",
})).Error("error while capturing asset")
return
}

// If we made it to this point, it means that the asset has been crawled successfully,
// so we can increment the locallyCrawled variable
atomic.AddUint64(&item.LocallyCrawled, 1)
}(asset, &swg)
}

swg.Wait()
}

func (c *Crawl) extractAssets(base *url.URL, item *queue.Item, doc *goquery.Document) (assets []*url.URL, err error) {
var rawAssets []string

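As an aside for reviewers: the concurrency cap in captureAssets above comes from sizedwaitgroup, which behaves like a sync.WaitGroup with a bounded number of goroutines in flight. A self-contained sketch of the pattern (values illustrative, not from this PR):

package main

import (
	"fmt"
	"time"

	"github.com/remeh/sizedwaitgroup"
)

func main() {
	// Allow at most 2 goroutines at once; Add() blocks until a slot frees up,
	// mirroring how captureAssets bounds itself with MaxConcurrentAssets.
	swg := sizedwaitgroup.New(2)

	for i := 0; i < 5; i++ {
		swg.Add()
		go func(i int) {
			defer swg.Done()
			time.Sleep(100 * time.Millisecond) // stand-in for capturing an asset
			fmt.Println("captured asset", i)
		}(i)
	}

	swg.Wait() // blocks until every goroutine has called Done()
}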
120 changes: 18 additions & 102 deletions internal/pkg/crawl/capture.go
@@ -8,7 +8,6 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/PuerkitoBio/goquery"
@@ -20,9 +19,9 @@ import (
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/tiktok"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/truthsocial"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/vk"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/youtube"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/remeh/sizedwaitgroup"
)

func (c *Crawl) executeGET(item *queue.Item, req *http.Request, isRedirection bool) (resp *http.Response, err error) {
@@ -188,37 +187,6 @@ func (c *Crawl) executeGET(item *queue.Item, req *http.Request, isRedirection bo
return resp, nil
}

func (c *Crawl) captureAsset(item *queue.Item, cookies []*http.Cookie) error {
var resp *http.Response

// Prepare GET request
req, err := http.NewRequest("GET", utils.URLToString(item.URL), nil)
if err != nil {
return err
}

req.Header.Set("Referer", utils.URLToString(item.ParentURL))
req.Header.Set("User-Agent", c.UserAgent)

// Apply cookies obtained from the original URL captured
for i := range cookies {
req.AddCookie(cookies[i])
}

resp, err = c.executeGET(item, req, false)
if err != nil && err.Error() == "URL from redirection has already been seen" {
return nil
} else if err != nil {
return err
}
defer resp.Body.Close()

// needed for WARC writing
io.Copy(io.Discard, resp.Body)

return nil
}

// Capture captures the URL and returns the outlinks
func (c *Crawl) Capture(item *queue.Item) error {
var (
@@ -370,6 +338,22 @@ func (c *Crawl) Capture(item *queue.Item) error {
}
defer resp.Body.Close()

// If it was a YouTube watch page, we potentially want to run it through the YouTube extractor
// TODO: support other watch page URLs
if strings.Contains(item.URL.Host, "youtube.com") && strings.Contains(item.URL.Path, "/watch") && !c.NoYTDLP {
URLs, err := youtube.Parse(resp.Body)
if err != nil {
c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing YouTube watch page")
return err
}

if len(URLs) > 0 {
c.captureAssets(item, URLs, resp.Cookies())
}

return nil
}

// Scrape potential URLs from Link HTTP header
var (
links = Parse(resp.Header.Get("link"))
@@ -577,76 +561,8 @@ func (c *Crawl) Capture(item *queue.Item) error {
}
}

// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(int64(len(assets)))
swg := sizedwaitgroup.New(int(c.MaxConcurrentAssets))
excluded := false

for _, asset := range assets {
// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(-1)

// Just making sure we do not over archive by archiving the original URL
if utils.URLToString(item.URL) == utils.URLToString(asset) {
continue
}

// We ban googlevideo.com URLs because they are heavily rate limited by default, and
// we don't want the crawler to spend an innapropriate amount of time archiving them
if strings.Contains(item.URL.Host, "googlevideo.com") {
continue
}

// If the URL match any excluded string, we ignore it
for _, excludedString := range c.ExcludedStrings {
if strings.Contains(utils.URLToString(asset), excludedString) {
excluded = true
break
}
}

if excluded {
excluded = false
continue
}

swg.Add()
c.URIsPerSecond.Incr(1)

go func(asset *url.URL, swg *sizedwaitgroup.SizedWaitGroup) {
defer swg.Done()

// Create the asset's item
newAsset, err := queue.NewItem(asset, item.URL, "asset", item.Hop, "", false)
if err != nil {
c.Log.WithFields(c.genLogFields(err, asset, map[string]interface{}{
"parentHop": item.Hop,
"parentUrl": utils.URLToString(item.URL),
"type": "asset",
})).Error("error while creating asset item")
return
}

// Capture the asset
err = c.captureAsset(newAsset, resp.Cookies())
if err != nil {
c.Log.WithFields(c.genLogFields(err, &asset, map[string]interface{}{
"parentHop": item.Hop,
"parentUrl": utils.URLToString(item.URL),
"type": "asset",
})).Error("error while capturing asset")
return
}

// If we made it to this point, it means that the asset have been crawled successfully,
// then we can increment the locallyCrawled variable
atomic.AddUint64(&item.LocallyCrawled, 1)
}(asset, &swg)
}
c.captureAssets(item, assets, resp.Cookies())

swg.Wait()
return err
}

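The youtube.Parse helper called above is introduced by this PR, but its body is not part of this two-commit diff. A minimal sketch of what such a parser could look like, assuming it saves the watch page HTML to a temporary file, asks yt-dlp for the video metadata as JSON, and maps the result through the ytdlp.Video model added below; every detail here is an assumption, not the PR's actual implementation:

package youtube

import (
	"encoding/json"
	"io"
	"net/url"
	"os"
	"os/exec"

	"github.com/internetarchive/Zeno/internal/pkg/crawl/dependencies/ytdlp"
)

// Parse extracts video and thumbnail URLs from a YouTube watch page.
// Hypothetical sketch; assumes yt-dlp is on the PATH and accepts file://
// inputs (recent yt-dlp releases require --enable-file-urls for that).
func Parse(body io.Reader) (URLs []*url.URL, err error) {
	// Save the page so yt-dlp can read it.
	tmp, err := os.CreateTemp("", "zeno-youtube-*.html")
	if err != nil {
		return nil, err
	}
	defer os.Remove(tmp.Name())

	if _, err := io.Copy(tmp, body); err != nil {
		return nil, err
	}
	tmp.Close()

	// Ask yt-dlp for the video metadata as JSON, without downloading media.
	out, err := exec.Command("yt-dlp", "--enable-file-urls", "--dump-json", "file://"+tmp.Name()).Output()
	if err != nil {
		return nil, err
	}

	var video ytdlp.Video
	if err := json.Unmarshal(out, &video); err != nil {
		return nil, err
	}

	// Collect format and thumbnail URLs so the crawler can archive them as assets.
	for _, format := range video.RequestedFormats {
		if parsed, err := url.Parse(format.URL); err == nil {
			URLs = append(URLs, parsed)
		}
	}
	for _, thumbnail := range video.Thumbnails {
		if parsed, err := url.Parse(thumbnail.URL); err == nil {
			URLs = append(URLs, parsed)
		}
	}

	return URLs, nil
}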
12 changes: 10 additions & 2 deletions internal/pkg/crawl/config.go
@@ -97,7 +97,7 @@ type Crawl struct {
CDXDedupeServer string
WARCFullOnDisk bool
WARCPoolSize int
WARCDedupSize int
WARCDedupeSize int
DisableLocalDedupe bool
CertValidation bool
WARCCustomCookie string
@@ -116,6 +116,10 @@ type Crawl struct {
HQProducerChannel chan *queue.Item
HQChannelsWg *sync.WaitGroup
HQRateLimitingSendBack bool

// Dependencies
NoYTDLP bool
YTDLPPath string
}

func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
@@ -231,7 +235,7 @@ func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
c.CertValidation = config.CertValidation
c.WARCFullOnDisk = config.WARCOnDisk
c.WARCPoolSize = config.WARCPoolSize
c.WARCDedupSize = config.WARCDedupeSize
c.WARCDedupeSize = config.WARCDedupeSize
c.WARCCustomCookie = config.CDXCookie

c.API = config.API
@@ -246,6 +250,10 @@ func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
c.PrometheusMetrics.Prefix = config.PrometheusPrefix
}

// Dependencies
c.NoYTDLP = config.NoYTDLP
c.YTDLPPath = config.YTDLPPath

if config.UserAgent != "Zeno" {
c.UserAgent = config.UserAgent
} else {
19 changes: 17 additions & 2 deletions internal/pkg/crawl/crawl.go
@@ -8,6 +8,7 @@ import (

"git.archive.org/wb/gocrawlhq"
"github.com/CorentinB/warc"
"github.com/internetarchive/Zeno/internal/pkg/crawl/dependencies/ytdlp"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/internetarchive/Zeno/internal/pkg/seencheck"
"github.com/internetarchive/Zeno/internal/pkg/utils"
@@ -67,9 +68,9 @@ func (c *Crawl) Start() (err error) {
// Init WARC rotator settings
rotatorSettings := c.initWARCRotatorSettings()

dedupeOptions := warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, SizeThreshold: c.WARCDedupSize}
dedupeOptions := warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, SizeThreshold: c.WARCDedupeSize}
if c.CDXDedupeServer != "" {
dedupeOptions = warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, CDXDedupe: true, CDXURL: c.CDXDedupeServer, CDXCookie: c.WARCCustomCookie, SizeThreshold: c.WARCDedupSize}
dedupeOptions = warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, CDXDedupe: true, CDXURL: c.CDXDedupeServer, CDXCookie: c.WARCCustomCookie, SizeThreshold: c.WARCDedupeSize}
}

// Init the HTTP client responsible for recording HTTP(s) requests / responses
@@ -125,6 +126,20 @@ func (c *Crawl) Start() (err error) {
go c.startAPI()
}

// Verify that dependencies exist on the system
if !c.NoYTDLP {
// If a yt-dlp path is specified, we use it,
// otherwise we try to find yt-dlp on the system
if c.YTDLPPath == "" {
path, found := ytdlp.FindPath()
if !found {
c.Log.Warn("yt-dlp not found on the system, please install it or specify the path in the configuration if you wish to use it")
} else {
c.YTDLPPath = path
}
}
}

// Parse input cookie file if specified
if c.CookieFile != "" {
cookieJar, err := cookiejar.NewFileJar(c.CookieFile, nil)
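ytdlp.FindPath is not shown in this diff either; a plausible sketch, assuming it is a thin wrapper around exec.LookPath:

package ytdlp

import "os/exec"

// FindPath returns the location of the yt-dlp binary on the system PATH
// and whether it was found at all.
func FindPath() (string, bool) {
	path, err := exec.LookPath("yt-dlp")
	if err != nil {
		return "", false
	}
	return path, true
}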
11 changes: 11 additions & 0 deletions internal/pkg/crawl/dependencies/ytdlp/model.go
@@ -0,0 +1,11 @@
package ytdlp

type Video struct {
IsLive bool `json:"is_live"`
RequestedFormats []struct {
URL string `json:"url"`
} `json:"requested_formats"`
Thumbnails []struct {
URL string `json:"url"`
} `json:"thumbnails"`
}
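To show how this model lines up with yt-dlp's --dump-json output, a hedged usage example (the JSON is trimmed to the fields the struct declares; real output carries many more keys, which json.Unmarshal simply ignores):

package ytdlp_test

import (
	"encoding/json"
	"fmt"

	"github.com/internetarchive/Zeno/internal/pkg/crawl/dependencies/ytdlp"
)

func ExampleVideo() {
	// Trimmed stand-in for yt-dlp --dump-json output.
	raw := []byte(`{
		"is_live": false,
		"requested_formats": [{"url": "https://example.com/video.mp4"}],
		"thumbnails": [{"url": "https://example.com/thumb.jpg"}]
	}`)

	var video ytdlp.Video
	if err := json.Unmarshal(raw, &video); err != nil {
		panic(err)
	}

	fmt.Println(video.IsLive)
	fmt.Println(video.RequestedFormats[0].URL)
	fmt.Println(video.Thumbnails[0].URL)
	// Output:
	// false
	// https://example.com/video.mp4
	// https://example.com/thumb.jpg
}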