Skip to content

Commit

Permalink
feat: implement #277
Browse files Browse the repository at this point in the history
  • Loading branch information
majsan committed Nov 15, 2024
1 parent 6124dea commit 180beee
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 137 deletions.
25 changes: 25 additions & 0 deletions karp/api/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
import logging
import threading
import time
import traceback
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from typing import Any

from asgi_correlation_id import CorrelationIdMiddleware
Expand All @@ -11,6 +15,7 @@
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session

from karp import main
Expand All @@ -20,6 +25,7 @@
from karp.lex.domain import errors as lex_errors
from karp.main import config, new_session
from karp.main.errors import ClientErrorCodes
from karp.search import index_worker

querying_description = """
## Query DSL
Expand Down Expand Up @@ -94,6 +100,24 @@
def create_app() -> FastAPI:
app_context = main.bootstrap_app()

@asynccontextmanager
async def lifespan(_):
def handle_task_result(done_task: asyncio.Task):
try:
done_task.result()
except Exception as e:
logger.error(f"Indexing job failed, {e}")

if es_conf := app_context.settings["elasticsearch_host"]:
# Don't remove saving a reference to the task - it might not complete unless it is saved due to GC
engine = app_context.injector.get(Engine)
task = asyncio.create_task(index_worker.start(es_conf, engine))
task.add_done_callback(handle_task_result)

yield

task.cancel()

app = FastAPI(
title=f"{config.PROJECT_NAME} API",
description="""Karp is Språkbanken's tool for editing structural data.\n\nThe
Expand All @@ -103,6 +127,7 @@ def create_app() -> FastAPI:
docs_url=None,
version=config.VERSION,
openapi_tags=tags_metadata,
lifespan=lifespan,
)

