From 557083ed381c5b478c716f1db984750a89c8cca2 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 16 Jan 2025 03:01:21 +0100 Subject: [PATCH] Fix connectivity for `job-statistics collect` (#347) * Statistics: Fix connectivity for `job-statistics collect` * Statistics: Rename default table names for job statistics: `qc_` prefix --- CHANGES.md | 1 + cratedb_toolkit/io/cli.py | 11 +++--- cratedb_toolkit/model.py | 21 ++++++++++++ cratedb_toolkit/options.py | 11 ++++++ cratedb_toolkit/wtf/cli.py | 41 ++++++++++------------ cratedb_toolkit/wtf/query_collector.py | 47 +++++++++++++++----------- doc/wtf/index.md | 7 ++-- tests/conftest.py | 4 +-- tests/wtf/test_cli.py | 24 ++++++++----- 9 files changed, 104 insertions(+), 63 deletions(-) create mode 100644 cratedb_toolkit/options.py diff --git a/CHANGES.md b/CHANGES.md index 5fd34485..9dd8cadc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +- Fixed connectivity for `job-statistics collect` ## 2025/01/13 v0.0.30 - Dependencies: Minimize dependencies of core installation, diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index 8ebb7729..7c1eb9e7 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -8,6 +8,7 @@ from cratedb_toolkit.api.main import ClusterBase, ManagedCluster, StandaloneCluster from cratedb_toolkit.model import DatabaseAddress, InputOutputResource, TableAddress +from cratedb_toolkit.options import cratedb_cluster_id_option, cratedb_http_option, cratedb_sqlalchemy_option from cratedb_toolkit.util.cli import boot_click, make_command logger = logging.getLogger(__name__) @@ -27,13 +28,9 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): @make_command(cli, name="table") @click.argument("url") -@click.option( - "--cluster-id", envvar="CRATEDB_CLOUD_CLUSTER_ID", type=str, required=False, help="CrateDB Cloud cluster identifier" -) -@click.option( - "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, 
required=False, help="CrateDB SQLAlchemy URL" -) -@click.option("--cratedb-http-url", envvar="CRATEDB_HTTP_URL", type=str, required=False, help="CrateDB HTTP URL") +@cratedb_cluster_id_option +@cratedb_http_option +@cratedb_sqlalchemy_option @click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data") @click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data") @click.option("--format", "format_", type=str, required=False, help="File format of the import resource") diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index 1fe3a91a..28cf5ba1 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -77,6 +77,27 @@ def decode(self) -> t.Tuple[URL, "TableAddress"]: uri.path = "" return uri, TableAddress(database, table) + @property + def username(self) -> t.Union[str, None]: + """ + Return the username of the database URI. + """ + return self.uri.username + + @property + def password(self) -> t.Union[str, None]: + """ + Return the password of the database URI. + """ + return self.uri.password + + @property + def schema(self) -> t.Union[str, None]: + """ + Return the `?schema=` query parameter of the database URI. 
+ """ + return self.uri.query_params.get("schema") + @dataclasses.dataclass class TableAddress: diff --git a/cratedb_toolkit/options.py b/cratedb_toolkit/options.py new file mode 100644 index 00000000..ff68dbb7 --- /dev/null +++ b/cratedb_toolkit/options.py @@ -0,0 +1,11 @@ +import click + +cratedb_cluster_id_option = click.option( + "--cluster-id", envvar="CRATEDB_CLOUD_CLUSTER_ID", type=str, required=False, help="CrateDB Cloud cluster identifier" +) +cratedb_sqlalchemy_option = click.option( + "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" +) +cratedb_http_option = click.option( + "--cratedb-http-url", envvar="CRATEDB_HTTP_URL", type=str, required=False, help="CrateDB HTTP URL" +) diff --git a/cratedb_toolkit/wtf/cli.py b/cratedb_toolkit/wtf/cli.py index 54806432..6d6e0a68 100644 --- a/cratedb_toolkit/wtf/cli.py +++ b/cratedb_toolkit/wtf/cli.py @@ -1,14 +1,14 @@ # Copyright (c) 2021-2024, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. 
import logging -import os -import sys import typing as t -import urllib.parse import click +from click import ClickException from click_aliases import ClickAliasedGroup +from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.options import cratedb_http_option, cratedb_sqlalchemy_option from cratedb_toolkit.util import DatabaseAdapter from cratedb_toolkit.util.cli import ( boot_click, @@ -75,26 +75,25 @@ def help_serve(): """ # noqa: E501 -cratedb_sqlalchemy_option = click.option( - "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" -) - - @click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] @cratedb_sqlalchemy_option +@cratedb_http_option @click.option("--verbose", is_flag=True, required=False, help="Turn on logging") @click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") @click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information") @click.version_option() @click.pass_context -def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool): +def cli( + ctx: click.Context, cratedb_sqlalchemy_url: str, cratedb_http_url: str, verbose: bool, debug: bool, scrub: bool +): """ Diagnostics and informational utilities. """ - if not cratedb_sqlalchemy_url: - logger.error("Unable to operate without database address") - sys.exit(1) - ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub}) + if not cratedb_sqlalchemy_url and not cratedb_http_url: + raise ClickException("Unable to operate without database address") + ctx.meta.update( + {"cratedb_http_url": cratedb_http_url, "cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub} + ) return boot_click(ctx, verbose, debug) @@ -149,12 +148,12 @@ def job_statistics(ctx: click.Context): def job_statistics_collect(ctx: click.Context, once: bool): """ Run jobs_log collector. 
- - # TODO: Forward `cratedb_sqlalchemy_url` properly. """ import cratedb_toolkit.wtf.query_collector - cratedb_toolkit.wtf.query_collector.init() + address = DatabaseAddress.from_string(ctx.meta["cratedb_http_url"] or ctx.meta["cratedb_sqlalchemy_url"]) + + cratedb_toolkit.wtf.query_collector.boot(address=address) if once: cratedb_toolkit.wtf.query_collector.record_once() else: @@ -166,17 +165,11 @@ def job_statistics_collect(ctx: click.Context, once: bool): def job_statistics_view(ctx: click.Context): """ View job statistics about collected queries. - - # TODO: Forward `cratedb_sqlalchemy_url` properly. """ - cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] - url = urllib.parse.urlparse(cratedb_sqlalchemy_url) - hostname = f"{url.hostname}:{url.port or 4200}" - os.environ["HOSTNAME"] = hostname - import cratedb_toolkit.wtf.query_collector - cratedb_toolkit.wtf.query_collector.init() + address = DatabaseAddress.from_string(ctx.meta["cratedb_http_url"] or ctx.meta["cratedb_sqlalchemy_url"]) + cratedb_toolkit.wtf.query_collector.boot(address=address) response: t.Dict = {"meta": {}, "data": {}} response["meta"]["remark"] = "WIP! This is a work in progress. The output format will change." diff --git a/cratedb_toolkit/wtf/query_collector.py b/cratedb_toolkit/wtf/query_collector.py index 7d6f55bd..672a3e75 100644 --- a/cratedb_toolkit/wtf/query_collector.py +++ b/cratedb_toolkit/wtf/query_collector.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2024, Crate.io Inc. +# Copyright (c) 2021-2025, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. 
# ruff: noqa: S608 @@ -6,6 +6,7 @@ import logging import os import time +import typing as t from uuid import uuid4 import urllib3 @@ -15,17 +16,9 @@ logger = logging.getLogger(__name__) -cratedb_sqlalchemy_url = os.getenv("CRATEDB_SQLALCHEMY_URL", "crate://crate@localhost:4200") -address = DatabaseAddress.from_string(cratedb_sqlalchemy_url) -host = f"{address.uri.host}:{address.uri.port}" -username = address.uri.username -password = address.uri.password -_, table_address = address.decode() -schema = table_address.schema or "stats" - -interval = float(os.getenv("INTERVAL", 10)) -stmt_log_table = os.getenv("STMT_TABLE", f"{schema}.statement_log") -last_exec_table = os.getenv("LAST_EXEC_TABLE", f"{schema}.last_execution") +TRACING = False + + last_execution_ts = 0 sys_jobs_log = {} bucket_list = [10, 50, 100, 500, 1000, 2000, 5000, 10000, 15000, 20000] @@ -43,16 +36,32 @@ "INF": 0, } +stmt_log_table: str +last_exec_table: str +cursor: t.Any +last_scrape: int +interval: float -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -conn = client.connect(host, username=username, password=password) -cursor = conn.cursor() -last_scrape = int(time.time() * 1000) - (interval * 60000) -TRACING = False +def boot(address: DatabaseAddress): + # TODO: Refactor to non-global variables. 
+ global stmt_log_table, last_exec_table, cursor, last_scrape, interval + schema = address.schema or "stats" + + interval = float(os.getenv("INTERVAL", 10)) + stmt_log_table = os.getenv("STMT_TABLE", f'"{schema}".qc_statement_log') + last_exec_table = os.getenv("LAST_EXEC_TABLE", f'"{schema}".qc_last_execution') + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + logger.info(f"Connecting to {address.httpuri}") + conn = client.connect(address.httpuri, username=address.username, password=address.password, schema=schema) + cursor = conn.cursor() + last_scrape = int(time.time() * 1000) - int(interval * 60000) + + dbinit() -def init(): +def dbinit(): stmt = ( f"CREATE TABLE IF NOT EXISTS {stmt_log_table} " f"(id TEXT, stmt TEXT, calls INT, bucket OBJECT, last_used TIMESTAMP, " @@ -249,7 +258,7 @@ def record_forever(): def main(): - init() + boot(address=DatabaseAddress.from_string("http://crate@localhost:4200")) record_forever() diff --git a/doc/wtf/index.md b/doc/wtf/index.md index d94f93dc..74f06409 100644 --- a/doc/wtf/index.md +++ b/doc/wtf/index.md @@ -8,13 +8,16 @@ It is still a work-in-progress, but it is usable already. ```shell pip install --upgrade 'cratedb-toolkit' ``` -Alternatively, use the Docker image at `ghcr.io/crate/cratedb-toolkit`. +Alternatively, use the Docker image per `ghcr.io/crate/cratedb-toolkit`. ## Synopsis Define CrateDB database cluster address. 
```shell -export CRATEDB_SQLALCHEMY_URL=crate://localhost/ +export CRATEDB_SQLALCHEMY_URL='crate://username:password@localhost:4200/?schema=ext&ssl=true' +``` +```shell +export CRATEDB_HTTP_URL='https://username:password@localhost:4200/?schema=ext' ``` diff --git a/tests/conftest.py b/tests/conftest.py index 73787d3b..787a9a9a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,8 +21,8 @@ '"doc"."jobinfo"', '"ext"."clusterinfo"', '"ext"."jobinfo"', - '"stats"."statement_log"', - '"stats"."last_execution"', + f'"{TESTDRIVE_EXT_SCHEMA}"."qc_statement_log"', + f'"{TESTDRIVE_EXT_SCHEMA}"."qc_last_execution"', f'"{TESTDRIVE_EXT_SCHEMA}"."retention_policy"', f'"{TESTDRIVE_DATA_SCHEMA}"."raw_metrics"', f'"{TESTDRIVE_DATA_SCHEMA}"."sensor_readings"', diff --git a/tests/wtf/test_cli.py b/tests/wtf/test_cli.py index aef787ad..910656e2 100644 --- a/tests/wtf/test_cli.py +++ b/tests/wtf/test_cli.py @@ -4,6 +4,7 @@ from click.testing import CliRunner from cratedb_toolkit.wtf.cli import cli +from tests.conftest import TESTDRIVE_EXT_SCHEMA def test_wtf_cli_info(cratedb): @@ -93,8 +94,11 @@ def test_wtf_cli_statistics_collect(cratedb, caplog): Verify `cratedb-wtf job-statistics collect`. """ + # Configure database URI. + dburi = cratedb.database.dburi + f"?schema={TESTDRIVE_EXT_SCHEMA}" + # Invoke command. - runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": dburi}) result = runner.invoke( cli, args="job-statistics collect --once", @@ -108,15 +112,14 @@ def test_wtf_cli_statistics_collect(cratedb, caplog): # Verify outcome: Database content. # stats.statement_log, stats.last_execution results = cratedb.database.run_sql("SHOW TABLES", records=True) - assert {"table_name": "last_execution"} in results - assert {"table_name": "statement_log"} in results + assert {"table_name": "qc_last_execution"} in results + assert {"table_name": "qc_statement_log"} in results - # FIXME: Table is empty. Why? 
- cratedb.database.run_sql('REFRESH TABLE "stats"."statement_log"') - assert cratedb.database.count_records("stats.statement_log") == 0 + cratedb.database.refresh_table(f"{TESTDRIVE_EXT_SCHEMA}.qc_statement_log") + assert cratedb.database.count_records(f"{TESTDRIVE_EXT_SCHEMA}.qc_statement_log") >= 19 - cratedb.database.run_sql('REFRESH TABLE "stats"."last_execution"') - assert cratedb.database.count_records("stats.last_execution") == 1 + cratedb.database.refresh_table(f"{TESTDRIVE_EXT_SCHEMA}.qc_last_execution") + assert cratedb.database.count_records(f"{TESTDRIVE_EXT_SCHEMA}.qc_last_execution") == 1 def test_wtf_cli_statistics_view(cratedb): @@ -124,8 +127,11 @@ def test_wtf_cli_statistics_view(cratedb): Verify `cratedb-wtf job-statistics view`. """ + # Configure database URI. + dburi = cratedb.database.dburi + f"?schema={TESTDRIVE_EXT_SCHEMA}" + # Invoke command. - runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi}) + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": dburi}) result = runner.invoke( cli, args="job-statistics view",