-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Egnyte connector #3420
Merged
Merged
Egnyte connector #3420
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
2e748c2
Egnyte connector
Weves faa4e0c
Egnyte connector more
Weves aed44ed
More stuff
Weves b459f51
Close to final
Weves 71e0a2b
Add exp time
Weves 42bb4aa
Move env variables to app_configs.py
Weves 7d51171
Address comments
Weves File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,373 @@ | ||
import io | ||
import os | ||
from collections.abc import Generator | ||
from datetime import datetime | ||
from datetime import timezone | ||
from logging import Logger | ||
from typing import Any | ||
from typing import cast | ||
from typing import IO | ||
|
||
import requests | ||
from retry import retry | ||
|
||
from danswer.configs.app_configs import EGNYTE_BASE_DOMAIN | ||
from danswer.configs.app_configs import EGNYTE_CLIENT_ID | ||
from danswer.configs.app_configs import EGNYTE_CLIENT_SECRET | ||
from danswer.configs.app_configs import EGNYTE_LOCALHOST_OVERRIDE | ||
from danswer.configs.app_configs import INDEX_BATCH_SIZE | ||
from danswer.configs.constants import DocumentSource | ||
from danswer.connectors.interfaces import GenerateDocumentsOutput | ||
from danswer.connectors.interfaces import LoadConnector | ||
from danswer.connectors.interfaces import OAuthConnector | ||
from danswer.connectors.interfaces import PollConnector | ||
from danswer.connectors.interfaces import SecondsSinceUnixEpoch | ||
from danswer.connectors.models import BasicExpertInfo | ||
from danswer.connectors.models import ConnectorMissingCredentialError | ||
from danswer.connectors.models import Document | ||
from danswer.connectors.models import Section | ||
from danswer.file_processing.extract_file_text import detect_encoding | ||
from danswer.file_processing.extract_file_text import extract_file_text | ||
from danswer.file_processing.extract_file_text import get_file_ext | ||
from danswer.file_processing.extract_file_text import is_text_file_extension | ||
from danswer.file_processing.extract_file_text import is_valid_file_ext | ||
from danswer.file_processing.extract_file_text import read_text_file | ||
from danswer.utils.logger import setup_logger | ||
|
||
|
||
logger = setup_logger() | ||
|
||
_EGNYTE_API_BASE = "https://{domain}.egnyte.com/pubapi/v1" | ||
_EGNYTE_APP_BASE = "https://{domain}.egnyte.com" | ||
_TIMEOUT = 60 | ||
|
||
|
||
def _request_with_retries( | ||
method: str, | ||
url: str, | ||
data: dict[str, Any] | None = None, | ||
headers: dict[str, Any] | None = None, | ||
params: dict[str, Any] | None = None, | ||
timeout: int = _TIMEOUT, | ||
stream: bool = False, | ||
tries: int = 8, | ||
delay: float = 1, | ||
backoff: float = 2, | ||
) -> requests.Response: | ||
@retry(tries=tries, delay=delay, backoff=backoff, logger=cast(Logger, logger)) | ||
def _make_request() -> requests.Response: | ||
response = requests.request( | ||
method, | ||
url, | ||
data=data, | ||
headers=headers, | ||
params=params, | ||
timeout=timeout, | ||
stream=stream, | ||
) | ||
response.raise_for_status() | ||
return response | ||
|
||
return _make_request() | ||
|
||
|
||
def _parse_last_modified(last_modified: str) -> datetime: | ||
return datetime.strptime(last_modified, "%a, %d %b %Y %H:%M:%S %Z").replace( | ||
tzinfo=timezone.utc | ||
) | ||
|
||
|
||
def _process_egnyte_file( | ||
file_metadata: dict[str, Any], | ||
file_content: IO, | ||
base_url: str, | ||
folder_path: str | None = None, | ||
) -> Document | None: | ||
"""Process an Egnyte file into a Document object | ||
|
||
Args: | ||
file_data: The file data from Egnyte API | ||
file_content: The raw content of the file in bytes | ||
base_url: The base URL for the Egnyte instance | ||
folder_path: Optional folder path to filter results | ||
""" | ||
# Skip if file path doesn't match folder path filter | ||
if folder_path and not file_metadata["path"].startswith(folder_path): | ||
raise ValueError( | ||
f"File path {file_metadata['path']} does not match folder path {folder_path}" | ||
) | ||
|
||
file_name = file_metadata["name"] | ||
extension = get_file_ext(file_name) | ||
if not is_valid_file_ext(extension): | ||
logger.warning(f"Skipping file '{file_name}' with extension '{extension}'") | ||
return None | ||
|
||
# Extract text content based on file type | ||
if is_text_file_extension(file_name): | ||
encoding = detect_encoding(file_content) | ||
file_content_raw, file_metadata = read_text_file( | ||
file_content, encoding=encoding, ignore_danswer_metadata=False | ||
) | ||
else: | ||
file_content_raw = extract_file_text( | ||
file=file_content, | ||
file_name=file_name, | ||
break_on_unprocessable=True, | ||
) | ||
|
||
# Build the web URL for the file | ||
web_url = f"{base_url}/navigate/file/{file_metadata['group_id']}" | ||
|
||
# Create document metadata | ||
metadata: dict[str, str | list[str]] = { | ||
"file_path": file_metadata["path"], | ||
"last_modified": file_metadata.get("last_modified", ""), | ||
} | ||
|
||
# Add lock info if present | ||
if lock_info := file_metadata.get("lock_info"): | ||
metadata[ | ||
"lock_owner" | ||
] = f"{lock_info.get('first_name', '')} {lock_info.get('last_name', '')}" | ||
|
||
# Create the document owners | ||
primary_owner = None | ||
if uploaded_by := file_metadata.get("uploaded_by"): | ||
primary_owner = BasicExpertInfo( | ||
email=uploaded_by, # Using username as email since that's what we have | ||
) | ||
|
||
# Create the document | ||
return Document( | ||
id=f"egnyte-{file_metadata['entry_id']}", | ||
sections=[Section(text=file_content_raw.strip(), link=web_url)], | ||
source=DocumentSource.EGNYTE, | ||
semantic_identifier=file_name, | ||
metadata=metadata, | ||
doc_updated_at=( | ||
_parse_last_modified(file_metadata["last_modified"]) | ||
if "last_modified" in file_metadata | ||
else None | ||
), | ||
primary_owners=[primary_owner] if primary_owner else None, | ||
) | ||
|
||
|
||
class EgnyteConnector(LoadConnector, PollConnector, OAuthConnector): | ||
def __init__( | ||
self, | ||
folder_path: str | None = None, | ||
batch_size: int = INDEX_BATCH_SIZE, | ||
) -> None: | ||
self.domain = "" # will always be set in `load_credentials` | ||
self.folder_path = folder_path or "" # Root folder if not specified | ||
self.batch_size = batch_size | ||
self.access_token: str | None = None | ||
|
||
@classmethod | ||
def oauth_id(cls) -> DocumentSource: | ||
return DocumentSource.EGNYTE | ||
|
||
@classmethod | ||
def oauth_authorization_url(cls, base_domain: str, state: str) -> str: | ||
if not EGNYTE_CLIENT_ID: | ||
raise ValueError("EGNYTE_CLIENT_ID environment variable must be set") | ||
if not EGNYTE_BASE_DOMAIN: | ||
raise ValueError("EGNYTE_DOMAIN environment variable must be set") | ||
|
||
if EGNYTE_LOCALHOST_OVERRIDE: | ||
base_domain = EGNYTE_LOCALHOST_OVERRIDE | ||
|
||
callback_uri = f"{base_domain.strip('/')}/connector/oauth/callback/egnyte" | ||
return ( | ||
f"https://{EGNYTE_BASE_DOMAIN}.egnyte.com/puboauth/token" | ||
f"?client_id={EGNYTE_CLIENT_ID}" | ||
f"&redirect_uri={callback_uri}" | ||
f"&scope=Egnyte.filesystem" | ||
f"&state={state}" | ||
f"&response_type=code" | ||
) | ||
|
||
@classmethod | ||
def oauth_code_to_token(cls, code: str) -> dict[str, Any]: | ||
if not EGNYTE_CLIENT_ID: | ||
raise ValueError("EGNYTE_CLIENT_ID environment variable must be set") | ||
if not EGNYTE_CLIENT_SECRET: | ||
raise ValueError("EGNYTE_CLIENT_SECRET environment variable must be set") | ||
if not EGNYTE_BASE_DOMAIN: | ||
raise ValueError("EGNYTE_DOMAIN environment variable must be set") | ||
|
||
# Exchange code for token | ||
url = f"https://{EGNYTE_BASE_DOMAIN}.egnyte.com/puboauth/token" | ||
data = { | ||
"client_id": EGNYTE_CLIENT_ID, | ||
"client_secret": EGNYTE_CLIENT_SECRET, | ||
"code": code, | ||
"grant_type": "authorization_code", | ||
"redirect_uri": f"{EGNYTE_LOCALHOST_OVERRIDE or ''}/connector/oauth/callback/egnyte", | ||
"scope": "Egnyte.filesystem", | ||
} | ||
headers = {"Content-Type": "application/x-www-form-urlencoded"} | ||
|
||
response = _request_with_retries( | ||
method="POST", | ||
url=url, | ||
data=data, | ||
headers=headers, | ||
# try a lot faster since this is a realtime flow | ||
backoff=0, | ||
delay=0.1, | ||
) | ||
if not response.ok: | ||
raise RuntimeError(f"Failed to exchange code for token: {response.text}") | ||
|
||
token_data = response.json() | ||
return { | ||
"domain": EGNYTE_BASE_DOMAIN, | ||
"access_token": token_data["access_token"], | ||
} | ||
|
||
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: | ||
self.domain = credentials["domain"] | ||
self.access_token = credentials["access_token"] | ||
return None | ||
|
||
def _get_files_list( | ||
self, | ||
path: str, | ||
) -> list[dict[str, Any]]: | ||
if not self.access_token or not self.domain: | ||
raise ConnectorMissingCredentialError("Egnyte") | ||
|
||
headers = { | ||
"Authorization": f"Bearer {self.access_token}", | ||
} | ||
|
||
params: dict[str, Any] = { | ||
"list_content": True, | ||
} | ||
|
||
url = f"{_EGNYTE_API_BASE.format(domain=self.domain)}/fs/{path or ''}" | ||
response = _request_with_retries( | ||
method="GET", url=url, headers=headers, params=params, timeout=_TIMEOUT | ||
) | ||
if not response.ok: | ||
raise RuntimeError(f"Failed to fetch files from Egnyte: {response.text}") | ||
|
||
data = response.json() | ||
all_files: list[dict[str, Any]] = [] | ||
|
||
# Add files from current directory | ||
all_files.extend(data.get("files", [])) | ||
|
||
# Recursively traverse folders | ||
for item in data.get("folders", []): | ||
all_files.extend(self._get_files_list(item["path"])) | ||
|
||
return all_files | ||
|
||
def _filter_files( | ||
self, | ||
files: list[dict[str, Any]], | ||
start_time: datetime | None = None, | ||
end_time: datetime | None = None, | ||
) -> list[dict[str, Any]]: | ||
filtered_files = [] | ||
for file in files: | ||
if file["is_folder"]: | ||
continue | ||
|
||
file_modified = _parse_last_modified(file["last_modified"]) | ||
if start_time and file_modified < start_time: | ||
continue | ||
if end_time and file_modified > end_time: | ||
continue | ||
|
||
filtered_files.append(file) | ||
|
||
return filtered_files | ||
|
||
def _process_files( | ||
self, | ||
start_time: datetime | None = None, | ||
end_time: datetime | None = None, | ||
) -> Generator[list[Document], None, None]: | ||
files = self._get_files_list(self.folder_path) | ||
files = self._filter_files(files, start_time, end_time) | ||
|
||
current_batch: list[Document] = [] | ||
for file in files: | ||
try: | ||
# Set up request with streaming enabled | ||
headers = { | ||
"Authorization": f"Bearer {self.access_token}", | ||
} | ||
url = f"{_EGNYTE_API_BASE.format(domain=self.domain)}/fs-content/{file['path']}" | ||
response = _request_with_retries( | ||
method="GET", | ||
url=url, | ||
headers=headers, | ||
timeout=_TIMEOUT, | ||
stream=True, | ||
) | ||
|
||
if not response.ok: | ||
logger.error( | ||
f"Failed to fetch file content: {file['path']} (status code: {response.status_code})" | ||
) | ||
continue | ||
|
||
# Stream the response content into a BytesIO buffer | ||
buffer = io.BytesIO() | ||
for chunk in response.iter_content(chunk_size=8192): | ||
if chunk: | ||
buffer.write(chunk) | ||
|
||
# Reset buffer's position to the start | ||
buffer.seek(0) | ||
|
||
# Process the streamed file content | ||
doc = _process_egnyte_file( | ||
file_metadata=file, | ||
file_content=buffer, | ||
base_url=_EGNYTE_APP_BASE.format(domain=self.domain), | ||
folder_path=self.folder_path, | ||
) | ||
|
||
if doc is not None: | ||
current_batch.append(doc) | ||
|
||
if len(current_batch) >= self.batch_size: | ||
yield current_batch | ||
current_batch = [] | ||
|
||
except Exception: | ||
logger.exception(f"Failed to process file {file['path']}") | ||
continue | ||
|
||
if current_batch: | ||
yield current_batch | ||
|
||
def load_from_state(self) -> GenerateDocumentsOutput: | ||
yield from self._process_files() | ||
|
||
def poll_source( | ||
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch | ||
) -> GenerateDocumentsOutput: | ||
start_time = datetime.fromtimestamp(start, tz=timezone.utc) | ||
end_time = datetime.fromtimestamp(end, tz=timezone.utc) | ||
|
||
yield from self._process_files(start_time=start_time, end_time=end_time) | ||
|
||
|
||
if __name__ == "__main__": | ||
connector = EgnyteConnector() | ||
connector.load_credentials( | ||
{ | ||
"domain": os.environ["EGNYTE_DOMAIN"], | ||
"access_token": os.environ["EGNYTE_ACCESS_TOKEN"], | ||
} | ||
) | ||
document_batches = connector.load_from_state() | ||
print(next(document_batches)) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couple of possible alternative approaches to think about in the future: