From 226dd368f122229e9d0ac068175b87ccd4797ed0 Mon Sep 17 00:00:00 2001 From: Ilija Vukotic Date: Mon, 6 Mar 2023 15:56:46 -0600 Subject: [PATCH] moving prefix handling from finders to transformer (#553) * moving prefix handling from finders to transformer Co-authored-by: Ilija Vukotic --- .../did_finder.py | 2 +- did_finder_rucio/README.md | 1 - .../rucio_did_finder/lookup_request.py | 7 +---- .../rucio_did_finder/rucio_adapter.py | 2 +- did_finder_rucio/runme.sh | 3 +- did_finder_rucio/scripts/did_finder.py | 6 +--- docs/deployment/reference.md | 5 ++-- helm/servicex/templates/app/configmap.yaml | 1 + .../did-finder-cernopendata/deployment.yaml | 4 --- .../did-finder-rucio/deployment.yaml | 4 --- helm/servicex/values.yaml | 6 ++-- servicex_app/servicex/transformer_manager.py | 30 ++++++++----------- .../src/transformer_sidecar/transformer.py | 15 ++++++++++ 13 files changed, 38 insertions(+), 48 deletions(-) diff --git a/did_finder_cernopendata/src/servicex_did_finder_cernopendata/did_finder.py b/did_finder_cernopendata/src/servicex_did_finder_cernopendata/did_finder.py index 5fcd602c6..00bddccfc 100644 --- a/did_finder_cernopendata/src/servicex_did_finder_cernopendata/did_finder.py +++ b/did_finder_cernopendata/src/servicex_did_finder_cernopendata/did_finder.py @@ -34,7 +34,7 @@ async def find_files(did_name: str, if not did_name.isnumeric(): raise Exception('CERNOpenData can only work with dataset numbers as names (e.g. 1507)') - cmd = f'{command} get-file-locations --protocol xrootd --recid {did_name}'.split(' ') + cmd = f'{command} get-file-locations --recid {did_name}'.split(' ') with Popen(cmd, stdout=PIPE, stderr=STDOUT, bufsize=1, universal_newlines=1) as p: # type: ignore diff --git a/did_finder_rucio/README.md b/did_finder_rucio/README.md index 5cf7aa54c..8923de4d9 100644 --- a/did_finder_rucio/README.md +++ b/did_finder_rucio/README.md @@ -41,7 +41,6 @@ The server accepts the following arguments when it is launched |Argument |Description |Default | |---------------|---------------------------------------------------------------------------|----------| |`--rabbit-uri` | A valid URI to the RabbitMQ Broker | None | -| `--prefix` | A string to prepend on resulting file names. Useful to add xCache to URLs | ' ' | ### Rucio Config diff --git a/did_finder_rucio/rucio_did_finder/lookup_request.py b/did_finder_rucio/rucio_did_finder/lookup_request.py index 88a092adb..385bd0d4b 100644 --- a/did_finder_rucio/rucio_did_finder/lookup_request.py +++ b/did_finder_rucio/rucio_did_finder/lookup_request.py @@ -51,7 +51,6 @@ def deserialize(self, key, value, flags): class LookupRequest: def __init__(self, did: str, rucio_adapter: RucioAdapter, - prefix: str = '', request_id: str = 'bogus-id'): '''Create the `LookupRequest` object that is responsible for returning lists of files. Processes things in chunks. @@ -59,12 +58,10 @@ def __init__(self, did: str, Args: did (str): The DID we are going to lookup rucio_adapter (RucioAdapter): Rucio lookup object - prefix (str, optional): Prefix for xcache use. Defaults to ''. request_id (str, optional): ServiceX Request ID that requested this DID. Defaults to 'bogus-id'. ''' self.did = did - self.prefix = prefix self.rucio_adapter = rucio_adapter self.request_id = request_id @@ -84,7 +81,7 @@ def setCachedResults(self, result): def lookup_files(self): """ - lookup files, add cache prefix if needed. + lookup files. """ n_files = 0 ds_size = 0 @@ -110,8 +107,6 @@ def lookup_files(self): n_files += 1 ds_size += af['file_size'] total_paths += len(af['paths']) - if self.prefix: - af['paths'] = [self.prefix+fp for fp in af['paths']] full_file_list.append(af) if self.mcclient: self.setCachedResults(full_file_list) diff --git a/did_finder_rucio/rucio_did_finder/rucio_adapter.py b/did_finder_rucio/rucio_did_finder/rucio_adapter.py index 84b28cc53..4f7371745 100644 --- a/did_finder_rucio/rucio_did_finder/rucio_adapter.py +++ b/did_finder_rucio/rucio_did_finder/rucio_adapter.py @@ -124,7 +124,7 @@ def list_files_for_did(self, did): for ds in datasets: reps = self.replica_client.list_replicas( [{'scope': ds[0], 'name': ds[1]}], - schemes=['root'], + schemes=['root', 'http'], metalink=True, sort='geoip' ) diff --git a/did_finder_rucio/runme.sh b/did_finder_rucio/runme.sh index f3e7eb581..20bb60ecd 100755 --- a/did_finder_rucio/runme.sh +++ b/did_finder_rucio/runme.sh @@ -13,11 +13,10 @@ while true; do sleep 5 done -if [ -z $CACHE_PREFIX ]; then export PREFIX_ARG=""; else export PREFIX_ARG="--prefix $CACHE_PREFIX"; fi export PYTHONPATH=. # Assume $REPORT_LOGICAL_FILES is set to --report-logical-files to activate echo "----------->$PYTHONPATH" ls -lht $PYTHONPATH -python3 scripts/did_finder.py --rabbit-uri $RMQ_URI $PREFIX_ARG $REPORT_LOGICAL_FILES +python3 scripts/did_finder.py --rabbit-uri $RMQ_URI $REPORT_LOGICAL_FILES diff --git a/did_finder_rucio/scripts/did_finder.py b/did_finder_rucio/scripts/did_finder.py index d12eb577d..f6616d6d5 100644 --- a/did_finder_rucio/scripts/did_finder.py +++ b/did_finder_rucio/scripts/did_finder.py @@ -48,10 +48,7 @@ def run_rucio_finder(): args = parser.parse_args() - prefix = args.prefix - - logger.info("ServiceX DID Finder starting up. " - f"Prefix: {prefix}") + logger.info("ServiceX DID Finder starting up. ") if args.report_logical_files: logger.info("---- DID Finder Only Returning Logical Names, not replicas -----") @@ -69,7 +66,6 @@ async def callback(did_name, info): lookup_request = LookupRequest( did=did_name, rucio_adapter=rucio_adapter, - prefix=prefix, request_id=info['request-id'] ) for file in lookup_request.lookup_files(): diff --git a/docs/deployment/reference.md b/docs/deployment/reference.md index a65da9e16..5e29acd1f 100644 --- a/docs/deployment/reference.md +++ b/docs/deployment/reference.md @@ -50,7 +50,6 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn | `didFinder.rucio.pullPolicy` | Rucio DID Finder image pull policy | `Always` | | `didFinder.rucio.servicex_latitude` | Latitude of the computing center where ServiceX runs. Will be used by Rucio to return the closest input data replica. | 41.78 | | `didFinder.rucio.servicex_longitude` | Longitude of the computing center where ServiceX runs. Will be used by Rucio to return the closest input data replica. | -87.7 | -| `didFinder.rucio.cachePrefix` | Prefix string to stick in front of file paths. Useful for XCache | | | `didFinder.rucio.reportLogicalFiles` | For CMS xCache sites, we don't want the replicas, only logical names. Set to true to get this behavior | false | | `didFinder.rucio.rucio_host` | URL for Rucio service to use | `https://voatlasrucio-server-prod.cern.ch:443` | | `didFinder.rucio.auth _host` | URL to obtain Rucio authentication | `https://voatlasrucio-auth-prod.cern.ch:443` | @@ -62,7 +61,6 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn | `didFinder.CERNOpenData.image` | CERN OpenData DID Finder image name | `sslhep/servicex-did-finder` | | `didFinder.CERNOpenData.tag` | CERN OpenData DID Finder image tag | `latest` | | `didFinder.CERNOpenData.pullPolicy` | CERN OpenData DID Finder image pull policy | `Always` | -| `didFinder.rucio.cachePrefix` | Prefix string to stick in front of file paths. Useful for XCache | | | `codeGen.enabled` | Enable deployment of code generator service? | `true` | | `codeGen.image` | Code Gen image name | `sslhep/servicex_code_gen_funcadl_xaod` | | `codeGen.tag` | Code Gen image tag | `latest` | @@ -85,6 +83,7 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn | `minio.auth.rootPassword` | Password key to log into minio | leftfoot1 | | `minio.apiIngress.enabled` | Should minio chart deploy an ingress to the service? | false | | `minio.apiIngress.hostname` | Hostname associate with ingress controller | nil | +| `transformer.cachePrefix` | Prefix string to stick in front of file paths. Useful for XCache | | | `transformer.autoscaler.enabled` | Enable/disable horizontal pod autoscaler for transformers | True | | `transformer.autoscaler.cpuScaleThreshold` | CPU percentage threshold for pod scaling | 30 | | `transformer.autoscaler.minReplicas` | Minimum number of transformer pods per request | 1 | @@ -103,7 +102,7 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn | `minioCleanup.pullPolicy` | minioCleanup image pull policy | `Always` | | `minioCleanup.threads` | Number of threads to use when processing S3 Storage | 6 | | `minioCleanup.logLevel` | Log level to use for logging (e.g. DEBUG, INFO, WARN, ERROR, FATAL) | INFO | -| `minioCleanup.schedule` | Schedule for minioCleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | '* */8 * * *' (every 8 hours) | +| `minioCleanup.schedule` | Schedule for minioCleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | `* */8 * * *` (every 8 hours) | | `minioCleanup.maxAge` | Max age in days before removing results | 30 | | `minioCleanup.maxSize` | Start removing buckets when total space used reaches this number (can use G,M, T suffixes) | '1G' | | `minioCleanup.normSize` | Size at which to stop removing buckets | '700M' | | diff --git a/helm/servicex/templates/app/configmap.yaml b/helm/servicex/templates/app/configmap.yaml index b4c649926..184aabadc 100644 --- a/helm/servicex/templates/app/configmap.yaml +++ b/helm/servicex/templates/app/configmap.yaml @@ -81,6 +81,7 @@ data: TRANSFORMER_MANAGER_ENABLED = True + TRANSFORMER_CACHE_PREFIX = {{ .Values.transformer.cachePrefix }} TRANSFORMER_AUTOSCALE_ENABLED = {{- ternary "True" "False" .Values.transformer.autoscaler.enabled }} TRANSFORMER_CPU_LIMIT = {{ .Values.transformer.cpuLimit }} TRANSFORMER_CPU_SCALE_THRESHOLD = {{ .Values.transformer.autoscaler.cpuScaleThreshold }} diff --git a/helm/servicex/templates/did-finder-cernopendata/deployment.yaml b/helm/servicex/templates/did-finder-cernopendata/deployment.yaml index 848acbe93..db3fe209b 100644 --- a/helm/servicex/templates/did-finder-cernopendata/deployment.yaml +++ b/helm/servicex/templates/did-finder-cernopendata/deployment.yaml @@ -27,10 +27,6 @@ spec: env: - name: INSTANCE_NAME value: {{ .Release.Name }} - {{- if .Values.didFinder.CERNOpenData.cachePrefix }} - - name: CACHE_PREFIX - value: "{{ .Values.didFinder.CERNOpenData.cachePrefix }}" - {{- end }} {{- if .Values.secrets }} - name: RMQ_PASS valueFrom: diff --git a/helm/servicex/templates/did-finder-rucio/deployment.yaml b/helm/servicex/templates/did-finder-rucio/deployment.yaml index 3a18d1bb7..cb2f30b3f 100644 --- a/helm/servicex/templates/did-finder-rucio/deployment.yaml +++ b/helm/servicex/templates/did-finder-rucio/deployment.yaml @@ -35,10 +35,6 @@ spec: - name: RMQ_URI value: amqp://user:{{ .Values.rabbitmq.auth.password }}@{{ .Release.Name }}-rabbitmq:5672/?heartbeat=9000 {{- end }} - {{- if .Values.didFinder.rucio.cachePrefix }} - - name: CACHE_PREFIX - value: "{{ .Values.didFinder.rucio.cachePrefix }}" - {{- end }} {{- if .Values.didFinder.rucio.reportLogicalFiles }} - name: REPORT_LOGICAL_FILES value: "--report-logical-files" diff --git a/helm/servicex/values.yaml b/helm/servicex/values.yaml index 952f9339b..9ea45b35b 100644 --- a/helm/servicex/values.yaml +++ b/helm/servicex/values.yaml @@ -37,14 +37,12 @@ codeGen: defaultScienceContainerTag: develop didFinder: CERNOpenData: - cachePrefix: null enabled: true image: sslhep/servicex-did-finder-cernopendata pullPolicy: Always tag: develop rucio: auth_host: https://voatlasrucio-auth-prod.cern.ch:443 - cachePrefix: null enabled: true image: sslhep/servicex-did-finder memcache: @@ -109,6 +107,8 @@ rabbitmq: rbacEnabled: true secrets: null transformer: + cachePrefix: null + autoscaler: cpuScaleThreshold: 30 enabled: true @@ -122,7 +122,7 @@ transformer: scienceContainerPullPolicy: Always language: python - exec: # replace me + exec: # replace me outputDir: /servicex/output persistence: diff --git a/servicex_app/servicex/transformer_manager.py b/servicex_app/servicex/transformer_manager.py index 25577b7ef..f6033f223 100644 --- a/servicex_app/servicex/transformer_manager.py +++ b/servicex_app/servicex/transformer_manager.py @@ -113,13 +113,6 @@ def create_job_object(request_id, image, rabbitmq_uri, workers, field_path="metadata.name")) env += [client.V1EnvVar("POD_NAME", value_from=pod_name_value_from)] - # provide pods with level and logging server info - env += [ - client.V1EnvVar("LOG_LEVEL", value=os.environ.get('LOG_LEVEL', 'INFO').upper()), - client.V1EnvVar("LOGSTASH_HOST", value=os.environ.get('LOGSTASH_HOST')), - client.V1EnvVar("LOGSTASH_PORT", value=os.environ.get('LOGSTASH_PORT')) - ] - # Provide each pod with an environment var holding that instance name if "INSTANCE_NAME" in current_app.config: instance_name = current_app.config['INSTANCE_NAME'] @@ -127,6 +120,10 @@ def create_job_object(request_id, image, rabbitmq_uri, workers, value=instance_name) env = env + [env_var_instance_name] + # provide each pod with an environment var holding cache prefix path + if "TRANSFORMER_CACHE_PREFIX" in current_app.config: + env += [client.V1EnvVar("CACHE_PREFIX", value=os.environ.get('CACHE_PREFIX'))] + if result_destination == 'object-store': env = env + [ client.V1EnvVar(name='MINIO_URL', @@ -143,21 +140,18 @@ def create_job_object(request_id, image, rabbitmq_uri, workers, if result_destination == 'volume': TransformerManager.create_posix_volume(volumes, volume_mounts) + science_command = " " if x509_secret: - sidecar_command = " " science_command = "until [ -f /servicex/output/scripts/proxy-exporter.sh ];" \ "do sleep 5;done &&" \ " /servicex/output/scripts/proxy-exporter.sh & sleep 5 && " - else: - sidecar_command = " " - science_command = " " - - sidecar_command += "PYTHONPATH=/servicex/transformer_sidecar:$PYTHONPATH " + \ - "python /servicex/transformer_sidecar/transformer.py " + \ - " --request-id " + request_id + \ - " --rabbit-uri " + rabbitmq_uri + \ - " --result-destination " + result_destination + \ - " --result-format " + result_format + + sidecar_command = "PYTHONPATH=/servicex/transformer_sidecar:$PYTHONPATH " + \ + "python /servicex/transformer_sidecar/transformer.py " + \ + " --request-id " + request_id + \ + " --rabbit-uri " + rabbitmq_uri + \ + " --result-destination " + result_destination + \ + " --result-format " + result_format watch_path = os.path.join(current_app.config['TRANSFORMER_SIDECAR_VOLUME_PATH'], request_id) diff --git a/transformer_sidecar/src/transformer_sidecar/transformer.py b/transformer_sidecar/src/transformer_sidecar/transformer.py index 6161e7e54..7678b98a0 100644 --- a/transformer_sidecar/src/transformer_sidecar/transformer.py +++ b/transformer_sidecar/src/transformer_sidecar/transformer.py @@ -146,11 +146,26 @@ def callback(channel, method, properties, body): _request_id = transform_request['request-id'] # The transform can either include a single path, or a list of replicas + if 'file-path' in transform_request: _file_paths = [transform_request['file-path']] + + # make sure that paths starting with http are at the end of the list + _https = [] + _roots = [] + for _fp in _file_paths: + if _fp.startswith('http'): + _https.append(_fp) + else: + _roots.append(_fp) + _file_paths = _roots+_https else: _file_paths = transform_request['paths'].split(',') + # adding cache prefix + prefix = os.environ.get('CACHE_PREFIX', '') + _file_paths = [prefix+fp for fp in _file_paths] + _file_id = transform_request['file-id'] _server_endpoint = transform_request['service-endpoint'] logger.info("got transform request.", extra=transform_request)