Skip to content

Commit

Permalink
Fix connectivity for job-statistics collect (#347)
Browse files Browse the repository at this point in the history
* Statistics: Fix connectivity for `job-statistics collect`
* Statistics: Rename default table names for job statistics: `qc_` prefix
  • Loading branch information
amotl authored Jan 16, 2025
1 parent 6b1b40f commit 557083e
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Fixed connectivity for `job-statistics collect`

## 2025/01/13 v0.0.30
- Dependencies: Minimize dependencies of core installation,
Expand Down
11 changes: 4 additions & 7 deletions cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from cratedb_toolkit.api.main import ClusterBase, ManagedCluster, StandaloneCluster
from cratedb_toolkit.model import DatabaseAddress, InputOutputResource, TableAddress
from cratedb_toolkit.options import cratedb_cluster_id_option, cratedb_http_option, cratedb_sqlalchemy_option
from cratedb_toolkit.util.cli import boot_click, make_command

logger = logging.getLogger(__name__)
Expand All @@ -27,13 +28,9 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):

@make_command(cli, name="table")
@click.argument("url")
@click.option(
"--cluster-id", envvar="CRATEDB_CLOUD_CLUSTER_ID", type=str, required=False, help="CrateDB Cloud cluster identifier"
)
@click.option(
"--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
)
@click.option("--cratedb-http-url", envvar="CRATEDB_HTTP_URL", type=str, required=False, help="CrateDB HTTP URL")
@cratedb_cluster_id_option
@cratedb_http_option
@cratedb_sqlalchemy_option
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
Expand Down
21 changes: 21 additions & 0 deletions cratedb_toolkit/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,27 @@ def decode(self) -> t.Tuple[URL, "TableAddress"]:
uri.path = ""
return uri, TableAddress(database, table)

@property
def username(self) -> t.Union[str, None]:
    """
    Username taken from the database URI.

    Simply forwards the `username` attribute of `self.uri`; per the
    annotation, this may be `None`.
    """
    uri = self.uri
    return uri.username

@property
def password(self) -> t.Union[str, None]:
    """
    Password taken from the database URI.

    Simply forwards the `password` attribute of `self.uri`; per the
    annotation, this may be `None`.
    """
    uri = self.uri
    return uri.password

@property
def schema(self) -> t.Union[str, None]:
    """
    Value of the `?schema=` query parameter of the database URI.

    Looks up the "schema" key in `self.uri.query_params` using `.get()`,
    so a missing parameter yields `None` instead of raising.
    """
    query_params = self.uri.query_params
    return query_params.get("schema")


@dataclasses.dataclass
class TableAddress:
Expand Down
11 changes: 11 additions & 0 deletions cratedb_toolkit/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import click

# Shared Click option decorators for addressing a CrateDB cluster.
# Each option is optional and falls back to the named environment variable.

# `--cluster-id` / `CRATEDB_CLOUD_CLUSTER_ID`
cratedb_cluster_id_option = click.option(
    "--cluster-id",
    envvar="CRATEDB_CLOUD_CLUSTER_ID",
    type=str,
    required=False,
    help="CrateDB Cloud cluster identifier",
)

# `--cratedb-sqlalchemy-url` / `CRATEDB_SQLALCHEMY_URL`
cratedb_sqlalchemy_option = click.option(
    "--cratedb-sqlalchemy-url",
    envvar="CRATEDB_SQLALCHEMY_URL",
    type=str,
    required=False,
    help="CrateDB SQLAlchemy URL",
)

# `--cratedb-http-url` / `CRATEDB_HTTP_URL`
cratedb_http_option = click.option(
    "--cratedb-http-url",
    envvar="CRATEDB_HTTP_URL",
    type=str,
    required=False,
    help="CrateDB HTTP URL",
)
41 changes: 17 additions & 24 deletions cratedb_toolkit/wtf/cli.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging
import os
import sys
import typing as t
import urllib.parse

import click
from click import ClickException
from click_aliases import ClickAliasedGroup

