Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Make DIDFinderApp a subclass of Celery App
Browse files Browse the repository at this point in the history
BenGalewsky committed Jul 11, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent c1e3372 commit d9553c7
Showing 3 changed files with 51 additions and 109 deletions.
66 changes: 30 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
@@ -28,15 +28,32 @@ by adding the following to your `pyproject.tom` file:
servicex-did-finder-lib = "^3.0"
```

Create a script that will run your DID. It needs to contain your generator function that adheres
to the UserDIDHandler signature:
Create a celery app that will run your DID finder. This app will be responsible for starting the
Celery worker and registering your DID finder function as a task. Here is an example of how to do
this. Celery prefers that the app is in a file called `celery.py` in a module in your project. Here
is an example of how to do this:

## celery.py:
```python
UserDIDHandler = Callable[
[str, Dict[str, Any], Dict[str, Any]],
Generator[Dict[str, Any], None, None]
]

from servicex_did_finder_lib import DIDFinderApp
rucio_adaptor = RucioAdaptor()
app = DIDFinderApp('rucio', did_finder_args={"rucio_adapter": rucio_adaptor})
```

Attach the DID finder to the app by using the `did_lookup_task` decorator. This decorator will
register the function as a Celery task. Here is an example of how to do this:

```python
@app.did_lookup_task(name="did_finder_rucio.lookup_dataset")
def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
self.do_lookup(did=did, dataset_id=dataset_id,
endpoint=endpoint, user_did_finder=find_files)
```

You will need to implement the `find_files` function. This function is a generator that yields
file information dictionaries.

The arguments to the method are straight forward:

* `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`)
@@ -74,45 +91,22 @@ def find_files(did_name: str,
}
```

There is a small amount of additional boilerplate code that is required to create a DID Finder. This
is the code that will create the Celery app and register your function as a task. Here is an
example (which assumes that `find_files` is your DID handler):
```python
from servicex_did_finder_lib import DIDFinderApp

app = DIDFinderApp('cernopendata')

@app.did_lookup_task(name="did_finder_cern_opendata.lookup_dataset")
def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
self.do_lookup(did=did, dataset_id=dataset_id,
endpoint=endpoint, user_did_finder=find_files)

app.start()
```

## Extra Command Line Arguments
Sometimes you need to pass additional information to your DID Finder from the command line. You do
this by creating your own `ArgParser` and then calling the `add_did_finder_cnd_arguments` method
which inserts the arguments that the library needs to pass to the finder. Here is an example:

this by creating your own `ArgParser`
```python
import argparse
from servicex_did_finder_lib import DIDFinderApp

parser = argparse.ArgumentParser()
parser.add_argument('--foo', dest='foo', action='store',
default='',
help='Prefix to add to Xrootd URLs')
# Parse command-line arguments
parser = argparse.ArgumentParser(description='DIDFinderApp')
parser.add_argument('--custom-arg', help='Custom argument for DIDFinderApp')
args, unknown = parser.parse_known_args()

DIDFinderApp.add_did_finder_cnd_arguments(parser)
# Create the app instance
app = DIDFinderApp('myApp', did_finder_args={"custom-arg": args.custom_arg})

```

You then just pass the dictionary of parsed args to your app constructor:
```python
app = DIDFinderApp('cernopendata', parsed_args=parser.parse_args())
```

These parsed args will be passed to your `find_files` function as a dictionary in
the `did_finder_args` parameter.

64 changes: 12 additions & 52 deletions src/servicex_did_finder_lib/did_finder_app.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import logging
from datetime import datetime
from typing import Any, Generator, Callable, Dict, Optional
@@ -120,37 +119,31 @@ def do_lookup(self, did: str, dataset_id: int, endpoint: str, user_did_finder: U
)


