From f4806da65353974f3be6764def94fda0e27f3904 Mon Sep 17 00:00:00 2001 From: Yuhong Sun Date: Sat, 28 Dec 2024 17:53:16 -0800 Subject: [PATCH] Fix Null Value in PG (#3559) * k * k * k * k * k --- .../onyx/background/indexing/run_indexing.py | 42 ++++++++++++++++--- backend/onyx/db/models.py | 28 +++++++++++-- 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/backend/onyx/background/indexing/run_indexing.py b/backend/onyx/background/indexing/run_indexing.py index 35351de2f9f..8a7889bbb19 100644 --- a/backend/onyx/background/indexing/run_indexing.py +++ b/backend/onyx/background/indexing/run_indexing.py @@ -14,6 +14,7 @@ from onyx.configs.constants import MilestoneRecordType from onyx.connectors.connector_runner import ConnectorRunner from onyx.connectors.factory import instantiate_connector +from onyx.connectors.models import Document from onyx.connectors.models import IndexAttemptMetadata from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.connector_credential_pair import get_last_successful_attempt_time @@ -90,6 +91,35 @@ def _get_connector_runner( ) +def strip_null_characters(doc_batch: list[Document]) -> list[Document]: + cleaned_batch = [] + for doc in doc_batch: + cleaned_doc = doc.model_copy() + + if "\x00" in cleaned_doc.id: + logger.warning(f"NUL characters found in document ID: {cleaned_doc.id}") + cleaned_doc.id = cleaned_doc.id.replace("\x00", "") + + if "\x00" in cleaned_doc.semantic_identifier: + logger.warning( + f"NUL characters found in document semantic identifier: {cleaned_doc.semantic_identifier}" + ) + cleaned_doc.semantic_identifier = cleaned_doc.semantic_identifier.replace( + "\x00", "" + ) + + for section in cleaned_doc.sections: + if section.link and "\x00" in section.link: + logger.warning( + f"NUL characters found in document link for document: {cleaned_doc.id}" + ) + section.link = section.link.replace("\x00", "") + + cleaned_batch.append(cleaned_doc) + + return cleaned_batch + + class ConnectorStopSignal(Exception): """A custom exception used to signal a stop in processing.""" @@ -238,7 +268,9 @@ def _run_indexing( ) batch_description = [] - for doc in doc_batch: + + doc_batch_cleaned = strip_null_characters(doc_batch) + for doc in doc_batch_cleaned: batch_description.append(doc.to_short_descriptor()) doc_size = 0 @@ -258,15 +290,15 @@ def _run_indexing( # real work happens here! new_docs, total_batch_chunks = indexing_pipeline( - document_batch=doc_batch, + document_batch=doc_batch_cleaned, index_attempt_metadata=index_attempt_md, ) batch_num += 1 net_doc_change += new_docs chunk_count += total_batch_chunks - document_count += len(doc_batch) - all_connector_doc_ids.update(doc.id for doc in doc_batch) + document_count += len(doc_batch_cleaned) + all_connector_doc_ids.update(doc.id for doc in doc_batch_cleaned) # commit transaction so that the `update` below begins # with a brand new transaction. Postgres uses the start @@ -276,7 +308,7 @@ def _run_indexing( db_session.commit() if callback: - callback.progress("_run_indexing", len(doc_batch)) + callback.progress("_run_indexing", len(doc_batch_cleaned)) # This new value is updated every batch, so UI can refresh per batch update update_docs_indexed( diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index a356e397a94..47170f93b22 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -54,6 +54,7 @@ from onyx.db.enums import IndexModelStatus from onyx.db.enums import TaskStatus from onyx.db.pydantic_type import PydanticType +from onyx.utils.logger import setup_logger from onyx.utils.special_types import JSON_ro from onyx.file_store.models import FileDescriptor from onyx.llm.override_models import LLMOverride @@ -65,6 +66,8 @@ from shared_configs.enums import EmbeddingProvider from shared_configs.enums import RerankerProvider +logger = setup_logger() + class Base(DeclarativeBase): __abstract__ = True @@ -72,6 +75,8 @@ class Base(DeclarativeBase): class EncryptedString(TypeDecorator): impl = LargeBinary + # This type's behavior is fully deterministic and doesn't depend on any external factors. + cache_ok = True def process_bind_param(self, value: str | None, dialect: Dialect) -> bytes | None: if value is not None: @@ -86,6 +91,8 @@ def process_result_value(self, value: bytes | None, dialect: Dialect) -> str | N class EncryptedJson(TypeDecorator): impl = LargeBinary + # This type's behavior is fully deterministic and doesn't depend on any external factors. + cache_ok = True def process_bind_param(self, value: dict | None, dialect: Dialect) -> bytes | None: if value is not None: @@ -102,6 +109,21 @@ def process_result_value( return value +class NullFilteredString(TypeDecorator): + impl = String + # This type's behavior is fully deterministic and doesn't depend on any external factors. + cache_ok = True + + def process_bind_param(self, value: str | None, dialect: Dialect) -> str | None: + if value is not None and "\x00" in value: + logger.warning(f"NUL characters found in value: {value}") + return value.replace("\x00", "") + return value + + def process_result_value(self, value: str | None, dialect: Dialect) -> str | None: + return value + + """ Auth/Authz (users, permissions, access) Tables """ @@ -451,16 +473,16 @@ class Document(Base): # this should correspond to the ID of the document # (as is passed around in Onyx) - id: Mapped[str] = mapped_column(String, primary_key=True) + id: Mapped[str] = mapped_column(NullFilteredString, primary_key=True) from_ingestion_api: Mapped[bool] = mapped_column( Boolean, default=False, nullable=True ) # 0 for neutral, positive for mostly endorse, negative for mostly reject boost: Mapped[int] = mapped_column(Integer, default=DEFAULT_BOOST) hidden: Mapped[bool] = mapped_column(Boolean, default=False) - semantic_id: Mapped[str] = mapped_column(String) + semantic_id: Mapped[str] = mapped_column(NullFilteredString) # First Section's link - link: Mapped[str | None] = mapped_column(String, nullable=True) + link: Mapped[str | None] = mapped_column(NullFilteredString, nullable=True) # The updated time is also used as a measure of the last successful state of the doc # pulled from the source (to help skip reindexing already updated docs in case of