Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standalone Processor Server (#884 + #974) #1030

Merged
merged 37 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1c37ad5
basics of the procesor server
MehmedGIT Mar 28, 2023
42f16ab
Adapt the fix from #974
MehmedGIT Mar 28, 2023
ba92c87
check defaults, pass shallow copy
MehmedGIT Mar 28, 2023
0644b9e
complete processor server internals
MehmedGIT Mar 28, 2023
cdb73d5
implement most of the logic for everything
MehmedGIT Mar 28, 2023
6476ebb
fix some nasty errors
MehmedGIT Mar 29, 2023
0f8367c
use cleaner approach with sys.exit()
MehmedGIT Mar 29, 2023
4151a40
remove initLogging from processing worker
MehmedGIT Mar 29, 2023
36fec45
log exceptions of the processing server
MehmedGIT Mar 29, 2023
b2ad725
fix serialization/deserialization
MehmedGIT Mar 29, 2023
275d98b
refactoring: first portion
MehmedGIT Mar 29, 2023
498ed40
refactoring: second portion
MehmedGIT Apr 1, 2023
94c76e0
small improvements
MehmedGIT Apr 1, 2023
d9af9a0
Merge branch 'master' into processor-server
MehmedGIT Apr 2, 2023
bf44190
return: deploy type as enum
MehmedGIT Apr 3, 2023
1b91ccb
Merge branch 'processor-server' of github.com:OCR-D/core into process…
MehmedGIT Apr 3, 2023
863ae4a
separate: creation/execution of job task
MehmedGIT Apr 3, 2023
8e59e1c
add background tasks caveat
MehmedGIT Apr 3, 2023
dbf1af6
change tool/version read
MehmedGIT Apr 3, 2023
c31c5b7
combine network cli under a group
MehmedGIT Apr 3, 2023
93482d3
remove unnecessary files
MehmedGIT Apr 3, 2023
f5c34f9
clean network cli
MehmedGIT Apr 3, 2023
7bf5e78
abstract network cli in ocrd_network
MehmedGIT Apr 4, 2023
5ee1903
implement skeleton for client
MehmedGIT Apr 4, 2023
845bc09
refactor: naming
MehmedGIT Apr 4, 2023
94cec8e
fix typo
MehmedGIT Apr 4, 2023
b1ce98b
full cli path of ocrd_network
MehmedGIT Apr 5, 2023
4a5200b
full paths network cli
MehmedGIT Apr 5, 2023
0b1b551
setup.py - ocrd_network.cli
MehmedGIT Apr 5, 2023
ee61852
flexible config: either worker/server or both
MehmedGIT Apr 28, 2023
03a77bd
adapt runtime data for worker/server
MehmedGIT Apr 28, 2023
bb83ade
Add type option to bashlib-call and help output
joschrew Apr 28, 2023
845cbb6
Fix process worker/server bashlib invocation
joschrew May 2, 2023
d680e90
processor server: blocking api
MehmedGIT May 8, 2023
4b5d48e
Use async req to processor servers
MehmedGIT May 8, 2023
0520742
:todo: reference discussion on number of pages
kba Jun 21, 2023
7d7991e
Merge branch 'master' into processor-server
kba Jun 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ocrd/ocrd/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def get_help(self, ctx):
from .log import log_cli
from .processing_server import processing_server_cli
from .processing_worker import processing_worker_cli
from .processor_server import processor_server_cli


@click.group()
Expand All @@ -53,3 +54,4 @@ def cli(**kwargs): # pylint: disable=unused-argument
cli.add_command(resmgr_cli)
cli.add_command(processing_server_cli)
cli.add_command(processing_worker_cli)
cli.add_command(processor_server_cli)
8 changes: 5 additions & 3 deletions ocrd/ocrd/cli/processing_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
@click.option('-q', '--queue',
default="amqp://admin:admin@localhost:5672/",
help='The URL of the Queue Server, format: amqp://username:password@host:port/vhost',
type=QueueServerParamType())
type=QueueServerParamType(),
required=True)
@click.option('-d', '--database',
default="mongodb://localhost:27018",
help='The URL of the MongoDB, format: mongodb://host:port',
type=DatabaseParamType())
def processing_worker_cli(processor_name: str, queue: str, database: str):
type=DatabaseParamType(),
required=True)
def processing_worker_cli(processor_name: str, agent_type: str, queue: str, database: str):
"""
Start a processing worker (a specific ocr-d processor)
"""
Expand Down
47 changes: 47 additions & 0 deletions ocrd/ocrd/cli/processor_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
OCR-D CLI: start the processor server

