From fe237b7de7f3fb75e0f51ba0ed2262761798d2f7 Mon Sep 17 00:00:00 2001 From: Ali Kelkawi Date: Fri, 21 Jun 2024 18:25:03 +0300 Subject: [PATCH] refactor charm to use ROCK image --- .gitignore | 1 + CONTRIBUTING.md | 15 +- README.md | 57 +--- config.yaml | 30 -- metadata.yaml | 6 - .../resource_sample/workflows/workflow2.py | 29 -- .../Makefile | 18 +- .../pyproject.toml | 1 - .../resource_sample/activities/__init__.py | 0 .../resource_sample/activities/activity1.py | 2 +- .../resource_sample/activities/activity2.py | 2 +- .../resource_sample/common/__init__.py | 0 .../resource_sample/common/messages.py | 0 resource_sample_py/resource_sample/worker.py | 120 ++++++++ .../resource_sample/workflows/__init__.py | 0 .../resource_sample/workflows/workflow1.py | 18 +- resource_sample_py/rockcraft.yaml | 71 +++++ resource_sample_py/scripts/start-worker.sh | 4 + {resource_sample => sample_files}/config.yaml | 3 - {resource_sample => sample_files}/invalid.env | 0 {resource_sample => sample_files}/sample.env | 0 src/charm.py | 266 +----------------- src/literals.py | 2 +- src/resources/check_status.py | 33 --- src/resources/temporal_client/__init__.py | 5 - src/resources/temporal_client/activities.py | 36 --- src/resources/temporal_client/workflows.py | 34 --- src/resources/worker-dependencies.txt | 2 - src/resources/worker.py | 188 ------------- tests/integration/conftest.py | 11 +- tests/integration/helpers.py | 28 +- tests/integration/test_charm.py | 4 +- tests/integration/test_upgrades.py | 3 + tests/unit/literals.py | 16 +- tests/unit/test_charm.py | 118 +------- tox.ini | 21 +- 36 files changed, 299 insertions(+), 845 deletions(-) delete mode 100644 resource_sample/resource_sample/workflows/workflow2.py rename {resource_sample => resource_sample_py}/Makefile (85%) rename {resource_sample => resource_sample_py}/pyproject.toml (98%) create mode 100644 resource_sample_py/resource_sample/activities/__init__.py rename {resource_sample => resource_sample_py}/resource_sample/activities/activity1.py (86%) rename {resource_sample => resource_sample_py}/resource_sample/activities/activity2.py (94%) create mode 100644 resource_sample_py/resource_sample/common/__init__.py rename {resource_sample => resource_sample_py}/resource_sample/common/messages.py (100%) create mode 100644 resource_sample_py/resource_sample/worker.py create mode 100644 resource_sample_py/resource_sample/workflows/__init__.py rename {resource_sample => resource_sample_py}/resource_sample/workflows/workflow1.py (62%) create mode 100644 resource_sample_py/rockcraft.yaml create mode 100755 resource_sample_py/scripts/start-worker.sh rename {resource_sample => sample_files}/config.yaml (58%) rename {resource_sample => sample_files}/invalid.env (100%) rename {resource_sample => sample_files}/sample.env (100%) delete mode 100644 src/resources/check_status.py delete mode 100644 src/resources/temporal_client/__init__.py delete mode 100644 src/resources/temporal_client/activities.py delete mode 100644 src/resources/temporal_client/workflows.py delete mode 100644 src/resources/worker-dependencies.txt delete mode 100644 src/resources/worker.py diff --git a/.gitignore b/.gitignore index d1cb8f6..ef0a8a4 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ __pycache__/ .vscode/ *.whl *.tar.gz +*.rock diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 165d6f5..d815176 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -45,7 +45,7 @@ deployment, follow the following steps: newgrp microk8s # Enable the necessary Microk8s addons: - microk8s enable hostpath-storage dns + microk8s enable hostpath-storage dns registry # Install the Juju CLI client, juju: sudo snap install juju --classic @@ -60,20 +60,17 @@ deployment, follow the following steps: juju model-config logging-config="=INFO;unit=DEBUG" # Pack the charm: - charmcraft pack [--destructive-mode] + charmcraft pack - # Build wheel file: - cd resource_sample && poetry build -f wheel + # Build ROCK file and push it to local registry: + cd resource_sample_py && make build_rock # Deploy the charm: - juju deploy ./temporal-worker-k8s_ubuntu-22.04-amd64.charm --resource temporal-worker-image=python:3.8.2-slim-buster + juju deploy ./temporal-worker-k8s_ubuntu-22.04-amd64.charm --resource temporal-worker-image=localhost:32000/temporal-worker-rock juju config temporal-worker-k8s --file=path/to/config.yaml - # Attach wheel file resource: - juju attach-resource temporal-worker-k8s workflows-file=./resource_sample/dist/python_samples-1.1.0-py3-none-any.whl - # Check progress: - juju status --relations --watch 1s + juju status --relations --watch 2s juju debug-log # Clean-up before retrying: diff --git a/README.md b/README.md index 578066d..67e8238 100644 --- a/README.md +++ b/README.md @@ -20,11 +20,22 @@ connect to a deployed Temporal server. ### Deploying -The Temporal worker operator can be deployed and connected to a deployed +To deploy the Temporal worker operator, you can start by creating a Temporal +workflow, or use the one provided in +[`resource_sample_py`](./resource_sample_py/). Once done, the project can be +built as a [ROCK](https://documentation.ubuntu.com/rockcraft/en/stable/) and +pushed to the [local registry](https://microk8s.io/docs/registry-built-in) by +running the following command inside the `resource_sample_py` directory: + +```bash +make build_rock +``` + +The Temporal worker operator can then be deployed and connected to a deployed Temporal server using the Juju command line as follows: ```bash -juju deploy temporal-worker-k8s +juju deploy temporal-worker-k8s --resource temporal-worker-image=localhost:32000/temporal-worker-rock juju config temporal-worker-k8s --file=path/to/config.yaml ``` @@ -35,38 +46,6 @@ temporal-worker-k8s: host: "localhost:7233" # Replace with Temporal server hostname queue: "test-queue" namespace: "test" - workflows-file-name: "python_samples-1.1.0-py3-none-any.whl" - # To support all defined workflows and activities, use the 'all' keyword - supported-workflows: "all" - supported-activities: "all" -``` - -### Attaching "workflows-file" resource - -The Temporal worker operator expects a "workflows-file" resource to be attached -after deployment, which contains a set of defined Temporal workflows and -activities as defined in the [resource_sample](./resource_sample/) directory. -The structure of the built wheel file must follow the same structure: - -``` -- workflows/ - - workflow-a.py - - workflow-b.py -- activities/ - - activity-a.py - - activity-b.py -- some_other_directory/ -- some_helper_file.py -``` - -The sample wheel file can be built by running `poetry build -f wheel` in the -[resource_sample](./resource_sample/) directory. - -Once ready, the resource can be attached as follows: - -```bash -make -C resource_sample/ build -juju attach-resource temporal-worker-k8s workflows-file=./resource_sample/dist/python_samples-1.1.0-py3-none-any.whl ``` Once done, the charm should enter an active state, indicating that the worker is @@ -77,15 +56,9 @@ pod to ensure there are no errors with the workload container: kubectl -n logs temporal-worker-k8s-0 -c temporal-worker -f ``` -Note: Files defined under the "workflows" directory must only contain classes -decorated using the `@workflow.defn` decorator. Files defined under the -"activities" directory must only contain methods decorated using the -`@activity.defn` decorator. Any additional methods or classes needed should be -defined in other files. - ## Verifying -To verify that the setup is running correctly, run `juju status --watch 1s` and +To verify that the setup is running correctly, run `juju status --watch 2s` and ensure the pod is active. To run a basic workflow, you may use a simple client (e.g. @@ -165,7 +138,7 @@ instructions found [here](https://charmhub.io/vault-k8s/docs/h-getting-started). For a reference on how to access credentials from Vault through the workflow code, -[`activity2.py`](./resource_sample/resource_sample/activities/activity2.py) +[`activity2.py`](./resource_sample_py/resource_sample/activities/activity2.py) under the `resource_sample` directory shows a sample for writing and reading secrets in Vault. diff --git a/config.yaml b/config.yaml index e5e82ea..5e1663f 100644 --- a/config.yaml +++ b/config.yaml @@ -31,16 +31,6 @@ options: default: "" type: string - supported-workflows: - description: Comma-separated list of workflow names to extract from attached wheel file. - default: "" - type: string - - supported-activities: - description: Comma-separated list of workflow activities to extract from attached wheel file. - default: "" - type: string - sentry-dsn: description: Sentry Data Source Name to send events to. default: "" @@ -69,11 +59,6 @@ options: default: 1.0 type: float - workflows-file-name: - description: Name of the wheel file resource attached to the charm. - default: "" - type: string - encryption-key: description: Base64-encoded key used for data encryption. default: "" @@ -161,18 +146,3 @@ options: description: Client certificate URL for OIDC authentication. default: "" type: string - - http-proxy: - description: Used to set HTTP_PROXY environment variable. - default: "" - type: string - - https-proxy: - description: Used to set HTTPS_PROXY environment variable. - default: "" - type: string - - no-proxy: - description: Used to set NO_PROXY environment variable. - default: "" - type: string diff --git a/metadata.yaml b/metadata.yaml index ee950bd..3dab5f9 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -35,17 +35,11 @@ peers: containers: temporal-worker: resource: temporal-worker-image - # Included for simplicity in integration tests. - upstream-source: python:3.8.2-slim-buster resources: temporal-worker-image: type: oci-image description: OCI image containing Python package. - workflows-file: - type: file - description: Wheel file containing Temporal workflows and activities. - filename: 'workflows-file.whl' env-file: type: file description: .env file containing environment variables to be sourced to the workload container. diff --git a/resource_sample/resource_sample/workflows/workflow2.py b/resource_sample/resource_sample/workflows/workflow2.py deleted file mode 100644 index 616275d..0000000 --- a/resource_sample/resource_sample/workflows/workflow2.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -from datetime import timedelta - -from temporalio import workflow -from temporalio.common import RetryPolicy -from temporalio.exceptions import FailureError - -from pathlib import Path -import sys -path_root = Path(__file__).parents[2] -sys.path.append(str(path_root)) - -with workflow.unsafe.imports_passed_through(): - import resource_sample.activities.activity2 as all_activities -from datetime import timedelta - -# Basic workflow that logs and invokes an activity -@workflow.defn(name="VaultWorkflow") -class VaultWorkflow: - @workflow.run - async def run(self, name: str) -> str: - workflow.logger.info("Running workflow with parameter %s" % name) - return await workflow.execute_activity( - all_activities.vault_test, - all_activities.ComposeGreetingInput("Hello", name), - start_to_close_timeout=timedelta(seconds=10), - ) diff --git a/resource_sample/Makefile b/resource_sample_py/Makefile similarity index 85% rename from resource_sample/Makefile rename to resource_sample_py/Makefile index e42b5d4..282be58 100644 --- a/resource_sample/Makefile +++ b/resource_sample_py/Makefile @@ -3,13 +3,17 @@ # Makefile to help automate tasks +# The name of the python package/project +PY_PACKAGE := resource_sample + +# ROCK build parameters +ROCK_NAME := temporal-worker_1.0_amd64.rock +IMAGE_NAME := temporal-worker-rock:latest + # build and dist folders BUILD := build DIST := dist -# The name of the python package/project -PY_PACKAGE := resource_sample - # Paths to venv executables POETRY := poetry PY := python3 @@ -71,6 +75,14 @@ changelog: ## Add a new entry to the Changelog and bump the package version build: ## Create a Python source distribution and a wheel in dist $(POETRY) build +.PHONY: build_rock +build_rock: + rockcraft pack + rockcraft.skopeo --insecure-policy copy oci-archive:$(ROCK_NAME) docker-daemon:$(IMAGE_NAME) + IMAGE_ID=$$(docker inspect --format='{{.Id}}' $(IMAGE_NAME)); \ + docker tag $$IMAGE_ID localhost:32000/$(IMAGE_NAME) + docker push localhost:32000/$(IMAGE_NAME) + .PHONY: publish publish: ## Publish the package to PYPI $(POETRY) publish diff --git a/resource_sample/pyproject.toml b/resource_sample_py/pyproject.toml similarity index 98% rename from resource_sample/pyproject.toml rename to resource_sample_py/pyproject.toml index 883b54f..d24d7d1 100644 --- a/resource_sample/pyproject.toml +++ b/resource_sample_py/pyproject.toml @@ -11,7 +11,6 @@ packages = [ ] [tool.poetry.dependencies] -python = "^3.8" protobuf = "^3.2.0" PyYAML = "^6.0" temporal-lib-py = "^1.3.1" diff --git a/resource_sample_py/resource_sample/activities/__init__.py b/resource_sample_py/resource_sample/activities/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/resource_sample/resource_sample/activities/activity1.py b/resource_sample_py/resource_sample/activities/activity1.py similarity index 86% rename from resource_sample/resource_sample/activities/activity1.py rename to resource_sample_py/resource_sample/activities/activity1.py index b62d05d..66e78c3 100644 --- a/resource_sample/resource_sample/activities/activity1.py +++ b/resource_sample_py/resource_sample/activities/activity1.py @@ -3,7 +3,7 @@ from temporalio import activity from dataclasses import dataclass -from resource_sample.common.messages import ComposeGreetingInput +from common.messages import ComposeGreetingInput # Basic activity that logs and does string concatenation @activity.defn(name="compose_greeting") diff --git a/resource_sample/resource_sample/activities/activity2.py b/resource_sample_py/resource_sample/activities/activity2.py similarity index 94% rename from resource_sample/resource_sample/activities/activity2.py rename to resource_sample_py/resource_sample/activities/activity2.py index 491a94f..c456724 100644 --- a/resource_sample/resource_sample/activities/activity2.py +++ b/resource_sample_py/resource_sample/activities/activity2.py @@ -3,7 +3,7 @@ from temporalio import activity from dataclasses import dataclass -from resource_sample.common.messages import ComposeGreetingInput +from common.messages import ComposeGreetingInput import os import hvac diff --git a/resource_sample_py/resource_sample/common/__init__.py b/resource_sample_py/resource_sample/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/resource_sample/resource_sample/common/messages.py b/resource_sample_py/resource_sample/common/messages.py similarity index 100% rename from resource_sample/resource_sample/common/messages.py rename to resource_sample_py/resource_sample/common/messages.py diff --git a/resource_sample_py/resource_sample/worker.py b/resource_sample_py/resource_sample/worker.py new file mode 100644 index 0000000..fed2e8e --- /dev/null +++ b/resource_sample_py/resource_sample/worker.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + + +"""Temporal client worker.""" + +import asyncio +import glob +import inspect +import logging +import os +import sys +import traceback + +from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporallib.auth import ( + AuthOptions, + GoogleAuthOptions, + KeyPair, + MacaroonAuthOptions, +) +from temporallib.client import Client, Options +from temporallib.encryption import EncryptionOptions +from temporallib.worker import SentryOptions, Worker, WorkerOptions +from workflows.workflow1 import GreetingWorkflow, VaultWorkflow +from activities.activity1 import compose_greeting +from activities.activity2 import vault_test + +logger = logging.getLogger(__name__) + + +def _get_auth_header(): + """Get auth options based on provider. + + Returns: + AuthOptions object. + """ + if os.getenv("TWC_AUTH_PROVIDER") == "candid": + return MacaroonAuthOptions( + keys=KeyPair(private=os.getenv("TWC_CANDID_PRIVATE_KEY"), public=os.getenv("TWC_CANDID_PUBLIC_KEY")), + macaroon_url=os.getenv("TWC_CANDID_URL"), + username=os.getenv("TWC_CANDID_USERNAME"), + ) + + if os.getenv("TWC_AUTH_PROVIDER") == "google": + return GoogleAuthOptions( + type="service_account", + project_id=os.getenv("TWC_OIDC_PROJECT_ID"), + private_key_id=os.getenv("TWC_OIDC_PRIVATE_KEY_ID"), + private_key=os.getenv("TWC_OIDC_PRIVATE_KEY"), + client_email=os.getenv("TWC_OIDC_CLIENT_EMAIL"), + client_id=os.getenv("TWC_OIDC_CLIENT_ID"), + auth_uri=os.getenv("TWC_OIDC_AUTH_URI"), + token_uri=os.getenv("TWC_OIDC_TOKEN_URI"), + auth_provider_x509_cert_url=os.getenv("TWC_OIDC_AUTH_CERT_URL"), + client_x509_cert_url=os.getenv("TWC_OIDC_CLIENT_CERT_URL"), + ) + + return None + +def _init_runtime_with_prometheus(port: int) -> Runtime: + """Create runtime for use with Prometheus metrics. + + Args: + port: Port of prometheus. + + Returns: + Runtime for temporalio with prometheus. + """ + return Runtime(telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address=f"0.0.0.0:{port}"))) + + +async def run_worker(): + """Connect Temporal worker to Temporal server.""" + client_config = Options( + host=os.getenv("TWC_HOST"), + namespace=os.getenv("TWC_NAMESPACE"), + queue=os.getenv("TWC_QUEUE"), + ) + + if os.getenv("TWC_TLS_ROOT_CAS", "").strip() != "": + client_config.tls_root_cas = os.getenv("TWC_TLS_ROOT_CAS") + + if os.getenv("TWC_AUTH_PROVIDER", "").strip() != "": + client_config.auth = AuthOptions(provider=os.getenv("TWC_AUTH_PROVIDER"), config=_get_auth_header()) + + if os.getenv("TWC_ENCRYPTION_KEY", "").strip() != "": + client_config.encryption = EncryptionOptions(key=os.getenv("TWC_ENCRYPTION_KEY"), compress=True) + + worker_opt = None + dsn = os.getenv("TWC_SENTRY_DSN", "").strip() + if dsn != "": + sentry = SentryOptions( + dsn=dsn, + release=os.getenv("TWC_SENTRY_RELEASE", "").strip() or None, + environment=os.getenv("TWC_SENTRY_ENVIRONMENT", "").strip() or None, + redact_params=os.getenv("TWC_SENTRY_REDACT_PARAMS", False), + sample_rate=os.getenv("TWC_SENTRY_SAMPLE_RATE", 1.0), + ) + + worker_opt = WorkerOptions(sentry=sentry) + + runtime = _init_runtime_with_prometheus(os.getenv("TWC_PROMETHEUS_PORT")) + + client = await Client.connect(client_config, runtime=runtime) + + worker = Worker( + client=client, + task_queue=os.getenv("TWC_QUEUE"), + workflows=[GreetingWorkflow], + activities=[compose_greeting], + worker_opt=worker_opt, + ) + + await worker.run() + + +if __name__ == "__main__": # pragma: nocover + asyncio.run(run_worker()) diff --git a/resource_sample_py/resource_sample/workflows/__init__.py b/resource_sample_py/resource_sample/workflows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/resource_sample/resource_sample/workflows/workflow1.py b/resource_sample_py/resource_sample/workflows/workflow1.py similarity index 62% rename from resource_sample/resource_sample/workflows/workflow1.py rename to resource_sample_py/resource_sample/workflows/workflow1.py index 901f524..253b94f 100644 --- a/resource_sample/resource_sample/workflows/workflow1.py +++ b/resource_sample_py/resource_sample/workflows/workflow1.py @@ -7,13 +7,8 @@ from temporalio.common import RetryPolicy from temporalio.exceptions import FailureError -from pathlib import Path -import sys -path_root = Path(__file__).parents[2] -sys.path.append(str(path_root)) - with workflow.unsafe.imports_passed_through(): - import resource_sample.activities.activity1 as all_activities + import activities.activity1 as all_activities from datetime import timedelta # Basic workflow that logs and invokes an activity @@ -27,3 +22,14 @@ async def run(self, name: str) -> str: all_activities.ComposeGreetingInput("Hello", name), start_to_close_timeout=timedelta(seconds=10), ) + +@workflow.defn(name="VaultWorkflow") +class VaultWorkflow: + @workflow.run + async def run(self, name: str) -> str: + workflow.logger.info("Running workflow with parameter %s" % name) + return await workflow.execute_activity( + all_activities.vault_test, + all_activities.ComposeGreetingInput("Hello", name), + start_to_close_timeout=timedelta(seconds=10), + ) \ No newline at end of file diff --git a/resource_sample_py/rockcraft.yaml b/resource_sample_py/rockcraft.yaml new file mode 100644 index 0000000..e8e67b2 --- /dev/null +++ b/resource_sample_py/rockcraft.yaml @@ -0,0 +1,71 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. +name: temporal-worker +summary: Temporal worker app +description: OCI image for Temporal worker app +version: "1.0" +base: bare +build-base: ubuntu@22.04 +license: Apache-2.0 + +services: + temporal-worker: + override: replace + summary: "temporal worker" + startup: disabled + command: "./app/scripts/start-worker.sh" + environment: + TWC_HOST: 10.1.232.44:7233 + TWC_NAMESPACE: default + TWC_QUEUE: test-queue + TWC_TLS_ROOT_CAS: "" + TWC_AUTH_PROVIDER: "" # "google" or "candid" + TWC_ENCRYPTION_KEY: "" + TWC_SENTRY_DSN: "" + TWC_SENTRY_RELEASE: "" + TWC_SENTRY_ENVIRONMENT: "" + TWC_SENTRY_REDACT_PARAMS: "False" + TWC_SENTRY_SAMPLE_RATE: "1.0" + TWC_CANDID_URL: "" + TWC_CANDID_USERNAME: "" + TWC_CANDID_PRIVATE_KEY: "" + TWC_CANDID_PUBLIC_KEY: "" + TWC_OIDC_PROJECT_ID: "" + TWC_OIDC_PRIVATE_KEY_ID: "" + TWC_OIDC_PRIVATE_KEY: "" + TWC_OIDC_CLIENT_EMAIL: "" + TWC_OIDC_CLIENT_ID: "" + TWC_OIDC_AUTH_URI: "" + TWC_OIDC_TOKEN_URI: "" + TWC_OIDC_AUTH_CERT_URL: "" + TWC_OIDC_CLIENT_CERT_URL: "" + + # Vault variables + TWC_VAULT_ADDR: "" + TWC_VAULT_CACERT_BYTES: "" + TWC_VAULT_ROLE_ID: "" + TWC_VAULT_ROLE_SECRET_ID: "" + TWC_VAULT_MOUNT: "" + TWC_VAULT_CERT_PATH: "" + +platforms: + amd64: + +parts: + worker-dependencies: + plugin: python + source: . + build-packages: + - build-essential + stage-packages: + - python3.10-venv + - coreutils + - bash + + worker-app: + plugin: dump + source: . + organize: + "*": app/ + stage: + - app diff --git a/resource_sample_py/scripts/start-worker.sh b/resource_sample_py/scripts/start-worker.sh new file mode 100755 index 0000000..7c004d9 --- /dev/null +++ b/resource_sample_py/scripts/start-worker.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# update 'resource_sample' accordingly +python app/resource_sample/worker.py diff --git a/resource_sample/config.yaml b/sample_files/config.yaml similarity index 58% rename from resource_sample/config.yaml rename to sample_files/config.yaml index 6ec72e3..715d1ea 100644 --- a/resource_sample/config.yaml +++ b/sample_files/config.yaml @@ -5,6 +5,3 @@ temporal-worker-k8s: host: "temporal-k8s:7233" queue: "test-queue" namespace: "default" - workflows-file-name: "python_samples-1.1.0-py3-none-any.whl" - supported-workflows: "all" - supported-activities: "all" diff --git a/resource_sample/invalid.env b/sample_files/invalid.env similarity index 100% rename from resource_sample/invalid.env rename to sample_files/invalid.env diff --git a/resource_sample/sample.env b/sample_files/sample.env similarity index 100% rename from resource_sample/sample.env rename to sample_files/sample.env diff --git a/src/charm.py b/src/charm.py index a297ff9..7dd0d0a 100755 --- a/src/charm.py +++ b/src/charm.py @@ -7,9 +7,8 @@ """Charm definition and helpers.""" import logging -import re +import os import secrets -from pathlib import Path from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider from charms.loki_k8s.v0.loki_push_api import LogProxyConsumer @@ -21,12 +20,10 @@ from ops.model import ( ActiveStatus, BlockedStatus, - Container, MaintenanceStatus, ModelError, WaitingStatus, ) -from ops.pebble import CheckStatus from literals import ( LOG_FILE, @@ -125,9 +122,6 @@ def _on_restart(self, event): self.unit.status = MaintenanceStatus("restarting worker") container.restart(self.name) - self.unit.status = ActiveStatus( - f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}" - ) event.set_results({"result": "worker successfully restarted"}) @@ -159,11 +153,6 @@ def _on_update_status(self, event): self._update(event) return - check = container.get_check("up") - if check.status != CheckStatus.UP: - self.unit.status = MaintenanceStatus("Status check: DOWN") - return - self.unit.status = ActiveStatus( f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}" ) @@ -179,7 +168,7 @@ def _validate_pebble_plan(self, container): """ try: plan = container.get_plan().to_dict() - return bool(plan and plan["services"].get(self.name, {}).get("on-check-failure")) + return bool(plan and plan["services"].get(self.name, {})) except pebble.ConnectionError: return False @@ -215,72 +204,6 @@ def _process_env_file(self, event): except ModelError as err: logger.error(err) - def _process_wheel_file(self, event): # noqa: C901 - """Process wheel file attached by user. - - This method extracts the wheel file provided by the user and places the contents - into the workload container, which can then be read by the Temporal worker. - - Args: - event: The event triggered when the relation changed. - - Raises: - ValueError: if file is not found or an operation failed while extracting. - """ - if not self._state.is_ready(): - event.defer() - return - - if self.config["workflows-file-name"].strip() == "": - raise ValueError("Invalid config: wheel-file-name missing") - - if not _validate_wheel_name(self.config["workflows-file-name"]): - raise ValueError("Invalid config: invalid wheel-file-name") - - try: - resource_path = self.model.resources.fetch("workflows-file") - filename = Path(resource_path).name - - container = self.unit.get_container(self.name) - if not container.can_connect(): - event.defer() - self.unit.status = WaitingStatus("waiting for pebble api") - return - - container.exec(["rm", "-rf", "/user_provided"]).wait_output() - - wheel_file = f"/user_provided/{filename}" - original_wheel_file = f"/user_provided/{self.config['workflows-file-name']}" - wheel_arr = self.config["workflows-file-name"].split("-") - unpacked_file_name = f"/user_provided/{'-'.join(wheel_arr[0:2])}" - - with open(resource_path, "rb") as file: - wheel_data = file.read() - - # Push wheel file to the container and extract it. - container.push(wheel_file, wheel_data, make_dirs=True) - - # Rename wheel file to its original name and install it - if wheel_file != original_wheel_file: - container.exec(["mv", wheel_file, original_wheel_file]).wait() - - _install_wheel_file( - container=container, wheel_file_name=original_wheel_file, proxy=self.config["http-proxy"] - ) - _unpack_wheel_file(container=container, wheel_file_name=original_wheel_file) - module_name = _get_module_name(container=container, unpacked_file_name=unpacked_file_name) - _check_required_directories( - container=container, unpacked_file_name=unpacked_file_name, module_name=module_name - ) - - if self.unit.is_leader(): - self._state.module_name = module_name - self._state.unpacked_file_name = unpacked_file_name - - except ModelError as err: - logger.error(err) - raise ValueError("Invalid state: workflows-file resource not found") from err - def _check_required_config(self, config_list): """Check if required config has been set by user. @@ -310,14 +233,10 @@ def _validate(self, event): # noqa: C901 if not self._state.is_ready(): raise ValueError("peer relation not ready") - self._process_wheel_file(event) self._process_env_file(event) self._check_required_config(REQUIRED_CHARM_CONFIG) - if self._state.module_name is None: - raise ValueError("Invalid state: error extracting folder name from wheel file") - if self.config["auth-provider"]: if not self.config["auth-provider"] in SUPPORTED_AUTH_PROVIDERS: raise ValueError("Invalid config: auth-provider not supported") @@ -349,28 +268,25 @@ def _update(self, event): # noqa: C901 self.unit.status = WaitingStatus("waiting for pebble api") return - # ensure the container is set up - _setup_container(container, self.config["http-proxy"]) - logger.info("Configuring Temporal worker") - module_name = self._state.module_name - unpacked_file_name = self._state.unpacked_file_name - context = {} if self._state.env: context.update(self._state.env) + proxy_vars = { + "HTTP_PROXY": "JUJU_CHARM_HTTP_PROXY", + "HTTPS_PROXY": "JUJU_CHARM_HTTPS_PROXY", + "NO_PROXY": "JUJU_CHARM_NO_PROXY", + } + + for key, env_var in proxy_vars.items(): + value = os.environ.get(env_var) + if value: + context.update({key: value}) + context.update({convert_env_var(key): value for key, value in self.config.items()}) - command = f"python worker.py {unpacked_file_name} {module_name}" - - context.update( - { - "HTTP_PROXY": self.config["http-proxy"], - "HTTPS_PROXY": self.config["https-proxy"], - "NO_PROXY": self.config["no-proxy"], - } - ) + context.update({"TWC_PROMETHEUS_PORT": PROMETHEUS_PORT}) try: vault_config = self.vault_relation._get_vault_config() @@ -396,19 +312,10 @@ def _update(self, event): # noqa: C901 "services": { self.name: { "summary": "temporal worker", - "command": command, + "command": "./app/scripts/start-worker.sh", "startup": "enabled", "override": "replace", "environment": context, - "on-check-failure": {"up": "ignore"}, - } - }, - "checks": { - "up": { - "override": "replace", - "level": "alive", - "period": "10s", - "exec": {"command": "python check_status.py"}, } }, } @@ -433,148 +340,5 @@ def convert_env_var(config_var, prefix="TWC_"): return prefix + converted_env_var -def _setup_container(container: Container, proxy: str): - """Copy worker file to the container and install dependencies. - - Args: - container: Container unit on which to perform action. - proxy: optional proxy value used in running pip command. - - Raises: - ValueError: if worker dependencies fail to install. - """ - resources_path = Path(__file__).parent / "resources" - _push_container_file(container, resources_path, "/worker.py", resources_path / "worker.py") - _push_container_file(container, resources_path, "/../literals.py", resources_path / "../literals.py") - _push_container_file(container, resources_path, "/check_status.py", resources_path / "check_status.py") - _push_container_file( - container, resources_path, "/worker-dependencies.txt", resources_path / "worker-dependencies.txt" - ) - - # Install worker dependencies - worker_dependencies_path = "/worker-dependencies.txt" - logger.info("installing worker dependencies...") - - command = ["pip", "install", "-r", str(worker_dependencies_path)] - if proxy.strip() != "": - command.insert(2, f"--proxy={proxy}") - - _, error = container.exec(command).wait_output() - if error is not None and error.strip() != "" and not error.strip().startswith("WARNING"): - logger.error(f"failed to install worker dependencies: {error}") - raise ValueError("Invalid state: failed to install worker dependencies") - - -def _validate_wheel_name(filename): - """Validate wheel file name. - - Args: - filename: Name of the wheel file. - - Returns: - True if the file name is valid, False otherwise. - """ - # Define an allowed list of allowed characters and patterns - allowed_pattern = r"^[a-zA-Z0-9-._]+-[a-zA-Z0-9_.]+-([a-zA-Z0-9_.]+|any|py2.py3)-(none|linux|macosx|win)-(any|any|intel|amd64)\.whl$" - return bool(re.match(allowed_pattern, filename)) - - -def _push_container_file(container: Container, src_path, dest_path, resource): - """Copy worker file to the container and install dependencies. - - Args: - container: Container unit on which to perform action. - src_path: resource path. - dest_path: destination path on container. - resource: resource to push to container. - """ - source_path = src_path / resource - with open(source_path, "r") as file_source: - logger.info(f"pushing {resource} source...") - container.push(dest_path, file_source, make_dirs=True) - - -def _install_wheel_file(container: Container, wheel_file_name: str, proxy: str): - """Install named wheel file on container. - - Args: - container: Container unit on which to perform action. - wheel_file_name: name of wheel file to install. - proxy: optional proxy used when installing wheel file. - - Raises: - ValueError: if wheel file fails to install. - """ - command = ["pip", "install", wheel_file_name] - if proxy.strip() != "": - command.insert(2, f"--proxy={proxy}") - - _, error = container.exec(command).wait_output() - if error is not None and error.strip() != "" and not error.strip().startswith("WARNING"): - logger.error(f"failed to install wheel file: {error}") - raise ValueError("Invalid state: failed to install wheel file") - - -def _unpack_wheel_file(container: Container, wheel_file_name: str): - """Unpack named wheel file on container. - - Args: - container: Container unit on which to perform action. - wheel_file_name: name of wheel file to unpack. - - Raises: - ValueError: if wheel unpacking fails. - """ - _, error = container.exec(["wheel", "unpack", wheel_file_name, "-d", "/user_provided"]).wait_output() - if error is not None and error.strip() != "" and not error.strip().startswith("WARNING"): - logger.error(f"failed to unpack wheel file: {error}") - raise ValueError("Invalid state: failed to unpack wheel file") - - -def _get_module_name(container: Container, unpacked_file_name: str): - """Get module name from unpacked wheel file. - - Args: - container: Container unit on which to perform action. - unpacked_file_name: Name of wheel file after unpacking. - - Returns: - Name of module provided by the user. - - Raises: - ValueError: if module name extraction fails. - """ - # Find the name of the module provided by the user and set it in state. - command = f"find {unpacked_file_name} -mindepth 1 -maxdepth 1 -type d ! -name *.dist-info ! -name *.whl" - out, error = container.exec(command.split(" ")).wait_output() - - if error is not None and error.strip() != "": - logger.error(f"failed to extract module name from wheel file: {error}") - raise ValueError("Invalid state: failed to extract module name from wheel file") - - directories = out.split("\n") - return Path(directories[0]).name - - -def _check_required_directories(container: Container, unpacked_file_name: str, module_name: str): - """Verify that the required workflows and activities directories are present. - - Args: - container: Container unit on which to perform action. - unpacked_file_name: Name of wheel file after unpacking. - module_name: Name of module provided by user. - - Raises: - ValueError: if required directories are not found in attached resource. - """ - command = f"find {unpacked_file_name}/{module_name} -mindepth 1 -maxdepth 1 -type d" - out, _ = container.exec(command.split(" ")).wait_output() - provided_directories = out.split("\n") - required_directories = ["workflows", "activities"] - for d in required_directories: - if f"{unpacked_file_name}/{module_name}/{d}" not in provided_directories: - raise ValueError(f"Invalid state: {d} directory not found in attached resource") - - if __name__ == "__main__": # pragma: nocover main.main(TemporalWorkerK8SOperatorCharm) diff --git a/src/literals.py b/src/literals.py index 7f03171..9ffe1e9 100644 --- a/src/literals.py +++ b/src/literals.py @@ -8,7 +8,7 @@ VALID_LOG_LEVELS = ["info", "debug", "warning", "error", "critical"] LOG_FILE = "/var/log/temporal" -REQUIRED_CHARM_CONFIG = ["host", "namespace", "queue", "supported-workflows", "supported-activities"] +REQUIRED_CHARM_CONFIG = ["host", "namespace", "queue"] REQUIRED_CANDID_CONFIG = ["candid-url", "candid-username", "candid-public-key", "candid-private-key"] REQUIRED_OIDC_CONFIG = [ "oidc-auth-type", diff --git a/src/resources/check_status.py b/src/resources/check_status.py deleted file mode 100644 index 2491f24..0000000 --- a/src/resources/check_status.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -"""Temporal worker status checker.""" - - -import logging -import sys - -logger = logging.getLogger(__name__) - - -def check_worker_status(): - """Check Temporal worker status by reading status file.""" - try: - with open("worker_status.txt", "r") as status_file: - status = status_file.read().strip() - logger.info(f"Async status: {status}") - - if status.startswith("Success"): - exit_code = 0 - else: - exit_code = 1 - except FileNotFoundError: - logger.error("Status file not found. Worker is not running.") - exit_code = 1 - - sys.exit(exit_code) - - -if __name__ == "__main__": - check_worker_status() diff --git a/src/resources/temporal_client/__init__.py b/src/resources/temporal_client/__init__.py deleted file mode 100644 index 3b8510e..0000000 --- a/src/resources/temporal_client/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - - -"""Temporal client sample workflows and activities.""" diff --git a/src/resources/temporal_client/activities.py b/src/resources/temporal_client/activities.py deleted file mode 100644 index 3306412..0000000 --- a/src/resources/temporal_client/activities.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - - -"""Temporal sample activity.""" - -from dataclasses import dataclass - -from temporalio import activity - - -@dataclass -class ComposeGreetingInput: - """Greeting class. - - Attrs: - greeting: greeting string. - name: name string. - """ - - greeting: str - name: str - - -@activity.defn(name="compose_greeting") -async def compose_greeting(arg: ComposeGreetingInput) -> str: - """Log and do string concatenation activity. - - Args: - arg: greeting to log. - - Returns: - Greeting string. - """ - activity.logger.info(f"Running activity with parameter {arg}") - return f"{arg.greeting}, {arg.name}!" diff --git a/src/resources/temporal_client/workflows.py b/src/resources/temporal_client/workflows.py deleted file mode 100644 index fe1db1c..0000000 --- a/src/resources/temporal_client/workflows.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - - -"""Temporal sample workflow.""" - -from datetime import timedelta - -from temporalio import workflow - -with workflow.unsafe.imports_passed_through(): - from .activities import ComposeGreetingInput, compose_greeting - - -@workflow.defn(name="GreetingWorkflow") -class GreetingWorkflow: - """Basic workflow that logs and invokes an activity.""" - - @workflow.run - async def run(self, name: str) -> str: - """Workflow runner. - - Args: - name: value to be logged. - - Returns: - Workflow execution. - """ - workflow.logger.info(f"Running workflow with parameter {name}") - return await workflow.execute_activity( - compose_greeting, - ComposeGreetingInput("Hello", name), - start_to_close_timeout=timedelta(seconds=10), - ) diff --git a/src/resources/worker-dependencies.txt b/src/resources/worker-dependencies.txt deleted file mode 100644 index aed7c75..0000000 --- a/src/resources/worker-dependencies.txt +++ /dev/null @@ -1,2 +0,0 @@ -temporal-lib-py==1.3.1 -wheel==0.41.2 diff --git a/src/resources/worker.py b/src/resources/worker.py deleted file mode 100644 index 4449e93..0000000 --- a/src/resources/worker.py +++ /dev/null @@ -1,188 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - - -"""Temporal client worker.""" - -import asyncio -import glob -import inspect -import logging -import os -import sys -import traceback -from importlib import import_module - -from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig -from temporallib.auth import ( - AuthOptions, - GoogleAuthOptions, - KeyPair, - MacaroonAuthOptions, -) -from temporallib.client import Client, Options -from temporallib.encryption import EncryptionOptions -from temporallib.worker import SentryOptions, Worker, WorkerOptions - -from literals import PROMETHEUS_PORT - -logger = logging.getLogger(__name__) - - -def _get_auth_header(): - """Get auth options based on provider. - - Returns: - AuthOptions object. - """ - if os.getenv("TWC_AUTH_PROVIDER") == "candid": - return MacaroonAuthOptions( - keys=KeyPair(private=os.getenv("TWC_CANDID_PRIVATE_KEY"), public=os.getenv("TWC_CANDID_PUBLIC_KEY")), - macaroon_url=os.getenv("TWC_CANDID_URL"), - username=os.getenv("TWC_CANDID_USERNAME"), - ) - - if os.getenv("TWC_AUTH_PROVIDER") == "google": - return GoogleAuthOptions( - type="service_account", - project_id=os.getenv("TWC_OIDC_PROJECT_ID"), - private_key_id=os.getenv("TWC_OIDC_PRIVATE_KEY_ID"), - private_key=os.getenv("TWC_OIDC_PRIVATE_KEY"), - client_email=os.getenv("TWC_OIDC_CLIENT_EMAIL"), - client_id=os.getenv("TWC_OIDC_CLIENT_ID"), - auth_uri=os.getenv("TWC_OIDC_AUTH_URI"), - token_uri=os.getenv("TWC_OIDC_TOKEN_URI"), - auth_provider_x509_cert_url=os.getenv("TWC_OIDC_AUTH_CERT_URL"), - client_x509_cert_url=os.getenv("TWC_OIDC_CLIENT_CERT_URL"), - ) - - return None - - -def _import_modules(module_type, unpacked_file_name, module_name, supported_modules): - """Extract supported workflows and activities . - - Args: - module_type: "workflows" or "activities". - unpacked_file_name: Name of unpacked wheel file. - module_name: Parent module name extracted from wheel file. - supported_modules: list of supported modules to be extracted from module file. - - Returns: - List of supported module references extracted from .py file. - """ - folder_path = os.path.join(os.getcwd(), unpacked_file_name, module_name, module_type) - sys.path.append(folder_path) - file_names = glob.glob(f"{folder_path}/*.py") - file_names = [os.path.basename(file) for file in file_names] - - module_list = [] - for file_name in file_names: - module_name = file_name[:-3] - module = import_module(module_name) - - if "all" in supported_modules: - for _, obj in inspect.getmembers(module): - if module_type == "workflows": - if inspect.isclass(obj) and inspect.getmodule(obj) is module: - module_list.append(obj) - else: - if inspect.isfunction(obj) and inspect.getmodule(obj) is module: - module_list.append(obj) - else: - for sm in supported_modules: - if hasattr(module, sm.strip()): - module_list.append(getattr(module, sm.strip())) - - return module_list - - -def _init_runtime_with_prometheus(port: int) -> Runtime: - """Create runtime for use with Prometheus metrics. - - Args: - port: Port of prometheus. - - Returns: - Runtime for temporalio with prometheus. - """ - return Runtime(telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address=f"0.0.0.0:{port}"))) - - -async def run_worker(unpacked_file_name, module_name): - """Connect Temporal worker to Temporal server. - - Args: - unpacked_file_name: Name of unpacked wheel file. - module_name: Parent module name extracted from wheel file. - """ - client_config = Options( - host=os.getenv("TWC_HOST"), - namespace=os.getenv("TWC_NAMESPACE"), - queue=os.getenv("TWC_QUEUE"), - ) - - try: - workflows = _import_modules( - "workflows", - unpacked_file_name=unpacked_file_name, - module_name=module_name, - supported_modules=os.getenv("TWC_SUPPORTED_WORKFLOWS").split(","), - ) - activities = _import_modules( - "activities", - unpacked_file_name=unpacked_file_name, - module_name=module_name, - supported_modules=os.getenv("TWC_SUPPORTED_ACTIVITIES").split(","), - ) - - if os.getenv("TWC_TLS_ROOT_CAS").strip() != "": - client_config.tls_root_cas = os.getenv("TWC_TLS_ROOT_CAS") - - if os.getenv("TWC_AUTH_PROVIDER").strip() != "": - client_config.auth = AuthOptions(provider=os.getenv("TWC_AUTH_PROVIDER"), config=_get_auth_header()) - - if os.getenv("TWC_ENCRYPTION_KEY").strip() != "": - client_config.encryption = EncryptionOptions(key=os.getenv("TWC_ENCRYPTION_KEY"), compress=True) - - worker_opt = None - dsn = os.getenv("TWC_SENTRY_DSN").strip() - if dsn != "": - sentry = SentryOptions( - dsn=dsn, - release=os.getenv("TWC_SENTRY_RELEASE").strip() or None, - environment=os.getenv("TWC_SENTRY_ENVIRONMENT").strip() or None, - redact_params=os.getenv("TWC_SENTRY_REDACT_PARAMS"), - sample_rate=os.getenv("TWC_SENTRY_SAMPLE_RATE"), - ) - - worker_opt = WorkerOptions(sentry=sentry) - - runtime = _init_runtime_with_prometheus(PROMETHEUS_PORT) - - client = await Client.connect(client_config, runtime=runtime) - - worker = Worker( - client=client, - task_queue=os.getenv("TWC_QUEUE"), - workflows=workflows, - activities=activities, - worker_opt=worker_opt, - ) - - with open("worker_status.txt", "w") as status_file: - status_file.write("Success") - await worker.run() - except Exception as e: - # If an error occurs, write the error message to the status file - with open("worker_status.txt", "w") as status_file: - logger.exception("Error in the workflow:") - traceback.print_exception(type(e), e, e.__traceback__, file=status_file) - - -if __name__ == "__main__": # pragma: nocover - global_unpacked_file_name = sys.argv[1] - global_module_name = sys.argv[2] - - asyncio.run(run_worker(global_unpacked_file_name, global_module_name)) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 9be93e5..584918e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -10,9 +10,7 @@ from helpers import ( APP_NAME, APP_NAME_SERVER, - METADATA, WORKER_CONFIG, - attach_worker_resource_file, get_application_url, setup_temporal_ecosystem, ) @@ -20,7 +18,6 @@ logger = logging.getLogger(__name__) -rsc_path = "./resource_sample/dist/python_samples-1.1.0-py3-none-any.whl" env_rsc_path = "./resource_sample/sample.env" @@ -29,16 +26,14 @@ async def deploy(ops_test: OpsTest): """Verify the app is up and running.""" await ops_test.model.set_config({"update-status-hook-interval": "1m"}) - await setup_temporal_ecosystem(ops_test) charm = await ops_test.build_charm(".") resources = { - "temporal-worker-image": METADATA["containers"]["temporal-worker"]["upstream-source"], - "workflows-file": rsc_path, + "temporal-worker-image": "localhost:32000/temporal-worker-rock", "env-file": env_rsc_path, } - await ops_test.model.deploy(charm, resources=resources, config=WORKER_CONFIG, application_name=APP_NAME) + await setup_temporal_ecosystem(ops_test) async with ops_test.fast_forward(): await ops_test.model.wait_for_idle( @@ -50,5 +45,3 @@ async def deploy(ops_test: OpsTest): url = await get_application_url(ops_test, application=APP_NAME_SERVER, port=7233) await ops_test.model.applications[APP_NAME].set_config({"host": url}) - - await attach_worker_resource_file(ops_test, rsc_type="workflows") diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index ff299e9..a0c970f 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -23,9 +23,6 @@ WORKER_CONFIG = { "namespace": "default", "queue": "test-queue", - "workflows-file-name": "python_samples-1.1.0-py3-none-any.whl", - "supported-workflows": "all", - "supported-activities": "all", } @@ -194,34 +191,23 @@ async def setup_temporal_ecosystem(ops_test: OpsTest): assert ops_test.model.applications[APP_NAME_SERVER].units[0].workload_status == "active" -async def attach_worker_resource_file(ops_test: OpsTest, rsc_type="workflows"): +async def attach_worker_invalid_env_file(ops_test: OpsTest): """Scale the application to the provided number and wait for idle. Args: ops_test: PyTest object. - rsc_type: Resource type. """ - if rsc_type == "workflows": - rsc_name = "workflows-file" - rsc_path = "./resource_sample/dist/python_samples-1.1.0-py3-none-any.whl" - else: - rsc_name = "env-file" - rsc_path = "./resource_sample/invalid.env" + rsc_name = "env-file" + rsc_path = "./sample_files/invalid.env" logger.info(f"Attaching resource: {APP_NAME} {rsc_name}={rsc_path}") with open(rsc_path, "rb") as file: ops_test.model.applications[APP_NAME].attach_resource(rsc_name, rsc_path, file) - if rsc_type == "workflows": - await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", raise_on_error=False, raise_on_blocked=False, timeout=600 - ) - assert ops_test.model.applications[APP_NAME].units[0].workload_status == "active" - else: - await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="blocked", raise_on_error=False, raise_on_blocked=False, timeout=600 - ) - assert ops_test.model.applications[APP_NAME].units[0].workload_status == "blocked" + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="blocked", raise_on_error=False, raise_on_blocked=False, timeout=600 + ) + assert ops_test.model.applications[APP_NAME].units[0].workload_status == "blocked" async def read_vault_unit_statuses(ops_test: OpsTest): diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index e2f2637..ffa2aa2 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -8,7 +8,7 @@ import pytest from conftest import deploy # noqa: F401, pylint: disable=W0611 -from helpers import attach_worker_resource_file, run_sample_workflow +from helpers import attach_worker_invalid_env_file, run_sample_workflow from pytest_operator.plugin import OpsTest logger = logging.getLogger(__name__) @@ -25,4 +25,4 @@ async def test_basic_client(self, ops_test: OpsTest): async def test_invalid_env_file(self, ops_test: OpsTest): """Attaches an invalid .env file to the worker.""" - await attach_worker_resource_file(ops_test, rsc_type="env-file") + await attach_worker_invalid_env_file(ops_test) diff --git a/tests/integration/test_upgrades.py b/tests/integration/test_upgrades.py index 987b1f6..5238307 100644 --- a/tests/integration/test_upgrades.py +++ b/tests/integration/test_upgrades.py @@ -23,6 +23,9 @@ @pytest.mark.skip_if_deployed +@pytest.mark.skip( + "skipping due to migration of resource file to ROCK image. Renable after this revision 12 on latest/edge." +) @pytest_asyncio.fixture(name="deploy", scope="module") async def deploy(ops_test: OpsTest): """Verify the app is up and running.""" diff --git a/tests/unit/literals.py b/tests/unit/literals.py index 8c6f0a0..57aad1a 100644 --- a/tests/unit/literals.py +++ b/tests/unit/literals.py @@ -11,12 +11,9 @@ "host": "test-host", "namespace": "test-namespace", "queue": "test-queue", - "supported-workflows": "all", - "supported-activities": "all", "sentry-dsn": "", "sentry-release": "", "sentry-environment": "", - "workflows-file-name": "python_samples-1.1.0-py3-none-any.whl", "encryption-key": "", "auth-provider": "candid", "tls-root-cas": "", @@ -34,9 +31,6 @@ "oidc-token-uri": "", "oidc-auth-cert-url": "", "oidc-client-cert-url": "", - "http-proxy": "proxy", - "https-proxy": "proxy", - "no-proxy": "none", } EXPECTED_VAULT_ENV = { @@ -56,11 +50,9 @@ "TWC_CANDID_USERNAME": "test-username", "TWC_ENCRYPTION_KEY": "", "TWC_HOST": "test-host", - "TWC_HTTPS_PROXY": "proxy", - "TWC_HTTP_PROXY": "proxy", "TWC_LOG_LEVEL": "debug", "TWC_NAMESPACE": "test-namespace", - "TWC_NO_PROXY": "none", + "TWC_PROMETHEUS_PORT": 9000, "TWC_OIDC_AUTH_CERT_URL": "", "TWC_OIDC_AUTH_TYPE": "", "TWC_OIDC_AUTH_URI": "", @@ -77,11 +69,5 @@ "TWC_SENTRY_RELEASE": "", "TWC_SENTRY_SAMPLE_RATE": 1.0, "TWC_SENTRY_REDACT_PARAMS": False, - "TWC_SUPPORTED_ACTIVITIES": "all", - "TWC_SUPPORTED_WORKFLOWS": "all", "TWC_TLS_ROOT_CAS": "", - "TWC_WORKFLOWS_FILE_NAME": "python_samples-1.1.0-py3-none-any.whl", - "HTTP_PROXY": "proxy", - "HTTPS_PROXY": "proxy", - "NO_PROXY": "none", } diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 5075a79..202d522 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -6,11 +6,9 @@ """Temporal worker charm unit tests.""" import json -from unittest import TestCase, mock +from unittest import TestCase -from ops import Container from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus -from ops.pebble import CheckStatus from ops.testing import Harness from charm import TemporalWorkerK8SOperatorCharm @@ -40,52 +38,32 @@ def test_initial_plan(self): initial_plan = harness.get_container_pebble_plan(CONTAINER_NAME).to_dict() self.assertEqual(initial_plan, {}) - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - def test_attach_resource(self, _process_wheel_file): - """The workflows file resource can be attached.""" + def test_blocked_on_missing_host(self): + """The charm is blocked on missing host config.""" harness = self.harness # Simulate peer relation readiness. harness.add_relation("peer", "temporal") - harness.add_resource("workflows-file", "") - harness.charm.on.config_changed.emit() - _process_wheel_file.assert_called() self.assertEqual(harness.model.unit.status, BlockedStatus("Invalid config: host value missing")) - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - @mock.patch("charm._setup_container") - def test_ready(self, _process_wheel_file, _setup_container): + def test_ready(self): """The charm is ready.""" harness = self.harness - state = simulate_lifecycle(harness, CONFIG) + simulate_lifecycle(harness, CONFIG) harness.charm.on.config_changed.emit() - module_name = json.loads(state["module_name"]) - unpacked_file_name = json.loads(state["unpacked_file_name"]) - - command = f"python worker.py {unpacked_file_name} {module_name}" - # The plan is generated after pebble is ready. want_plan = { "services": { "temporal-worker": { "summary": "temporal worker", - "command": command, + "command": "./app/scripts/start-worker.sh", "startup": "enabled", "override": "replace", "environment": WANT_ENV, - "on-check-failure": {"up": "ignore"}, - } - }, - "checks": { - "up": { - "override": "replace", - "level": "alive", - "period": "10s", - "exec": {"command": "python check_status.py"}, } }, } @@ -97,21 +75,6 @@ def test_ready(self, _process_wheel_file, _setup_container): self.assertTrue(service.is_running()) self.assertEqual(harness.model.unit.status, MaintenanceStatus("replanning application")) - - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - @mock.patch("charm._setup_container") - @mock.patch.object(Container, "exec") - def test_update_status_up(self, _process_wheel_file, _setup_container, mock_exec): - """The charm updates the unit status to active based on UP status.""" - harness = self.harness - mock_exec.return_value = mock.MagicMock(wait_output=mock.MagicMock(return_value=("", None))) - - simulate_lifecycle(harness, CONFIG) - self.harness.container_pebble_ready(CONTAINER_NAME) - - container = harness.model.unit.get_container(CONTAINER_NAME) - container.get_check = mock.Mock(status="up") - container.get_check.return_value.status = CheckStatus.UP harness.charm.on.update_status.emit() self.assertEqual( @@ -119,37 +82,13 @@ def test_update_status_up(self, _process_wheel_file, _setup_container, mock_exec ActiveStatus(f"worker listening to namespace {CONFIG['namespace']!r} on queue {CONFIG['queue']!r}"), ) - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - @mock.patch("charm._setup_container") - @mock.patch.object(Container, "exec") - def test_update_status_down(self, _process_wheel_file, _setup_container, mock_exec): - """The charm updates the unit status to maintenance based on DOWN status.""" - harness = self.harness - mock_exec.return_value = mock.MagicMock(wait_output=mock.MagicMock(return_value=1)) - - simulate_lifecycle(harness, CONFIG) - self.harness.container_pebble_ready(CONTAINER_NAME) - - container = harness.model.unit.get_container(CONTAINER_NAME) - container.get_check = mock.Mock(status="up") - container.get_check.return_value.status = CheckStatus.DOWN - harness.charm.on.update_status.emit() - - self.assertEqual(harness.model.unit.status, MaintenanceStatus("Status check: DOWN")) - - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - @mock.patch("charm._setup_container") - def test_vault_relation(self, _process_wheel_file, _setup_container): + def test_vault_relation(self): """The charm is ready with vault relation.""" harness = self.harness - state = simulate_lifecycle(harness, CONFIG) + simulate_lifecycle(harness, CONFIG) harness.charm.on.config_changed.emit() - module_name = json.loads(state["module_name"]) - unpacked_file_name = json.loads(state["unpacked_file_name"]) - command = f"python worker.py {unpacked_file_name} {module_name}" - relation_id = add_vault_relation(self, harness) self.harness.update_config({}) @@ -158,19 +97,10 @@ def test_vault_relation(self, _process_wheel_file, _setup_container): "services": { "temporal-worker": { "summary": "temporal worker", - "command": command, + "command": "./app/scripts/start-worker.sh", "startup": "enabled", "override": "replace", "environment": {**WANT_ENV, **EXPECTED_VAULT_ENV}, - "on-check-failure": {"up": "ignore"}, - } - }, - "checks": { - "up": { - "override": "replace", - "level": "alive", - "period": "10s", - "exec": {"command": "python check_status.py"}, } }, } @@ -187,19 +117,10 @@ def test_vault_relation(self, _process_wheel_file, _setup_container): "services": { "temporal-worker": { "summary": "temporal worker", - "command": command, + "command": "./app/scripts/start-worker.sh", "startup": "enabled", "override": "replace", "environment": {**WANT_ENV}, - "on-check-failure": {"up": "ignore"}, - } - }, - "checks": { - "up": { - "override": "replace", - "level": "alive", - "period": "10s", - "exec": {"command": "python check_status.py"}, } }, } @@ -254,29 +175,12 @@ def simulate_lifecycle(harness, config): Args: harness: ops.testing.Harness object used to simulate charm lifecycle. config: object to update the charm's config. - - Returns: - Peer relation data. """ # Simulate peer relation readiness. - rel = harness.add_relation("peer", "temporal") + harness.add_relation("peer", "temporal") # Simulate pebble readiness. container = harness.model.unit.get_container(CONTAINER_NAME) harness.charm.on.temporal_worker_pebble_ready.emit(container) harness.update_config(config) - harness.add_resource("workflows-file", "bytes_content") - - harness.update_relation_data( - rel, - app_or_unit="temporal-worker-k8s", - key_values={ - "supported_workflows": json.dumps(["TestWorkflow"]), - "supported_activities": json.dumps(["test_activity"]), - "module_name": json.dumps("python_samples"), - "unpacked_file_name": json.dumps("python_sample-0.1.0"), - }, - ) - - return harness.get_relation_data(rel, "temporal-worker-k8s") diff --git a/tox.ini b/tox.ini index 4f288b3..bf8b44d 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ all_path = {[vars]src_path} {[vars]tst_path} [testenv] basepython = python3 -allowlist_externals = make +allowlist_externals = make, rockcraft setenv = PYTHONPATH = {toxinidir}:{toxinidir}/lib:{[vars]src_path} PYTHONBREAKPOINT=ipdb.set_trace @@ -56,7 +56,8 @@ commands = codespell {toxinidir} --skip {toxinidir}/.git --skip {toxinidir}/.tox \ --skip {toxinidir}/build --skip {toxinidir}/lib --skip {toxinidir}/venv \ --skip {toxinidir}/.mypy_cache --skip {toxinidir}/icon.svg \ - --skip {toxinidir}/src/resources/worker.py + --skip {toxinidir}/src/resources/worker.py \ + --skip {toxinidir}/resource_sample_py/resource_sample/worker.py pflake8 {[vars]all_path} isort --check-only --diff {[vars]all_path} black --check --diff {[vars]all_path} @@ -104,8 +105,8 @@ deps = poetry==1.8.3 -r{toxinidir}/requirements.txt commands = - make -C resource_sample/ build - pytest {[vars]tst_path}integration/test_charm.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode + make -C resource_sample_py/ build_rock + pytest {[vars]tst_path}integration/test_charm.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} [testenv:integration-scaling] description = Run scaling integration tests @@ -119,8 +120,8 @@ deps = poetry==1.8.3 -r{toxinidir}/requirements.txt commands = - make -C resource_sample/ build - pytest {[vars]tst_path}integration/test_scaling.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode + make -C resource_sample_py/ build_rock + pytest {[vars]tst_path}integration/test_scaling.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} [testenv:integration-upgrades] description = Run upgrades integration tests @@ -134,8 +135,8 @@ deps = poetry==1.8.3 -r{toxinidir}/requirements.txt commands = - make -C resource_sample/ build - pytest {[vars]tst_path}integration/test_upgrades.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode + make -C resource_sample_py/ build_rock + pytest {[vars]tst_path}integration/test_upgrades.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} [testenv:integration-vault] description = Run vault integration tests @@ -150,5 +151,5 @@ deps = poetry==1.8.3 -r{toxinidir}/requirements.txt commands = - make -C resource_sample/ build - pytest {[vars]tst_path}integration/test_vault.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode + make -C resource_sample_py/ build_rock + pytest {[vars]tst_path}integration/test_vault.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs}