Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Egnyte connector #3420

Merged
merged 7 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/danswer/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class DocumentSource(str, Enum):
NOT_APPLICABLE = "not_applicable"
FRESHDESK = "freshdesk"
FIREFLIES = "fireflies"
EGNYTE = "egnyte"


DocumentSourceRequiringTenantContext: list[DocumentSource] = [DocumentSource.FILE]
Expand Down
374 changes: 374 additions & 0 deletions backend/danswer/connectors/egnyte/connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,374 @@
import io
import os
from collections.abc import Generator
from datetime import datetime
from datetime import timezone
from logging import Logger
from typing import Any
from typing import cast
from typing import IO

import requests
from retry import retry

from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import OAuthConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
from danswer.connectors.models import BasicExpertInfo
from danswer.connectors.models import ConnectorMissingCredentialError
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.file_processing.extract_file_text import check_file_ext_is_valid
from danswer.file_processing.extract_file_text import detect_encoding
from danswer.file_processing.extract_file_text import extract_file_text
from danswer.file_processing.extract_file_text import get_file_ext
from danswer.file_processing.extract_file_text import is_text_file_extension
from danswer.file_processing.extract_file_text import read_text_file
from danswer.utils.logger import setup_logger


logger = setup_logger()

_EGNYTE_LOCALHOST_OVERRIDE = os.getenv("EGNYTE_LOCALHOST_OVERRIDE")
_EGNYTE_BASE_DOMAIN = os.getenv("EGNYTE_DOMAIN")
_EGNYTE_CLIENT_ID = os.getenv("EGNYTE_CLIENT_ID")
_EGNYTE_CLIENT_SECRET = os.getenv("EGNYTE_CLIENT_SECRET")

_EGNYTE_API_BASE = "https://{domain}.egnyte.com/pubapi/v1"
_EGNYTE_APP_BASE = "https://{domain}.egnyte.com"
_TIMEOUT = 60


def _request_with_retries(
method: str,
url: str,
data: dict[str, Any] | None = None,
headers: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
timeout: int = _TIMEOUT,
stream: bool = False,
tries: int = 8,
delay: float = 1,
backoff: float = 2,
) -> requests.Response:
@retry(tries=tries, delay=delay, backoff=backoff, logger=cast(Logger, logger))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couple of possible alternative approaches to think about in the future:

  • set urllib3.Retry on the requests object
  • tenacity Retry is better supported nowadays

def _make_request() -> requests.Response:
response = requests.request(
method,
url,
data=data,
headers=headers,
params=params,
timeout=timeout,
stream=stream,
)
response.raise_for_status()
return response

return _make_request()


def _parse_last_modified(last_modified: str) -> datetime:
return datetime.strptime(last_modified, "%a, %d %b %Y %H:%M:%S %Z").replace(
tzinfo=timezone.utc
)


def _process_egnyte_file(
file_metadata: dict[str, Any],
file_content: IO,
base_url: str,
folder_path: str | None = None,
) -> Document | None:
"""Process an Egnyte file into a Document object

Args:
file_data: The file data from Egnyte API
file_content: The raw content of the file in bytes
base_url: The base URL for the Egnyte instance
folder_path: Optional folder path to filter results
"""
# Skip if file path doesn't match folder path filter
if folder_path and not file_metadata["path"].startswith(folder_path):
raise ValueError(
f"File path {file_metadata['path']} does not match folder path {folder_path}"
)

file_name = file_metadata["name"]
extension = get_file_ext(file_name)
if not check_file_ext_is_valid(extension):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: function naming is a bit redundant ... could be is_valid_file_ext

logger.warning(f"Skipping file '{file_name}' with extension '{extension}'")
return None

# Extract text content based on file type
if is_text_file_extension(file_name):
encoding = detect_encoding(file_content)
file_content_raw, file_metadata = read_text_file(
file_content, encoding=encoding, ignore_danswer_metadata=False
)
else:
file_content_raw = extract_file_text(
file=file_content,
file_name=file_name,
break_on_unprocessable=True,
)

# Build the web URL for the file
web_url = f"{base_url}/navigate/file/{file_metadata['group_id']}"

# Create document metadata
metadata: dict[str, str | list[str]] = {
"file_path": file_metadata["path"],
"last_modified": file_metadata.get("last_modified", ""),
}

# Add lock info if present
if lock_info := file_metadata.get("lock_info"):
metadata[
"lock_owner"
] = f"{lock_info.get('first_name', '')} {lock_info.get('last_name', '')}"

# Create the document owners
primary_owner = None
if uploaded_by := file_metadata.get("uploaded_by"):
primary_owner = BasicExpertInfo(
email=uploaded_by, # Using username as email since that's what we have
)

# Create the document
return Document(
id=f"egnyte-{file_metadata['entry_id']}",
sections=[Section(text=file_content_raw.strip(), link=web_url)],
source=DocumentSource.EGNYTE,
semantic_identifier=file_name,
metadata=metadata,
doc_updated_at=(
_parse_last_modified(file_metadata["last_modified"])
if "last_modified" in file_metadata
else None
),
primary_owners=[primary_owner] if primary_owner else None,
)


class EgnyteConnector(LoadConnector, PollConnector, OAuthConnector):
def __init__(
self,
folder_path: str | None = None,
batch_size: int = INDEX_BATCH_SIZE,
) -> None:
self.domain = "" # will always be set in `load_credentials`
self.folder_path = folder_path or "" # Root folder if not specified
self.batch_size = batch_size
self.access_token: str | None = None

@classmethod
def oauth_id(cls) -> DocumentSource:
return DocumentSource.EGNYTE

@classmethod
def redirect_uri(cls, base_domain: str, state: str) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is somewhat confusingly named ... the url being generated is the authorization url for the integration.

if not _EGNYTE_CLIENT_ID:
raise ValueError("EGNYTE_CLIENT_ID environment variable must be set")
if not _EGNYTE_BASE_DOMAIN:
raise ValueError("EGNYTE_DOMAIN environment variable must be set")

if _EGNYTE_LOCALHOST_OVERRIDE:
base_domain = _EGNYTE_LOCALHOST_OVERRIDE

callback_uri = f"{base_domain.strip('/')}/connector/oauth/callback/egnyte"
return (
f"https://{_EGNYTE_BASE_DOMAIN}.egnyte.com/puboauth/token"
f"?client_id={_EGNYTE_CLIENT_ID}"
f"&redirect_uri={callback_uri}"
f"&scope=Egnyte.filesystem"
f"&state={state}"
f"&response_type=code"
)

@classmethod
def code_to_token(cls, code: str) -> dict[str, Any]:
if not _EGNYTE_CLIENT_ID:
raise ValueError("EGNYTE_CLIENT_ID environment variable must be set")
if not _EGNYTE_CLIENT_SECRET:
raise ValueError("EGNYTE_CLIENT_SECRET environment variable must be set")
if not _EGNYTE_BASE_DOMAIN:
raise ValueError("EGNYTE_DOMAIN environment variable must be set")

# Exchange code for token
url = f"https://{_EGNYTE_BASE_DOMAIN}.egnyte.com/puboauth/token"
data = {
"client_id": _EGNYTE_CLIENT_ID,
"client_secret": _EGNYTE_CLIENT_SECRET,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": f"{_EGNYTE_LOCALHOST_OVERRIDE or ''}/connector/oauth/callback/egnyte",
"scope": "Egnyte.filesystem",
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}

response = _request_with_retries(
method="POST",
url=url,
data=data,
headers=headers,
# try a lot faster since this is a realtime flow
backoff=0,
delay=0.1,
)
if not response.ok:
raise RuntimeError(f"Failed to exchange code for token: {response.text}")

token_data = response.json()
return {
"domain": _EGNYTE_BASE_DOMAIN,
"access_token": token_data["access_token"],
}

def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
self.domain = credentials["domain"]
self.access_token = credentials["access_token"]
return None

def _get_files_list(
self,
path: str,
) -> list[dict[str, Any]]:
if not self.access_token or not self.domain:
raise ConnectorMissingCredentialError("Egnyte")

headers = {
"Authorization": f"Bearer {self.access_token}",
}