.. click:: ocrd.cli.processor_server:processor_server_cli
:prog: ocrd processor-server
:nested: full
"""
import click
import logging
from ocrd_utils import initLogging
from ocrd_network import (
DatabaseParamType,
ProcessingServerParamType,
ProcessorServer,
)


@click.command('processor-server')
@click.argument('processor_name', required=True, type=click.STRING)
@click.option('--agent_address',
              help='The URL of the processor server, format: host:port',
              type=ProcessingServerParamType(),
              required=True)
@click.option('-d', '--database',
              default="mongodb://localhost:27018",
              help='The URL of the MongoDB, format: mongodb://host:port',
              type=DatabaseParamType(),
              required=True)
def processor_server_cli(processor_name: str, agent_address: str, database: str):
    """
    Start ocr-d processor as a server
    """
    # NOTE: the docstring above is rendered by click as the command's help
    # text, so extended documentation lives in comments instead.
    #
    # Parameters (one per click argument/option declared above):
    #   processor_name: name of the processor to serve (positional argument).
    #   agent_address:  "host:port" string the server is started on.
    #   database:       MongoDB URL handed to the ProcessorServer.
    #
    # The callback must accept exactly the parameters declared through the
    # click decorators: click calls it with one keyword per declared
    # argument/option, so any extra required parameter (such as a stray
    # `agent_type`) makes every invocation fail with a TypeError.
    #
    # Raises: Exception (chained to the original error) if parsing the
    # address, constructing the server, or running it fails.
    initLogging()
    # TODO: Remove before the release
    logging.getLogger('ocrd.network').setLevel(logging.DEBUG)

    try:
        # TODO: Better validate that inside the ProcessorServer itself
        host, port = agent_address.split(':')
        processor_server = ProcessorServer(
            mongodb_addr=database,
            processor_name=processor_name,
            processor_class=None,  # For readability purposes assigned here
        )
        processor_server.run_server(host=host, port=int(port))
    except Exception as e:
        raise Exception("Processor server has failed with error") from e
222 changes: 126 additions & 96 deletions ocrd/ocrd/decorators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
from os.path import isfile
from os import environ
import sys
from contextlib import redirect_stdout
from io import StringIO

import click

from ocrd_utils import (
is_local_filename,
Expand All @@ -15,7 +10,7 @@
from ocrd_utils import getLogger, initLogging, parse_json_string_with_comments
from ocrd_validators import WorkspaceValidator

from ocrd_network import ProcessingWorker
from ocrd_network import ProcessingWorker, ProcessorServer

from ..resolver import Resolver
from ..processor.base import run_processor
Expand All @@ -38,8 +33,12 @@ def ocrd_cli_wrap_processor(
overwrite=False,
show_resource=None,
list_resources=False,
# ocrd_network params start #
agent_type=None,
agent_address=None,
queue=None,
database=None,
# ocrd_network params end #
**kwargs
):
if not sys.argv[1:]:
Expand All @@ -56,96 +55,127 @@ def ocrd_cli_wrap_processor(
list_resources=list_resources
)
sys.exit()
# If either of these two is provided but not both
if bool(queue) != bool(database):
raise Exception("Options --queue and --database require each other.")
# If both of these are provided - start the processing worker instead of the processor - processorClass
if queue and database:
initLogging()
# TODO: Remove before the release
# We are importing the logging here because it's not the ocrd logging but python one

initLogging()

# Used for checking/starting network agents for the WebAPI architecture
# Has no side effects if none of the 4 ocrd_network parameters is passed
check_and_run_network_agent(processorClass, agent_type, agent_address, database, queue)

LOG = getLogger('ocrd_cli_wrap_processor')
# LOG.info('kwargs=%s' % kwargs)
# Merge parameter overrides and parameters
if 'parameter_override' in kwargs:
set_json_key_value_overrides(kwargs['parameter'], *kwargs['parameter_override'])
# TODO OCR-D/core#274
# Assert -I / -O
# if not kwargs['input_file_grp']:
# raise ValueError('-I/--input-file-grp is required')
# if not kwargs['output_file_grp']:
# raise ValueError('-O/--output-file-grp is required')
resolver = Resolver()
working_dir, mets, _ = resolver.resolve_mets_arguments(working_dir, mets, None)
workspace = resolver.workspace_from_url(mets, working_dir)
page_id = kwargs.get('page_id')
# XXX not possible while processors do not adhere to # https://github.com/OCR-D/core/issues/505
# if overwrite
# if 'output_file_grp' not in kwargs or not kwargs['output_file_grp']:
# raise Exception("--overwrite requires --output-file-grp")
# LOG.info("Removing files because of --overwrite")
# for grp in kwargs['output_file_grp'].split(','):
# if page_id:
# for one_page_id in kwargs['page_id'].split(','):
# LOG.debug("Removing files in output file group %s with page ID %s", grp, one_page_id)
# for file in workspace.mets.find_files(pageId=one_page_id, fileGrp=grp):
# workspace.remove_file(file, force=True, keep_file=False, page_recursive=True)
# else:
# LOG.debug("Removing all files in output file group %s ", grp)
# # TODO: can be reduced to `page_same_group=True` as soon as core#505 has landed (in all processors)
# workspace.remove_file_group(grp, recursive=True, force=True, keep_files=False, page_recursive=True, page_same_group=False)
# workspace.save_mets()
# XXX While https://github.com/OCR-D/core/issues/505 is open, set 'overwrite_mode' globally on the workspace
if overwrite:
workspace.overwrite_mode = True
report = WorkspaceValidator.check_file_grp(workspace, kwargs['input_file_grp'], '' if overwrite else kwargs['output_file_grp'], page_id)
if not report.is_valid:
raise Exception("Invalid input/output file grps:\n\t%s" % '\n\t'.join(report.errors))
# Set up profiling behavior from environment variables/flags
if not profile and 'OCRD_PROFILE' in environ:
if 'CPU' in environ['OCRD_PROFILE']:
profile = True
if not profile_file and 'OCRD_PROFILE_FILE' in environ:
profile_file = environ['OCRD_PROFILE_FILE']
if profile or profile_file:
import cProfile
import pstats
import io
import atexit
print("Profiling...")
pr = cProfile.Profile()
pr.enable()
def exit():
pr.disable()
print("Profiling completed")
if profile_file:
with open(profile_file, 'wb') as f:
pr.dump_stats(profile_file)
s = io.StringIO()
pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats()
print(s.getvalue())
atexit.register(exit)
run_processor(processorClass, mets_url=mets, workspace=workspace, **kwargs)


def check_and_run_network_agent(ProcessorClass, agent_type: str, agent_address: str, database: str, queue: str):
if not agent_type and (agent_address or database or queue):
raise ValueError("Options '--database', '--queue', and 'agent_address' are valid only with '--agent_type'")
if agent_type:
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved
if not database:
raise ValueError("Options '--agent_type' and '--database' are mutually inclusive")
allowed_agent_types = ['server', 'worker']
if agent_type not in allowed_agent_types:
agents_str = ', '.join(allowed_agent_types)
raise ValueError(f"Wrong agent type parameter. Allowed agent types: {agents_str}")
if agent_type == 'server':
if not agent_address:
raise ValueError("Options '--agent_type=server' and '--agent_address' are mutually inclusive")
if queue:
raise ValueError("Options '--agent_type=server' and '--queue' are mutually exclusive")
if agent_type == 'worker':
if not queue:
raise ValueError("Options '--agent_type=worker' and '--queue' are mutually inclusive")
if agent_address:
raise ValueError("Options '--agent_type=worker' and '--agent_address' are mutually exclusive")

import logging
logging.getLogger('ocrd.network').setLevel(logging.DEBUG)

# Get the ocrd_tool dictionary
processor = processorClass(workspace=None, dump_json=True)
ocrd_tool = processor.ocrd_tool

try:
processing_worker = ProcessingWorker(
rabbitmq_addr=queue,
mongodb_addr=database,
processor_name=ocrd_tool['executable'],
ocrd_tool=ocrd_tool,
processor_class=processorClass,
)
# The RMQConsumer is initialized and a connection to the RabbitMQ is performed
processing_worker.connect_consumer()
# Start consuming from the queue with name `processor_name`
processing_worker.start_consuming()
except Exception as e:
raise Exception("Processing worker has failed with error") from e
else:
initLogging()
LOG = getLogger('ocrd_cli_wrap_processor')
# LOG.info('kwargs=%s' % kwargs)
# Merge parameter overrides and parameters
if 'parameter_override' in kwargs:
set_json_key_value_overrides(kwargs['parameter'], *kwargs['parameter_override'])
# TODO OCR-D/core#274
# Assert -I / -O
# if not kwargs['input_file_grp']:
# raise ValueError('-I/--input-file-grp is required')
# if not kwargs['output_file_grp']:
# raise ValueError('-O/--output-file-grp is required')
resolver = Resolver()
working_dir, mets, _ = resolver.resolve_mets_arguments(working_dir, mets, None)
workspace = resolver.workspace_from_url(mets, working_dir)
page_id = kwargs.get('page_id')
# XXX not possible while processors do not adhere to # https://github.com/OCR-D/core/issues/505
# if overwrite
# if 'output_file_grp' not in kwargs or not kwargs['output_file_grp']:
# raise Exception("--overwrite requires --output-file-grp")
# LOG.info("Removing files because of --overwrite")
# for grp in kwargs['output_file_grp'].split(','):
# if page_id:
# for one_page_id in kwargs['page_id'].split(','):
# LOG.debug("Removing files in output file group %s with page ID %s", grp, one_page_id)
# for file in workspace.mets.find_files(pageId=one_page_id, fileGrp=grp):
# workspace.remove_file(file, force=True, keep_file=False, page_recursive=True)
# else:
# LOG.debug("Removing all files in output file group %s ", grp)
# # TODO: can be reduced to `page_same_group=True` as soon as core#505 has landed (in all processors)
# workspace.remove_file_group(grp, recursive=True, force=True, keep_files=False, page_recursive=True, page_same_group=False)
# workspace.save_mets()
# XXX While https://github.com/OCR-D/core/issues/505 is open, set 'overwrite_mode' globally on the workspace
if overwrite:
workspace.overwrite_mode = True
report = WorkspaceValidator.check_file_grp(workspace, kwargs['input_file_grp'], '' if overwrite else kwargs['output_file_grp'], page_id)
if not report.is_valid:
raise Exception("Invalid input/output file grps:\n\t%s" % '\n\t'.join(report.errors))
# Set up profiling behavior from environment variables/flags
if not profile and 'OCRD_PROFILE' in environ:
if 'CPU' in environ['OCRD_PROFILE']:
profile = True
if not profile_file and 'OCRD_PROFILE_FILE' in environ:
profile_file = environ['OCRD_PROFILE_FILE']
if profile or profile_file:
import cProfile
import pstats
import io
import atexit
print("Profiling...")
pr = cProfile.Profile()
pr.enable()
def exit():
pr.disable()
print("Profiling completed")
if profile_file:
with open(profile_file, 'wb') as f:
pr.dump_stats(profile_file)
s = io.StringIO()
pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats()
print(s.getvalue())
atexit.register(exit)
run_processor(processorClass, mets_url=mets, workspace=workspace, **kwargs)
processor = ProcessorClass(workspace=None, dump_json=True)
if agent_type == 'worker':
try:
# TODO: Passing processor_name and ocrd_tool is redundant
processing_worker = ProcessingWorker(
rabbitmq_addr=queue,
mongodb_addr=database,
processor_name=processor.ocrd_tool['executable'],
ocrd_tool=processor.ocrd_tool,
processor_class=ProcessorClass,
)
# The RMQConsumer is initialized and a connection to the RabbitMQ is performed
processing_worker.connect_consumer()
# Start consuming from the queue with name `processor_name`
processing_worker.start_consuming()
except Exception as e:
sys.exit(f"Processing worker has failed with error: {e}")
if agent_type == 'server':
try:
# TODO: Better validate that inside the ProcessorServer itself
host, port = agent_address.split(':')
processor_server = ProcessorServer(
mongodb_addr=database,
processor_name=processor.ocrd_tool['executable'],
processor_class=ProcessorClass,
)
processor_server.run_server(host=host, port=int(port))
except Exception as e:
sys.exit(f"Processor server has failed with error: {e}")
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 3 additions & 1 deletion ocrd/ocrd/decorators/ocrd_cli_options.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import click
from click import option, Path
from .parameter_option import parameter_option, parameter_override_option
from .loglevel_option import loglevel_option
Expand Down Expand Up @@ -33,6 +34,8 @@ def cli(mets_url):
parameter_option,
parameter_override_option,
loglevel_option,
option('--agent_type', type=click.STRING),
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved
option('--agent_address', type=click.STRING),
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved
option('--queue', type=QueueServerParamType()),
option('--database', type=DatabaseParamType()),
option('-C', '--show-resource'),
Expand All @@ -45,4 +48,3 @@ def cli(mets_url):
for param in params:
param(f)
return f

1 change: 1 addition & 0 deletions ocrd_network/ocrd_network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# the network package. The reason, Mets Server is tightly coupled with the `OcrdWorkspace`.
from .processing_server import ProcessingServer
from .processing_worker import ProcessingWorker
from .processor_server import ProcessorServer
from .param_validators import (
DatabaseParamType,
ProcessingServerParamType,
Expand Down
Loading