app.state.app_context = app_context
Expand Down
2 changes: 1 addition & 1 deletion karp/cliapp/subapps/entries_subapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def batch_entries(
> Example:
> `[{"cmd": {"cmdtype": "add_entry","resource_id": "resource_a","entry": {"baseform": "sko"},"message": "add sko","user": "[email protected]"}}]`
> `[{"cmdtype": "add_entry","resource_id": "resource_a","entry": {"baseform": "sko"},"message": "add sko","user": "[email protected]"}]`
"""
logger.info("run entries command in batch")
entry_commands = inject_from_ctx(EntryCommands, ctx) # type: ignore[type-abstract]
Expand Down
78 changes: 6 additions & 72 deletions karp/entry_commands.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
from collections import defaultdict
from typing import Any, Dict, Generator, Iterable, List, Tuple
from typing import Dict, Iterable, Tuple

import sqlalchemy
from injector import inject
from sqlalchemy.orm import Session

from karp import plugins
from karp.foundation.timings import utc_now
from karp.foundation.value_objects import UniqueId, unique_id
from karp.lex import EntryDto
from karp.lex.domain import errors
from karp.lex.domain.entities import Resource
from karp.lex.domain.errors import EntryNotFound, ResourceNotFound
from karp.lex.infrastructure import EntryRepository, ResourceRepository
from karp.plugins import Plugins
from karp.search.domain.index_entry import IndexEntry
from karp.search.infrastructure.es.indices import EsIndex
from karp.search.infrastructure.transformers import entry_transformer


class EntryCommands:
Expand All @@ -25,15 +19,9 @@ def __init__(
self,
session: Session,
resources: ResourceRepository,
index: EsIndex,
plugins: Plugins,
):
self.session = session
self.resources: ResourceRepository = resources
self.index = index
self.plugins = plugins
self.added_entries = defaultdict(list)
self.deleted_entries = defaultdict(list)
self.in_transaction = False

def _get_resource(self, resource_id: str) -> Resource:
Expand All @@ -48,14 +36,6 @@ def _get_entries(self, resource_id: str) -> EntryRepository:
raise ResourceNotFound(resource_id)
return result

def _transform(self, config, entry: EntryDto) -> IndexEntry:
return next(self._transform_entries(config, [entry]))

def _transform_entries(self, config, entries: Iterable[EntryDto]) -> Generator[IndexEntry, Any, None]:
config = plugins.transform_config(self.plugins, config)
entries = plugins.transform_entries(self.plugins, config, entries)
return (entry_transformer.transform(config, entry) for entry in entries)

def add_entries_in_chunks(
self,
resource_id: str,
Expand All @@ -64,7 +44,7 @@ def add_entries_in_chunks(
user: str,
message: str,
timestamp: float | None = None,
) -> list[EntryDto]:
):
"""
Add entries to DB and INDEX (if present and resource is active).
Expand All @@ -75,16 +55,8 @@ def add_entries_in_chunks(
KarpError
- If an entry fails to be validated against the json schema.
- If the DB interaction fails.
Returns
-------
List
List of the id's of the created entries.
"""

created_db_entries = []
resource = self._get_resource(resource_id)

entry_table = self._get_entries(resource_id)
for i, (entry_id, entry_raw) in enumerate(entries):
entry = resource.create_entry_from_dict(
Expand All @@ -95,17 +67,10 @@ def add_entries_in_chunks(
timestamp=timestamp,
)
entry_table.save(entry)
created_db_entries.append(EntryDto.from_entry(entry))

if chunk_size > 0 and i % chunk_size == 0:
self.added_entries[resource_id].extend(created_db_entries)
self._commit()

self.added_entries[resource_id].extend(created_db_entries)
self._commit()

return created_db_entries

def add_entries(
self,
resource_id: str,
Expand All @@ -114,10 +79,10 @@ def add_entries(
message: str,
timestamp: float | None = None,
):
return self.add_entries_in_chunks(resource_id, 0, entries, user, message, timestamp=timestamp)
self.add_entries_in_chunks(resource_id, 0, entries, user, message, timestamp=timestamp)

def import_entries(self, resource_id, entries, user, message):
return self.import_entries_in_chunks(resource_id, 0, entries, user, message)
self.import_entries_in_chunks(resource_id, 0, entries, user, message)

def import_entries_in_chunks(self, resource_id, chunk_size, entries, user, message):
"""
Expand All @@ -130,14 +95,7 @@ def import_entries_in_chunks(self, resource_id, chunk_size, entries, user, messa
KarpError
- If an entry fails to be validated against the json schema.
- If the DB interaction fails.
Returns
-------
List
List of the id's of the created entries.
"""

created_db_entries = []
resource = self._get_resource(resource_id)
entry_table = self._get_entries(resource_id)

Expand All @@ -151,21 +109,12 @@ def import_entries_in_chunks(self, resource_id, chunk_size, entries, user, messa
)
entry_table.save(entry)

created_db_entries.append(EntryDto.from_entry(entry))

if chunk_size > 0 and i % chunk_size == 0:
self.added_entries[resource_id].extend(created_db_entries)
self._commit()

self.added_entries[resource_id].extend(created_db_entries)
self._commit()

return created_db_entries

def add_entry(self, resource_id, entry_id, entry, user, message, timestamp=None):
result = self.add_entries(resource_id, [(entry_id, entry)], user, message, timestamp=timestamp)
assert len(result) == 1 # noqa: S101
return result[0]
self.add_entries(resource_id, [(entry_id, entry)], user, message, timestamp=timestamp)

def update_entry(self, resource_id, _id, version, user, message, entry, timestamp=None):
resource = self._get_resource(resource_id)
Expand All @@ -188,7 +137,6 @@ def update_entry(self, resource_id, _id, version, user, message, entry, timestam
)
if version != current_db_entry.version:
entries.save(current_db_entry)
self.added_entries[resource_id].append(EntryDto.from_entry(current_db_entry))
self._commit()

return EntryDto.from_entry(current_db_entry)
Expand All @@ -206,31 +154,19 @@ def delete_entry(self, resource_id, _id, user, version, message="Entry deleted",
timestamp=timestamp or utc_now(),
)
entries.save(entry)
self.deleted_entries[resource_id].append(entry.id)
self._commit()

def _commit(self):
"""Commits the session and updates ES, but not if in a transaction."""
"""Commits the session, but not if in a transaction."""

if self.in_transaction:
return

try:
self.session.commit()
except sqlalchemy.exc.IntegrityError as e:
self.session.rollback()
raise errors.IntegrityError(str(e)) from None

for resource_id, entry_dtos in self.added_entries.items():
resource = self._get_resource(resource_id)
index_entries = self._transform_entries(resource.config, entry_dtos)
self.index.add_entries(resource.resource_id, index_entries)
self.added_entries.clear()

for resource_id, entry_ids in self.deleted_entries.items():
self.index.delete_entries(resource_id, entry_ids=entry_ids)
self.deleted_entries.clear()

def start_transaction(self):
"""Runs commands inside a transaction. Nothing will be
committed until the transaction ends."""
Expand All @@ -256,5 +192,3 @@ def rollback(self):
raise RuntimeError("Can't roll back when outside a transaction")

self.in_transaction = True
self.added_entries.clear()
self.deleted_entries.clear()
6 changes: 6 additions & 0 deletions karp/lex/infrastructure/sql/entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from karp.lex.domain.entities.entry import Entry

from . import models
from .models import IndexJob

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -51,6 +52,11 @@ def drop_table(self):
def _save(self, entry: Entry):
entry_dto = self.history_model.from_entity(entry)
self._session.add(entry_dto)
# TODO add comment here
if entry_dto.discarded:
self._session.add(IndexJob(resource_id=entry.resource_id, entry_id=entry.id, op="DELETE"))
else:
self._session.add(IndexJob(resource_id=entry.resource_id, entry_id=entry.id, op="ADD", body=entry.body))

def entity_ids(self) -> List[str]:
stmt = self._stmt_latest_not_discarded()
Expand Down
16 changes: 15 additions & 1 deletion karp/lex/infrastructure/sql/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,23 @@ class ApiKeyModel(Base):
permissions = Column(JSON, nullable=False)


# Dynamic models
class IndexJob(Base):
"""
Table for schedling indexing jobs to Elasticsearch
if op == "REINDEX", entry_id and body == None
if op == "ADD", entry_id and body must be set
if op == "DELETE", entry_id must be set, body == None
"""

__tablename__ = "index_job"
id = Column(Integer, primary_key=True)
resource_id = Column(String(32), nullable=False)
op = Column(Enum("ADD", "DELETE", "REINDEX"), nullable=False)
entry_id = Column(ULIDType, nullable=True)
body = Column(JSON, nullable=True)


# Dynamic model
@functools.cache
def get_or_create_entry_history_model(
resource_id: str,
Expand Down
16 changes: 4 additions & 12 deletions karp/main/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ def bootstrap_app() -> AppContext:
"tracking.matomo.idsite": env("TRACKING_MATOMO_IDSITE", None),
"tracking.matomo.url": env("TRACKING_MATOMO_URL", None),
"tracking.matomo.token": env("TRACKING_MATOMO_TOKEN", None),
# TODO if None, just start without Elasticsearch?
"elasticsearch_host": env("ELASTICSEARCH_HOST"),
}

engine = _create_db_engine(DATABASE_URL)
es = Elasticsearch(env("ELASTICSEARCH_HOST"))

def configure_dependency_injection(binder):
binder.bind(Engine, engine)
binder.install(ElasticSearchMod(env("ELASTICSEARCH_HOST")))
binder.bind(Elasticsearch, es)
jwt_pubkey_path = env("AUTH_JWT_PUBKEY_PATH", None)
if jwt_pubkey_path is not None:
binder.bind(JWTAuthService, JWTAuthService(Path(jwt_pubkey_path)))
Expand All @@ -47,17 +50,6 @@ def configure_dependency_injection(binder):
return AppContext(injector, settings)


class ElasticSearchMod(Module):
def __init__(self, url):
self._url = url

@provider
@singleton
def es(self) -> Elasticsearch:
logger.info("Creating ES client url=%s", self._url)
return Elasticsearch(self._url)


def _create_db_engine(db_url: URL) -> Engine:
kwargs = {}
if str(db_url).startswith("sqlite"):
Expand Down
31 changes: 31 additions & 0 deletions karp/main/migrations/versions/a3cae5bf5168_add_index_jobs_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""add index jobs table
Revision ID: a3cae5bf5168
Revises: 1022e239ee16
Create Date: 2024-11-15 12:04:43.232850
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import mysql

import karp

# revision identifiers, used by Alembic.
revision = "a3cae5bf5168"
down_revision = "1022e239ee16"
branch_labels = None
depends_on = None


def upgrade():
op.create_table(
"index_job",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("resource_id", sa.String(length=32), nullable=False),
sa.Column("op", sa.Enum("ADD", "DELETE", "REINDEX"), nullable=False),
sa.Column("entry_id", karp.db_infrastructure.types.ulid.ULIDType(length=26), nullable=True),
sa.Column("body", sa.JSON(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
11 changes: 0 additions & 11 deletions karp/search/domain/index_entry.py

This file was deleted.

Loading

0 comments on commit 180beee

Please sign in to comment.