Skip to content

Commit

Permalink
give reason in error if dataset/split cache is refreshing (#193)
Browse files Browse the repository at this point in the history
* feat: 🎸 give reason in error if dataset/split cache is refreshi

fixes #186

* style: 💄 fix style
  • Loading branch information
severo authored Apr 4, 2022
1 parent a6a8661 commit 4a9bf7a
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 18 deletions.
4 changes: 2 additions & 2 deletions src/datasets_preview_backend/io/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ def get_splits_response(dataset_name: str) -> Tuple[Union[SplitsResponse, None],
try:
dataset = DbDataset.objects(dataset_name=dataset_name).get()
except DoesNotExist as e:
raise Status400Error("Not found. Maybe the cache is missing, or maybe the dataset does not exist.") from e
raise Status400Error("Not found. The dataset does not exist.") from e

# ^ can also raise MultipleObjectsReturned, which should not occur -> we let the exception raise

Expand Down Expand Up @@ -583,7 +583,7 @@ def get_rows_response(
try:
split = DbSplit.objects(dataset_name=dataset_name, config_name=config_name, split_name=split_name).get()
except DoesNotExist as e:
raise Status400Error("Not found. Maybe the cache is missing, or maybe the split does not exist.", e) from e
raise Status400Error("Not found. The split does not exist.", e) from e

# ^ can also raise MultipleObjectsReturned, which should not occur -> we let the exception raise

Expand Down
16 changes: 16 additions & 0 deletions src/datasets_preview_backend/io/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,19 @@ def get_dataset_dump_by_status(waiting_started: bool = False) -> DumpByStatus:

def get_split_dump_by_status(waiting_started: bool = False) -> DumpByStatus:
return get_dump_by_status(SplitJob.objects, waiting_started)


def is_dataset_in_queue(dataset_name: str) -> bool:
return DatasetJob.objects(status__in=[Status.WAITING, Status.STARTED], dataset_name=dataset_name).count() > 0


def is_split_in_queue(dataset_name: str, config_name: str, split_name: str) -> bool:
return (
SplitJob.objects(
status__in=[Status.WAITING, Status.STARTED],
dataset_name=dataset_name,
config_name=config_name,
split_name=split_name,
).count()
> 0
)
27 changes: 20 additions & 7 deletions src/datasets_preview_backend/routes/rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
ROWS_MAX_BYTES,
ROWS_MIN_NUMBER,
)
from datasets_preview_backend.exceptions import StatusError
from datasets_preview_backend.exceptions import Status400Error, StatusError
from datasets_preview_backend.io.cache import get_rows_response
from datasets_preview_backend.io.queue import is_split_in_queue
from datasets_preview_backend.routes._utils import get_response

logger = logging.getLogger(__name__)
Expand All @@ -22,11 +23,23 @@ async def rows_endpoint(request: Request) -> Response:
logger.info(f"/rows, dataset={dataset_name}, config={config_name}, split={split_name}")

try:
if not isinstance(dataset_name, str) or not isinstance(config_name, str) or not isinstance(split_name, str):
raise StatusError("Parameters 'dataset', 'config' and 'split' are required", 400)
rows_response, rows_error, status_code = get_rows_response(
dataset_name, config_name, split_name, ROWS_MAX_BYTES, ROWS_MIN_NUMBER
)
return get_response(rows_response or rows_error, status_code, MAX_AGE_LONG_SECONDS)
try:
if (
not isinstance(dataset_name, str)
or not isinstance(config_name, str)
or not isinstance(split_name, str)
):
raise StatusError("Parameters 'dataset', 'config' and 'split' are required", 400)
rows_response, rows_error, status_code = get_rows_response(
dataset_name, config_name, split_name, ROWS_MAX_BYTES, ROWS_MIN_NUMBER
)
return get_response(rows_response or rows_error, status_code, MAX_AGE_LONG_SECONDS)
except StatusError as err:
if err.message == "Not found. The split does not exist." and is_split_in_queue(
dataset_name, config_name, split_name
):
raise Status400Error("The split is being processed. Retry later.", err) from err
else:
raise err
except StatusError as err:
return get_response(err.as_content(), err.status_code, MAX_AGE_LONG_SECONDS)
15 changes: 11 additions & 4 deletions src/datasets_preview_backend/routes/splits.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datasets_preview_backend.config import MAX_AGE_LONG_SECONDS
from datasets_preview_backend.exceptions import Status400Error, StatusError
from datasets_preview_backend.io.cache import get_splits_response
from datasets_preview_backend.io.queue import is_dataset_in_queue
from datasets_preview_backend.routes._utils import get_response

logger = logging.getLogger(__name__)
Expand All @@ -16,9 +17,15 @@ async def splits_endpoint(request: Request) -> Response:
logger.info(f"/splits, dataset={dataset_name}")

try:
if not isinstance(dataset_name, str):
raise Status400Error("Parameter 'dataset' is required")
splits_response, splits_error, status_code = get_splits_response(dataset_name)
return get_response(splits_response or splits_error, status_code, MAX_AGE_LONG_SECONDS)
try:
if not isinstance(dataset_name, str):
raise Status400Error("Parameter 'dataset' is required")
splits_response, splits_error, status_code = get_splits_response(dataset_name)
return get_response(splits_response or splits_error, status_code, MAX_AGE_LONG_SECONDS)
except StatusError as err:
if err.message == "Not found. The dataset does not exist." and is_dataset_in_queue(dataset_name):
raise Status400Error("The dataset is being processed. Retry later.", err) from err
else:
raise err
except StatusError as err:
return get_response(err.as_content(), err.status_code, MAX_AGE_LONG_SECONDS)
32 changes: 27 additions & 5 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@
from starlette.testclient import TestClient

from datasets_preview_backend.app import create_app
from datasets_preview_backend.config import MONGO_CACHE_DATABASE
from datasets_preview_backend.config import MONGO_CACHE_DATABASE, MONGO_QUEUE_DATABASE
from datasets_preview_backend.exceptions import Status400Error
from datasets_preview_backend.io.cache import clean_database as clean_cache_database
from datasets_preview_backend.io.cache import (
clean_database,
refresh_dataset_split_full_names,
refresh_split,
)
from datasets_preview_backend.io.queue import add_dataset_job, add_split_job
from datasets_preview_backend.io.queue import clean_database as clean_queue_database


@pytest.fixture(autouse=True, scope="module")
def safe_guard() -> None:
if "test" not in MONGO_CACHE_DATABASE:
raise Exception("Test must be launched on a test mongo database")
raise Exception("Tests on cache must be launched on a test mongo database")
if "test" not in MONGO_QUEUE_DATABASE:
raise Exception("Tests on queue must be launched on a test mongo database")


@pytest.fixture(scope="module")
Expand All @@ -23,8 +27,9 @@ def client() -> TestClient:


@pytest.fixture(autouse=True)
def clean_mongo_database() -> None:
clean_database()
def clean_mongo_databases() -> None:
clean_cache_database()
clean_queue_database()


def test_get_cache_reports(client: TestClient) -> None:
Expand Down Expand Up @@ -199,3 +204,20 @@ def test_bytes_limit(client: TestClient) -> None:
json = response.json()
rowItems = json["rows"]
assert len(rowItems) == 3


def test_cache_refreshing(client: TestClient) -> None:
dataset = "acronym_identification"
response = client.get("/splits", params={"dataset": dataset})
assert response.json()["message"] == "Not found. The dataset does not exist."
add_dataset_job(dataset)
response = client.get("/splits", params={"dataset": dataset})
assert response.json()["message"] == "The dataset is being processed. Retry later."

config = "default"
split = "train"
response = client.get("/rows", params={"dataset": dataset, "config": config, "split": split})
assert response.json()["message"] == "Not found. The split does not exist."
add_split_job(dataset, config, split)
response = client.get("/rows", params={"dataset": dataset, "config": config, "split": split})
assert response.json()["message"] == "The split is being processed. Retry later."

0 comments on commit 4a9bf7a

Please sign in to comment.