params: dict[str, Any] = {
"list_content": True,
}

url = f"{_EGNYTE_API_BASE.format(domain=self.domain)}/fs/{path or ''}"
response = _request_with_retries(
method="GET", url=url, headers=headers, params=params, timeout=_TIMEOUT
)
if not response.ok:
raise RuntimeError(f"Failed to fetch files from Egnyte: {response.text}")

data = response.json()
all_files: list[dict[str, Any]] = []

# Add files from current directory
all_files.extend(data.get("files", []))

# Recursively traverse folders
for item in data.get("folders", []):
all_files.extend(self._get_files_list(item["path"]))

return all_files

def _filter_files(
self,
files: list[dict[str, Any]],
start_time: datetime | None = None,
end_time: datetime | None = None,
) -> list[dict[str, Any]]:
filtered_files = []
for file in files:
if file["is_folder"]:
continue

file_modified = _parse_last_modified(file["last_modified"])
if start_time and file_modified < start_time:
continue
if end_time and file_modified > end_time:
continue

filtered_files.append(file)

return filtered_files

def _process_files(
self,
start_time: datetime | None = None,
end_time: datetime | None = None,
) -> Generator[list[Document], None, None]:
files = self._get_files_list(self.folder_path)
files = self._filter_files(files, start_time, end_time)

current_batch: list[Document] = []
for file in files:
try:
# Set up request with streaming enabled
headers = {
"Authorization": f"Bearer {self.access_token}",
}
url = f"{_EGNYTE_API_BASE.format(domain=self.domain)}/fs-content/{file['path']}"
response = _request_with_retries(
method="GET",
url=url,
headers=headers,
timeout=_TIMEOUT,
stream=True,
)

if not response.ok:
logger.error(
f"Failed to fetch file content: {file['path']} (status code: {response.status_code})"
)
continue

# Stream the response content into a BytesIO buffer
buffer = io.BytesIO()
for chunk in response.iter_content(chunk_size=8192):
if chunk:
buffer.write(chunk)

# Reset buffer's position to the start
buffer.seek(0)

# Process the streamed file content
doc = _process_egnyte_file(
file_metadata=file,
file_content=buffer,
base_url=_EGNYTE_APP_BASE.format(domain=self.domain),
folder_path=self.folder_path,
)

if doc is not None:
current_batch.append(doc)

if len(current_batch) >= self.batch_size:
yield current_batch
current_batch = []

except Exception as e:
logger.error(f"Failed to process file {file['path']}: {str(e)}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.exception usually easier to use (implicitly logs exceptions)

continue

if current_batch:
yield current_batch

def load_from_state(self) -> GenerateDocumentsOutput:
yield from self._process_files()

def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
start_time = datetime.fromtimestamp(start, tz=timezone.utc)
end_time = datetime.fromtimestamp(end, tz=timezone.utc)

yield from self._process_files(start_time=start_time, end_time=end_time)


if __name__ == "__main__":
connector = EgnyteConnector()
connector.load_credentials(
{
"domain": os.environ["EGNYTE_DOMAIN"],
"access_token": os.environ["EGNYTE_ACCESS_TOKEN"],
}
)
document_batches = connector.load_from_state()
print(next(document_batches))
2 changes: 2 additions & 0 deletions backend/danswer/connectors/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from danswer.connectors.discourse.connector import DiscourseConnector
from danswer.connectors.document360.connector import Document360Connector
from danswer.connectors.dropbox.connector import DropboxConnector
from danswer.connectors.egnyte.connector import EgnyteConnector
from danswer.connectors.file.connector import LocalFileConnector
from danswer.connectors.fireflies.connector import FirefliesConnector
from danswer.connectors.freshdesk.connector import FreshdeskConnector
Expand Down Expand Up @@ -103,6 +104,7 @@ def identify_connector_class(
DocumentSource.XENFORO: XenforoConnector,
DocumentSource.FRESHDESK: FreshdeskConnector,
DocumentSource.FIREFLIES: FirefliesConnector,
DocumentSource.EGNYTE: EgnyteConnector,
}
connector_by_source = connector_map.get(source, {})

Expand Down
Loading
Loading