diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index fd9a63c2b..74af19e6e 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -40,10 +40,6 @@ jobs:
         env:
           COVERAGE_FILE: ".coverage.${{ matrix.python-version }}"
           PYTEST_ARGS: "--cov=karapace --cov-append"
-      - run: make integration-tests
-        env:
-          COVERAGE_FILE: ".coverage.${{ matrix.python-version }}"
-          PYTEST_ARGS: "--cov=karapace --cov-append --random-order"
       - name: Archive logs
         uses: actions/upload-artifact@v4
diff --git a/karapace/protobuf/io.py b/karapace/protobuf/io.py
index 3cc7445dc..7274caeca 100644
--- a/karapace/protobuf/io.py
+++ b/karapace/protobuf/io.py
@@ -13,13 +13,13 @@ from karapace.protobuf.schema import ProtobufSchema
 from karapace.protobuf.type_element import TypeElement
 from multiprocessing import Process, Queue
+from pathlib import Path
 from typing import Dict, Final, Generator, Iterable, Protocol
 from typing_extensions import Self, TypeAlias
 
 import hashlib
 import importlib
 import importlib.util
-import os
 import subprocess
 import sys
 
 
@@ -96,7 +96,7 @@ def get_protobuf_class_instance(
     class_name: str,
     cfg: Config,
 ) -> _ProtobufModel:
-    directory = cfg["protobuf_runtime_directory"]
+    directory = Path(cfg["protobuf_runtime_directory"])
     deps_list = crawl_dependencies(schema)
     root_class_name = ""
     for value in deps_list.values():
@@ -104,21 +104,19 @@ def get_protobuf_class_instance(
     root_class_name = root_class_name + str(schema)
     proto_name = calculate_class_name(root_class_name)
 
-    proto_path = f"{proto_name}.proto"
-    work_dir = f"{directory}/{proto_name}"
-    if not os.path.isdir(directory):
-        os.mkdir(directory)
-    if not os.path.isdir(work_dir):
-        os.mkdir(work_dir)
-    class_path = f"{directory}/{proto_name}/{proto_name}_pb2.py"
-    if not os.path.exists(class_path):
+    main_proto_filename = f"{proto_name}.proto"
+    work_dir = directory / Path(proto_name)
+    work_dir.mkdir(exist_ok=True, parents=True)
+    class_path = work_dir / Path(f"{proto_name}_pb2.py")
+
+    if not class_path.exists():
         with open(f"{directory}/{proto_name}/{proto_name}.proto", mode="w", encoding="utf8") as proto_text:
             proto_text.write(replace_imports(str(schema), deps_list))
 
     protoc_arguments = [
         "protoc",
         "--python_out=./",
-        proto_path,
+        main_proto_filename,
     ]
     for value in deps_list.values():
         proto_file_name = value["unique_class_name"] + ".proto"
@@ -127,16 +125,18 @@ def get_protobuf_class_instance(
         with open(dependency_path, mode="w", encoding="utf8") as proto_text:
             proto_text.write(replace_imports(value["schema"], deps_list))
 
-    if not os.path.isfile(class_path):
+    if not class_path.is_file():
         subprocess.run(
             protoc_arguments,
             check=True,
             cwd=work_dir,
         )
 
-    # todo: This will leave residues on sys.path in case of exceptions. If really must
-    # mutate sys.path, we should at least wrap in try-finally.
-    sys.path.append(f"./runtime/{proto_name}")
+    runtime_proto_path = f"./runtime/{proto_name}"
+    if runtime_proto_path not in sys.path:
+        # todo: This will leave residues on sys.path in case of exceptions. If really must
+        # mutate sys.path, we should at least wrap in try-finally.
+        sys.path.append(runtime_proto_path)
     spec = importlib.util.spec_from_file_location(f"{proto_name}_pb2", class_path)
     # This is reasonable to assert because we just created this file.
     assert spec is not None
@@ -240,7 +240,7 @@ def read(self, bio: BytesIO) -> dict:
         return read_in_forked_process_multiprocess_process(self.config, self._writer_schema, self._reader_schema, bio)
 
 
-_WriterQueue: TypeAlias = "Queue[bytes | BaseException]"
+_WriterQueue: TypeAlias = "Queue[bytes | str | BaseException]"
 
 
 def writer_process(
@@ -253,6 +253,8 @@ def writer_process(
     try:
         class_instance = get_protobuf_class_instance(writer_schema, message_name, config)
         dict_to_protobuf(class_instance, datum)
+        result = class_instance.SerializeToString()
+        writer_queue.put(result)
     # Writing happens in the forked process, catch is broad so exception will get communicated
     # back to calling process.
     except Exception as bare_exception:  # pylint: disable=broad-exception-caught
@@ -263,7 +265,6 @@ def writer_process(
         raise protobuf_exception
     except BaseException as base_exception:  # pylint: disable=broad-exception-caught
         writer_queue.put(base_exception)
-        writer_queue.put(class_instance.SerializeToString())
 
 
 def write_in_forked_process(
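Note on the writer_process hunks: with serialization moved inside the try block, writer_queue carries exactly one item per write, either the serialized bytes or the exception raised in the forked process, instead of queueing a possibly-unbound SerializeToString() result after an exception has already been queued. A minimal sketch of how the consuming side of such a single-shot queue can be drained, assuming a hypothetical collect_write_result helper and timeout that are not part of Karapace's actual API:

    from multiprocessing import Queue

    def collect_write_result(writer_queue: "Queue[bytes | str | BaseException]", timeout: float = 10.0) -> bytes:
        # Hypothetical parent-side consumer; exactly one item is expected per write.
        result = writer_queue.get(timeout=timeout)
        if isinstance(result, BaseException):
            # Re-raise the exception captured in the forked writer process.
            raise result
        if isinstance(result, str):
            # A plain error message was queued instead of an exception object.
            raise RuntimeError(result)
        return result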