Skip to content

Commit

Permalink
test exc
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Jul 24, 2024
1 parent a226610 commit 31192fb
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 21 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ jobs:
env:
COVERAGE_FILE: ".coverage.${{ matrix.python-version }}"
PYTEST_ARGS: "--cov=karapace --cov-append"
- run: make integration-tests
env:
COVERAGE_FILE: ".coverage.${{ matrix.python-version }}"
PYTEST_ARGS: "--cov=karapace --cov-append --random-order"

- name: Archive logs
uses: actions/upload-artifact@v4
Expand Down
35 changes: 18 additions & 17 deletions karapace/protobuf/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
from karapace.protobuf.schema import ProtobufSchema
from karapace.protobuf.type_element import TypeElement
from multiprocessing import Process, Queue
from pathlib import Path
from typing import Dict, Final, Generator, Iterable, Protocol
from typing_extensions import Self, TypeAlias

import hashlib
import importlib
import importlib.util
import os
import subprocess
import sys

Expand Down Expand Up @@ -96,29 +96,27 @@ def get_protobuf_class_instance(
class_name: str,
cfg: Config,
) -> _ProtobufModel:
directory = cfg["protobuf_runtime_directory"]
directory = Path(cfg["protobuf_runtime_directory"])
deps_list = crawl_dependencies(schema)
root_class_name = ""
for value in deps_list.values():
root_class_name = root_class_name + value["unique_class_name"]
root_class_name = root_class_name + str(schema)
proto_name = calculate_class_name(root_class_name)

proto_path = f"{proto_name}.proto"
work_dir = f"{directory}/{proto_name}"
if not os.path.isdir(directory):
os.mkdir(directory)
if not os.path.isdir(work_dir):
os.mkdir(work_dir)
class_path = f"{directory}/{proto_name}/{proto_name}_pb2.py"
if not os.path.exists(class_path):
main_proto_filename = f"{proto_name}.proto"
work_dir = directory / Path(proto_name)
work_dir.mkdir(exist_ok=True, parents=True)
class_path = work_dir / Path(f"{proto_name}_pb2.py")

if not class_path.exists():
with open(f"{directory}/{proto_name}/{proto_name}.proto", mode="w", encoding="utf8") as proto_text:
proto_text.write(replace_imports(str(schema), deps_list))

protoc_arguments = [
"protoc",
"--python_out=./",
proto_path,
main_proto_filename,
]
for value in deps_list.values():
proto_file_name = value["unique_class_name"] + ".proto"
Expand All @@ -127,16 +125,18 @@ def get_protobuf_class_instance(
with open(dependency_path, mode="w", encoding="utf8") as proto_text:
proto_text.write(replace_imports(value["schema"], deps_list))

if not os.path.isfile(class_path):
if not class_path.is_file():
subprocess.run(
protoc_arguments,
check=True,
cwd=work_dir,
)

# todo: This will leave residues on sys.path in case of exceptions. If really must
# mutate sys.path, we should at least wrap in try-finally.
sys.path.append(f"./runtime/{proto_name}")
runtime_proto_path = f"./runtime/{proto_name}"
if runtime_proto_path not in sys.path:
# todo: This will leave residues on sys.path in case of exceptions. If really must
# mutate sys.path, we should at least wrap in try-finally.
sys.path.append(runtime_proto_path)
spec = importlib.util.spec_from_file_location(f"{proto_name}_pb2", class_path)
# This is reasonable to assert because we just created this file.
assert spec is not None
Expand Down Expand Up @@ -240,7 +240,7 @@ def read(self, bio: BytesIO) -> dict:
return read_in_forked_process_multiprocess_process(self.config, self._writer_schema, self._reader_schema, bio)


_WriterQueue: TypeAlias = "Queue[bytes | BaseException]"
_WriterQueue: TypeAlias = "Queue[bytes | str | BaseException]"


def writer_process(
Expand All @@ -253,6 +253,8 @@ def writer_process(
try:
class_instance = get_protobuf_class_instance(writer_schema, message_name, config)
dict_to_protobuf(class_instance, datum)
result = class_instance.SerializeToString()
writer_queue.put(result)
# Writing happens in the forked process, catch is broad so exception will get communicated
# back to calling process.
except Exception as bare_exception: # pylint: disable=broad-exception-caught
Expand All @@ -263,7 +265,6 @@ def writer_process(
raise protobuf_exception
except BaseException as base_exception: # pylint: disable=broad-exception-caught
writer_queue.put(base_exception)
writer_queue.put(class_instance.SerializeToString())


def write_in_forked_process(
Expand Down

0 comments on commit 31192fb

Please sign in to comment.