class DIDFinderApp:
class DIDFinderApp(Celery):
"""
The main application for a DID finder. This will setup the Celery application
and start the worker to process the DID requests.
"""
def __init__(self, did_finder_name: str,
parsed_args: Optional[argparse.Namespace] = None):
did_finder_args: Optional[Dict[str, Any]] = None,
*args, **kwargs):
"""
Initialize the DID finder application
Args:
did_finder_name: The name of the DID finder
parsed_args: The parsed command line arguments. Leave as None to use the default parser
did_finder_name: The name of the DID finder.
did_finder_args: The parsed command line arguments and other objects you want
to make available to the tasks
"""

self.name = did_finder_name
self.parsed_args = vars(parsed_args) if parsed_args else None

# Setup command line parsing
if self.parsed_args is None:
parser = argparse.ArgumentParser()
self.add_did_finder_cnd_arguments(parser)
self.parsed_args = vars(parser.parse_args())

initialize_root_logger(self.name)

self.app = Celery(f"did_finder_{self.name}",
broker_url=self.parsed_args['rabbit_uri'],
broker_connection_retry_on_startup=True)
super().__init__(f"did_finder_{self.name}", *args,
broker_connection_retry_on_startup=True,
**kwargs)

# Cache the args in the App so they are accessible to the tasks
self.app.did_finder_args = self.parsed_args
# Cache the args in the App, so they are accessible to the tasks
self.did_finder_args = did_finder_args

def did_lookup_task(self, name):
"""
@@ -166,41 +159,8 @@ def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
name: The name of the task
"""
def decorator(func):
@self.app.task(base=DIDFinderTask, bind=True, name=name)
@self.task(base=DIDFinderTask, bind=True, name=name)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
return decorator

def start(self):
self.app.worker_main(argv=['worker',
'--loglevel=INFO',
'-Q', f'did_finder_{self.name}',
'-n', f'{self.name}@%h'
])

@classmethod
def add_did_finder_cnd_arguments(cls, parser: argparse.ArgumentParser):
"""add_did_finder_cnd_arguments Add required arguments to a parser
If you need to parse command line arguments for some special configuration, create your
own argument parser, and call this function to make sure the arguments needed
for running the back-end communication are filled in properly.
Then pass the results of the parsing to the DID Finder App's constructor method.
Args:
parser (argparse.ArgumentParser): The argument parser. Arguments needed for the
did finder/servicex communication will be added.
"""
parser.add_argument(
"--rabbit-uri", dest="rabbit_uri", action="store", required=True
)
parser.add_argument(
"--prefix",
dest="prefix",
action="store",
required=False,
default="",
help="Prefix to add to use a caching proxy for URIs",
)
30 changes: 9 additions & 21 deletions tests/servicex_did_finder_lib_tests/test_did_finder_app.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import sys
from unittest.mock import patch
import pytest
from celery import Celery
@@ -104,25 +103,14 @@ def test_did_finder_task_exception(mocker, servicex, single_file_info):
)


def test_did_finder_app(mocker, monkeypatch):
# Temporarily replace sys.argv with mock_args
monkeypatch.setattr(sys, 'argv', [
"did_finder.py",
"--rabbit-uri", "my-rabbit"
])
def test_celery_app():
app = DIDFinderApp('foo')
assert isinstance(app, Celery)
assert app.name == 'foo'

mock_celery_app = mocker.MagicMock(Celery)
@app.did_lookup_task(name="did_finder_rucio.lookup_dataset")
def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
self.do_lookup(did=did, dataset_id=dataset_id,
endpoint=endpoint, user_did_finder=lambda x, y, z: None)

with patch(
"servicex_did_finder_lib.did_finder_app.Celery", autospec=True
) as celery:
celery.return_value = mock_celery_app
app = DIDFinderApp(did_finder_name="pytest", parsed_args=None)
app.start()
celery.assert_called_with("did_finder_pytest",
broker_connection_retry_on_startup=True,
broker_url="my-rabbit")
mock_celery_app.worker_main.assert_called_with(argv=['worker',
'--loglevel=INFO',
'-Q', 'did_finder_pytest',
'-n', 'pytest@%h'])
assert lookup_dataset.__name__ == 'wrapper'

0 comments on commit d9553c7

Please sign in to comment.