Commit
Merge pull request #1582 from arc53/scraper-2
scraper with markdownify
dartpain authored Jan 15, 2025
2 parents acbbf30 + 13fcbe3 commit 7218403
Showing 5 changed files with 214 additions and 60 deletions.
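
At a high level, the new loader crawls up to `limit` pages starting from a seed URL, converts each page to Markdown with markdownify, and returns the project's Document objects. A minimal usage sketch, assuming the module path shown in the crawler_markdown.py diff below and that Document exposes its metadata as an `extra_info` attribute (the URL is illustrative):

    # Minimal sketch; the URL is illustrative and the extra_info attribute access is an assumption.
    from application.parser.remote.crawler_markdown import CrawlerLoader

    loader = CrawlerLoader(limit=5, allow_subdomains=False)
    docs = loader.load_data("https://docs.example.com")  # a str or a one-element list is accepted

    for doc in docs:
        # each page comes back as Markdown text plus source URL, <title> and language metadata
        print(doc.extra_info["source"], "-", doc.extra_info["title"])
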
2 changes: 1 addition & 1 deletion application/api/user/routes.py
@@ -2105,4 +2105,4 @@ def post(self):
except Exception as err:
return {"success": False, "error": str(err)}, 400

return {"success": True}, 200
return {"success": True}, 200
41 changes: 23 additions & 18 deletions application/parser/remote/crawler_loader.py
@@ -2,41 +2,46 @@
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from application.parser.remote.base import BaseRemote
from application.parser.schema.base import Document
from langchain_community.document_loaders import WebBaseLoader

class CrawlerLoader(BaseRemote):
def __init__(self, limit=10):
from langchain_community.document_loaders import WebBaseLoader
self.loader = WebBaseLoader # Initialize the document loader
self.limit = limit # Set the limit for the number of pages to scrape

def load_data(self, inputs):
url = inputs
# Check if the input is a list and if it is, use the first element
if isinstance(url, list) and url:
url = url[0]

# Check if the URL scheme is provided; if not, assume http
if not urlparse(url).scheme:
url = "http://" + url

visited_urls = set() # Keep track of URLs that have been visited
base_url = urlparse(url).scheme + "://" + urlparse(url).hostname # Extract the base URL
urls_to_visit = [url] # List of URLs to be visited, starting with the initial URL
loaded_content = [] # Store the loaded content from each URL
visited_urls = set()
base_url = urlparse(url).scheme + "://" + urlparse(url).hostname
urls_to_visit = [url]
loaded_content = []

# Continue crawling until there are no more URLs to visit
while urls_to_visit:
current_url = urls_to_visit.pop(0) # Get the next URL to visit
visited_urls.add(current_url) # Mark the URL as visited
current_url = urls_to_visit.pop(0)
visited_urls.add(current_url)

# Try to load and process the content from the current URL
try:
response = requests.get(current_url) # Fetch the content of the current URL
response.raise_for_status() # Raise an exception for HTTP errors
loader = self.loader([current_url]) # Initialize the document loader for the current URL
loaded_content.extend(loader.load()) # Load the content and add it to the loaded_content list
response = requests.get(current_url)
response.raise_for_status()
loader = self.loader([current_url])
docs = loader.load()
# Convert the loaded documents to the project's Document schema
for doc in docs:
loaded_content.append(
Document(
doc.page_content,
extra_info=doc.metadata
)
)
except Exception as e:
# Print an error message if loading or processing fails and continue with the next URL
print(f"Error processing URL {current_url}: {e}")
continue

@@ -45,15 +50,15 @@ def load_data(self, inputs):
all_links = [
urljoin(current_url, a['href'])
for a in soup.find_all('a', href=True)
if base_url in urljoin(current_url, a['href']) # Ensure links are from the same domain
if base_url in urljoin(current_url, a['href'])
]

# Add new links to the list of URLs to visit if they haven't been visited yet
urls_to_visit.extend([link for link in all_links if link not in visited_urls])
urls_to_visit = list(set(urls_to_visit)) # Remove duplicate URLs
urls_to_visit = list(set(urls_to_visit))

# Stop crawling if the limit of pages to scrape is reached
if self.limit is not None and len(visited_urls) >= self.limit:
break

return loaded_content # Return the loaded content from all visited URLs
return loaded_content
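
The practical effect of this change is that CrawlerLoader.load_data now returns the project's Document objects wrapping the WebBaseLoader output, rather than raw LangChain documents. A short sketch of consuming that output, mirroring the conversion the updated worker.py performs further down in this diff (the URL is illustrative):

    # Hypothetical consumer of the updated loader; the URL is illustrative.
    from application.parser.remote.crawler_loader import CrawlerLoader
    from application.parser.schema.base import Document

    raw_docs = CrawlerLoader(limit=3).load_data(["https://docs.example.com"])
    # convert back to LangChain documents for chunking and token counting, as worker.py does
    langchain_docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
    print(len(raw_docs), "pages crawled")
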
139 changes: 139 additions & 0 deletions application/parser/remote/crawler_markdown.py
@@ -0,0 +1,139 @@
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from application.parser.remote.base import BaseRemote
import re
from markdownify import markdownify
from application.parser.schema.base import Document
import tldextract

class CrawlerLoader(BaseRemote):
def __init__(self, limit=10, allow_subdomains=False):
"""
Given a URL, crawl web pages up to `self.limit`,
convert the HTML content to Markdown, and return a list of Document objects.
:param limit: The maximum number of pages to crawl.
:param allow_subdomains: If True, crawl pages on subdomains of the base domain.
"""
self.limit = limit
self.allow_subdomains = allow_subdomains
self.session = requests.Session()

def load_data(self, inputs):
url = inputs
if isinstance(url, list) and url:
url = url[0]

# Ensure the URL has a scheme (if not, default to http)
if not urlparse(url).scheme:
url = "http://" + url

# Keep track of visited URLs to avoid revisiting the same page
visited_urls = set()

# Determine the base domain for link filtering using tldextract
base_domain = self._get_base_domain(url)
urls_to_visit = {url}
documents = []

while urls_to_visit:
current_url = urls_to_visit.pop()

# Skip if already visited
if current_url in visited_urls:
continue
visited_urls.add(current_url)

# Fetch the page content
html_content = self._fetch_page(current_url)
if html_content is None:
continue

# Convert the HTML to Markdown for cleaner text formatting
title, language, processed_markdown = self._process_html_to_markdown(html_content, current_url)
if processed_markdown:
# Create a Document for each visited page
documents.append(
Document(
processed_markdown, # content
None, # doc_id
None, # embedding
{"source": current_url, "title": title, "language": language} # extra_info
)
)

# Extract links and filter them according to domain rules
new_links = self._extract_links(html_content, current_url)
filtered_links = self._filter_links(new_links, base_domain)

# Add any new, not-yet-visited links to the queue
urls_to_visit.update(link for link in filtered_links if link not in visited_urls)

# If we've reached the limit, stop crawling
if self.limit is not None and len(visited_urls) >= self.limit:
break

return documents

def _fetch_page(self, url):
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"Error fetching URL {url}: {e}")
return None

def _process_html_to_markdown(self, html_content, current_url):
soup = BeautifulSoup(html_content, 'html.parser')
title_tag = soup.find('title')
title = title_tag.text.strip() if title_tag else "No Title"

# Extract language
language_tag = soup.find('html')
language = language_tag.get('lang', 'en') if language_tag else "en"

markdownified = markdownify(html_content, heading_style="ATX", newline_style="BACKSLASH")
# Reduce sequences of more than two newlines to exactly three
markdownified = re.sub(r'\n{3,}', '\n\n\n', markdownified)
return title, language, markdownified

def _extract_links(self, html_content, current_url):
soup = BeautifulSoup(html_content, 'html.parser')
links = []
for a in soup.find_all('a', href=True):
full_url = urljoin(current_url, a['href'])
links.append((full_url, a.text.strip()))
return links

def _get_base_domain(self, url):
extracted = tldextract.extract(url)
# Reconstruct the domain as domain.suffix
base_domain = f"{extracted.domain}.{extracted.suffix}"
return base_domain

def _filter_links(self, links, base_domain):
"""
Filter the extracted links to only include those that match the crawling criteria:
- If allow_subdomains is True, allow any link whose domain ends with the base_domain.
- If allow_subdomains is False, only allow exact matches of the base_domain.
"""
filtered = []
for link, _ in links:
parsed_link = urlparse(link)
if not parsed_link.netloc:
continue

extracted = tldextract.extract(parsed_link.netloc)
link_base = f"{extracted.domain}.{extracted.suffix}"

if self.allow_subdomains:
# For subdomains: sub.example.com ends with example.com
if link_base == base_domain or link_base.endswith("." + base_domain):
filtered.append(link)
else:
# Exact domain match
if link_base == base_domain:
filtered.append(link)
return filtered
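
For reference, _get_base_domain and _filter_links compare registered domains as computed by tldextract, so a subdomain collapses to the same base domain as its parent site. A standalone illustration of that reduction (hostnames are examples only):

    # How tldextract reduces a hostname to the domain.suffix pair the filter compares.
    import tldextract

    for host in ("example.com", "docs.example.com", "example.co.uk", "cdn.other.org"):
        ext = tldextract.extract(host)
        print(f"{host} -> {ext.domain}.{ext.suffix}")
    # example.com      -> example.com
    # docs.example.com -> example.com
    # example.co.uk    -> example.co.uk
    # cdn.other.org    -> other.org
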
4 changes: 3 additions & 1 deletion application/requirements.txt
@@ -86,4 +86,6 @@ urllib3==2.3.0
vine==5.1.0
wcwidth==0.2.13
werkzeug==3.1.3
yarl==1.18.3
yarl==1.18.3
markdownify==0.14.1
tldextract==5.1.3
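
Of the two new dependencies, markdownify performs the HTML-to-Markdown conversion and tldextract handles the domain comparison shown earlier. A quick sketch of the markdownify call the new loader uses (the HTML snippet is illustrative):

    # Same markdownify options as the new crawler, applied to a toy page.
    from markdownify import markdownify

    html = "<h1>Docs</h1><p>See the <a href='/guide'>guide</a>.</p>"
    md = markdownify(html, heading_style="ATX", newline_style="BACKSLASH")
    print(md)  # roughly: "# Docs" followed by "See the [guide](/guide)."
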
88 changes: 48 additions & 40 deletions application/worker.py
@@ -203,53 +203,61 @@ def remote_worker(
sync_frequency="never",
operation_mode="upload",
doc_id=None,
):
):
full_path = os.path.join(directory, user, name_job)

if not os.path.exists(full_path):
os.makedirs(full_path)
self.update_state(state="PROGRESS", meta={"current": 1})
logging.info(
f"Remote job: {full_path}",
extra={"user": user, "job": name_job, "source_data": source_data},
)

remote_loader = RemoteCreator.create_loader(loader)
raw_docs = remote_loader.load_data(source_data)

chunker = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False
)
docs = chunker.chunk(documents=raw_docs)

tokens = count_tokens_docs(docs)
if operation_mode == "upload":
id = ObjectId()
embed_and_store_documents(docs, full_path, id, self)
elif operation_mode == "sync":
if not doc_id or not ObjectId.is_valid(doc_id):
raise ValueError("doc_id must be provided for sync operation.")
id = ObjectId(doc_id)
embed_and_store_documents(docs, full_path, id, self)
self.update_state(state="PROGRESS", meta={"current": 100})
self.update_state(state="PROGRESS", meta={"current": 1})
try:
logging.info("Initializing remote loader with type: %s", loader)
remote_loader = RemoteCreator.create_loader(loader)
raw_docs = remote_loader.load_data(source_data)

chunker = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False
)
docs = chunker.chunk(documents=raw_docs)
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
tokens = count_tokens_docs(docs)
logging.info("Total tokens calculated: %d", tokens)

if operation_mode == "upload":
id = ObjectId()
embed_and_store_documents(docs, full_path, id, self)
elif operation_mode == "sync":
if not doc_id or not ObjectId.is_valid(doc_id):
logging.error("Invalid doc_id provided for sync operation: %s", doc_id)
raise ValueError("doc_id must be provided for sync operation.")
id = ObjectId(doc_id)
embed_and_store_documents(docs, full_path, id, self)

self.update_state(state="PROGRESS", meta={"current": 100})

file_data = {
"name": name_job,
"user": user,
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": loader,
"remote_data": source_data,
"sync_frequency": sync_frequency,
}
upload_index(full_path, file_data)

file_data = {
"name": name_job,
"user": user,
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": loader,
"remote_data": source_data,
"sync_frequency": sync_frequency,
}
upload_index(full_path, file_data)
except Exception as e:
logging.error("Error in remote_worker task: %s", str(e), exc_info=True)
raise

shutil.rmtree(full_path)
finally:
if os.path.exists(full_path):
shutil.rmtree(full_path)

logging.info("remote_worker task completed successfully")
return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}

def sync(
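
The reworked remote_worker also validates doc_id before a sync and logs the problem before raising. A small sketch of that check, using bson's ObjectId as the worker does (the id strings are made up):

    # Mirrors the doc_id check in the sync branch above; ids are made up.
    from bson.objectid import ObjectId

    for candidate in ("5f43a1b2c9e77a0012345678", "not-an-id", None):
        if not candidate or not ObjectId.is_valid(candidate):
            print(f"rejected doc_id: {candidate!r}")
        else:
            print(f"accepted doc_id: {ObjectId(candidate)}")
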