from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.options import cratedb_http_option, cratedb_sqlalchemy_option
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.cli import (
boot_click,
Expand Down Expand Up @@ -75,26 +75,25 @@ def help_serve():
""" # noqa: E501


cratedb_sqlalchemy_option = click.option(
"--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
)


@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
@cratedb_sqlalchemy_option
@cratedb_http_option
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information")
@click.version_option()
@click.pass_context
def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool):
def cli(
ctx: click.Context, cratedb_sqlalchemy_url: str, cratedb_http_url: str, verbose: bool, debug: bool, scrub: bool
):
"""
Diagnostics and informational utilities.
"""
if not cratedb_sqlalchemy_url:
logger.error("Unable to operate without database address")
sys.exit(1)
ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub})
if not cratedb_sqlalchemy_url and not cratedb_http_url:
raise ClickException("Unable to operate without database address")
ctx.meta.update(
{"cratedb_http_url": cratedb_http_url, "cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub}
)
return boot_click(ctx, verbose, debug)


Expand Down Expand Up @@ -149,12 +148,12 @@ def job_statistics(ctx: click.Context):
def job_statistics_collect(ctx: click.Context, once: bool):
"""
Run jobs_log collector.
# TODO: Forward `cratedb_sqlalchemy_url` properly.
"""
import cratedb_toolkit.wtf.query_collector

cratedb_toolkit.wtf.query_collector.init()
address = DatabaseAddress.from_string(ctx.meta["cratedb_http_url"] or ctx.meta["cratedb_sqlalchemy_url"])

cratedb_toolkit.wtf.query_collector.boot(address=address)
if once:
cratedb_toolkit.wtf.query_collector.record_once()
else:
Expand All @@ -166,17 +165,11 @@ def job_statistics_collect(ctx: click.Context, once: bool):
def job_statistics_view(ctx: click.Context):
"""
View job statistics about collected queries.
# TODO: Forward `cratedb_sqlalchemy_url` properly.
"""
cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"]
url = urllib.parse.urlparse(cratedb_sqlalchemy_url)
hostname = f"{url.hostname}:{url.port or 4200}"
os.environ["HOSTNAME"] = hostname

import cratedb_toolkit.wtf.query_collector

cratedb_toolkit.wtf.query_collector.init()
address = DatabaseAddress.from_string(ctx.meta["cratedb_http_url"] or ctx.meta["cratedb_sqlalchemy_url"])
cratedb_toolkit.wtf.query_collector.boot(address=address)

response: t.Dict = {"meta": {}, "data": {}}
response["meta"]["remark"] = "WIP! This is a work in progress. The output format will change."
Expand Down
47 changes: 28 additions & 19 deletions cratedb_toolkit/wtf/query_collector.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Copyright (c) 2021-2024, Crate.io Inc.
# Copyright (c) 2021-2025, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.

# ruff: noqa: S608
import json
import logging
import os
import time
import typing as t
from uuid import uuid4

import urllib3
Expand All @@ -15,17 +16,9 @@

logger = logging.getLogger(__name__)

cratedb_sqlalchemy_url = os.getenv("CRATEDB_SQLALCHEMY_URL", "crate://crate@localhost:4200")
address = DatabaseAddress.from_string(cratedb_sqlalchemy_url)
host = f"{address.uri.host}:{address.uri.port}"
username = address.uri.username
password = address.uri.password
_, table_address = address.decode()
schema = table_address.schema or "stats"

interval = float(os.getenv("INTERVAL", 10))
stmt_log_table = os.getenv("STMT_TABLE", f"{schema}.statement_log")
last_exec_table = os.getenv("LAST_EXEC_TABLE", f"{schema}.last_execution")
TRACING = False


last_execution_ts = 0
sys_jobs_log = {}
bucket_list = [10, 50, 100, 500, 1000, 2000, 5000, 10000, 15000, 20000]
Expand All @@ -43,16 +36,32 @@
"INF": 0,
}

stmt_log_table: str
last_exec_table: str
cursor: t.Any
last_scrape: int
interval: float

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
conn = client.connect(host, username=username, password=password)
cursor = conn.cursor()
last_scrape = int(time.time() * 1000) - (interval * 60000)

TRACING = False
def boot(address: DatabaseAddress):
    """
    Set up the collector's module-level state and connect to the database.

    Steps, in order:

    - Pick the schema from `address.schema`, falling back to "stats".
    - Read `INTERVAL`, `STMT_TABLE` and `LAST_EXEC_TABLE` from the environment,
      defaulting the table names to `qc_`-prefixed tables in that schema.
    - Connect via `client.connect()` using `address.httpuri` and the
      credentials from `address`, and keep a cursor in the global `cursor`.
    - Initialize `last_scrape` to the current time in milliseconds minus
      `interval * 60000`.
    - Call `dbinit()`.
    """
    # TODO: Refactor to non-global variables.
    global stmt_log_table, last_exec_table, cursor, last_scrape, interval

    schema = address.schema or "stats"
    default_stmt_log_table = f'"{schema}".qc_statement_log'
    default_last_exec_table = f'"{schema}".qc_last_execution'

    interval = float(os.getenv("INTERVAL", 10))
    stmt_log_table = os.getenv("STMT_TABLE", default_stmt_log_table)
    last_exec_table = os.getenv("LAST_EXEC_TABLE", default_last_exec_table)

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # NOTE(review): if `httpuri` embeds credentials, they end up in the log -- confirm.
    logger.info(f"Connecting to {address.httpuri}")
    connection = client.connect(
        address.httpuri,
        username=address.username,
        password=address.password,
        schema=schema,
    )
    cursor = connection.cursor()

    now_ms = int(time.time() * 1000)
    lookback_ms = int(interval * 60000)
    last_scrape = now_ms - lookback_ms

    dbinit()


def init():
def dbinit():
stmt = (
f"CREATE TABLE IF NOT EXISTS {stmt_log_table} "
f"(id TEXT, stmt TEXT, calls INT, bucket OBJECT, last_used TIMESTAMP, "
Expand Down Expand Up @@ -249,7 +258,7 @@ def record_forever():


def main():
init()
boot(address=DatabaseAddress.from_string("http://crate@localhost:4200"))
record_forever()


Expand Down
7 changes: 5 additions & 2 deletions doc/wtf/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ It is still a work-in-progress, but it is usable already.
```shell
pip install --upgrade 'cratedb-toolkit'
```
Alternatively, use the Docker image at `ghcr.io/crate/cratedb-toolkit`.
Alternatively, use the Docker image per `ghcr.io/crate/cratedb-toolkit`.

## Synopsis

Define CrateDB database cluster address.
```shell
export CRATEDB_SQLALCHEMY_URL=crate://localhost/
export CRATEDB_SQLALCHEMY_URL='crate://username:password@localhost:4200/?schema=ext&ssl=true'
```
```shell
export CRATEDB_HTTP_URL='https://username:password@localhost:4200/?schema=ext'
```


Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
'"doc"."jobinfo"',
'"ext"."clusterinfo"',
'"ext"."jobinfo"',
'"stats"."statement_log"',
'"stats"."last_execution"',
f'"{TESTDRIVE_EXT_SCHEMA}"."qc_statement_log"',
f'"{TESTDRIVE_EXT_SCHEMA}"."qc_last_execution"',
f'"{TESTDRIVE_EXT_SCHEMA}"."retention_policy"',
f'"{TESTDRIVE_DATA_SCHEMA}"."raw_metrics"',
f'"{TESTDRIVE_DATA_SCHEMA}"."sensor_readings"',
Expand Down
24 changes: 15 additions & 9 deletions tests/wtf/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from click.testing import CliRunner

from cratedb_toolkit.wtf.cli import cli
from tests.conftest import TESTDRIVE_EXT_SCHEMA


def test_wtf_cli_info(cratedb):
Expand Down Expand Up @@ -93,8 +94,11 @@ def test_wtf_cli_statistics_collect(cratedb, caplog):
Verify `cratedb-wtf job-statistics collect`.
"""

# Configure database URI.
dburi = cratedb.database.dburi + f"?schema={TESTDRIVE_EXT_SCHEMA}"

# Invoke command.
runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi})
runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": dburi})
result = runner.invoke(
cli,
args="job-statistics collect --once",
Expand All @@ -108,24 +112,26 @@ def test_wtf_cli_statistics_collect(cratedb, caplog):
# Verify outcome: Database content.
# <TESTDRIVE_EXT_SCHEMA>.qc_statement_log, <TESTDRIVE_EXT_SCHEMA>.qc_last_execution
results = cratedb.database.run_sql("SHOW TABLES", records=True)
assert {"table_name": "last_execution"} in results
assert {"table_name": "statement_log"} in results
assert {"table_name": "qc_last_execution"} in results
assert {"table_name": "qc_statement_log"} in results

# FIXME: Table is empty. Why?
cratedb.database.run_sql('REFRESH TABLE "stats"."statement_log"')
assert cratedb.database.count_records("stats.statement_log") == 0
cratedb.database.refresh_table(f"{TESTDRIVE_EXT_SCHEMA}.qc_statement_log")
assert cratedb.database.count_records(f"{TESTDRIVE_EXT_SCHEMA}.qc_statement_log") >= 19

cratedb.database.run_sql('REFRESH TABLE "stats"."last_execution"')
assert cratedb.database.count_records("stats.last_execution") == 1
cratedb.database.refresh_table(f"{TESTDRIVE_EXT_SCHEMA}.qc_last_execution")
assert cratedb.database.count_records(f"{TESTDRIVE_EXT_SCHEMA}.qc_last_execution") == 1


def test_wtf_cli_statistics_view(cratedb):
"""
Verify `cratedb-wtf job-statistics view`.
"""

# Configure database URI.
dburi = cratedb.database.dburi + f"?schema={TESTDRIVE_EXT_SCHEMA}"

# Invoke command.
runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi})
runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": dburi})
result = runner.invoke(
cli,
args="job-statistics view",
Expand Down

0 comments on commit 557083e

Please sign in to comment.