
Commit

moving prefix handling from finders to transformer (#553)
* moving prefix handling from finders to transformer

Co-authored-by: Ilija Vukotic <[email protected]>
ivukotic and Ilija Vukotic authored Mar 6, 2023
1 parent 246ed4e commit 226dd36
Showing 13 changed files with 38 additions and 48 deletions.
@@ -34,7 +34,7 @@ async def find_files(did_name: str,
if not did_name.isnumeric():
raise Exception('CERNOpenData can only work with dataset numbers as names (e.g. 1507)')

cmd = f'{command} get-file-locations --protocol xrootd --recid {did_name}'.split(' ')
cmd = f'{command} get-file-locations --recid {did_name}'.split(' ')

with Popen(cmd, stdout=PIPE, stderr=STDOUT, bufsize=1,
universal_newlines=1) as p: # type: ignore
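For reference, a minimal sketch of driving the simplified lookup by hand (it assumes the cernopendata-client CLI is installed and on PATH, and uses an example record id; the `command` value is an assumption, since the finder only exposes it as a variable):

from subprocess import PIPE, STDOUT, Popen

command = 'cernopendata-client'   # assumed CLI behind the {command} placeholder above
did_name = '1507'                 # example record id

# --protocol xrootd is no longer forced; the client returns every advertised location
cmd = f'{command} get-file-locations --recid {did_name}'.split(' ')

with Popen(cmd, stdout=PIPE, stderr=STDOUT, bufsize=1, universal_newlines=True) as p:
    for line in p.stdout:
        print(line.strip())       # one file location per line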
did_finder_rucio/README.md (1 change: 0 additions & 1 deletion)
@@ -41,7 +41,6 @@ The server accepts the following arguments when it is launched
|Argument |Description |Default |
|---------------|---------------------------------------------------------------------------|----------|
|`--rabbit-uri` | A valid URI to the RabbitMQ Broker | None |
| `--prefix` | A string to prepend on resulting file names. Useful to add xCache to URLs | ' ' |

### Rucio Config

did_finder_rucio/rucio_did_finder/lookup_request.py (7 changes: 1 addition & 6 deletions)
@@ -51,20 +51,17 @@ def deserialize(self, key, value, flags):
class LookupRequest:
def __init__(self, did: str,
rucio_adapter: RucioAdapter,
prefix: str = '',
request_id: str = 'bogus-id'):
'''Create the `LookupRequest` object that is responsible for returning
lists of files. Processes things in chunks.
Args:
did (str): The DID we are going to lookup
rucio_adapter (RucioAdapter): Rucio lookup object
prefix (str, optional): Prefix for xcache use. Defaults to ''.
request_id (str, optional): ServiceX Request ID that requested this DID.
Defaults to 'bogus-id'.
'''
self.did = did
self.prefix = prefix
self.rucio_adapter = rucio_adapter
self.request_id = request_id

@@ -84,7 +81,7 @@ def setCachedResults(self, result):

def lookup_files(self):
"""
lookup files, add cache prefix if needed.
lookup files.
"""
n_files = 0
ds_size = 0
@@ -110,8 +107,6 @@ def lookup_files(self):
n_files += 1
ds_size += af['file_size']
total_paths += len(af['paths'])
if self.prefix:
af['paths'] = [self.prefix+fp for fp in af['paths']]
full_file_list.append(af)
if self.mcclient:
self.setCachedResults(full_file_list)
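To make the moved behavior concrete, here is a small, self-contained sketch of the prepend that lookup_files() no longer performs; after this commit the same operation happens once, in the transformer sidecar, driven by the CACHE_PREFIX environment variable (the path and prefix below are made-up examples):

def apply_cache_prefix(paths, prefix=''):
    # What the removed finder code did per file; now done in the sidecar instead.
    return [prefix + p for p in paths] if prefix else paths

replica_paths = ['root://eosatlas.cern.ch//eos/opendata/example/file.root']
print(apply_cache_prefix(replica_paths))                                  # finder output: untouched
print(apply_cache_prefix(replica_paths, 'root://xcache.example.org//'))   # transformer-side result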
did_finder_rucio/rucio_did_finder/rucio_adapter.py (2 changes: 1 addition & 1 deletion)
@@ -124,7 +124,7 @@ def list_files_for_did(self, did):
for ds in datasets:
reps = self.replica_client.list_replicas(
[{'scope': ds[0], 'name': ds[1]}],
schemes=['root'],
schemes=['root', 'http'],
metalink=True,
sort='geoip'
)
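The widened schemes list can be exercised directly against Rucio; a minimal sketch, assuming a configured Rucio client environment (the scope/name pair is a placeholder):

from rucio.client.replicaclient import ReplicaClient

replica_client = ReplicaClient()
reps = replica_client.list_replicas(
    [{'scope': 'some.scope', 'name': 'some.dataset.name'}],  # placeholder DID
    schemes=['root', 'http'],   # http replicas now come back alongside xrootd ones
    metalink=True,
    sort='geoip'
)
print(reps)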
did_finder_rucio/runme.sh (3 changes: 1 addition & 2 deletions)
@@ -13,11 +13,10 @@ while true; do
sleep 5
done

if [ -z $CACHE_PREFIX ]; then export PREFIX_ARG=""; else export PREFIX_ARG="--prefix $CACHE_PREFIX"; fi
export PYTHONPATH=.

# Assume $REPORT_LOGICAL_FILES is set to --report-logical-files to activate
echo "----------->$PYTHONPATH"
ls -lht $PYTHONPATH
python3 scripts/did_finder.py --rabbit-uri $RMQ_URI $PREFIX_ARG $REPORT_LOGICAL_FILES
python3 scripts/did_finder.py --rabbit-uri $RMQ_URI $REPORT_LOGICAL_FILES

did_finder_rucio/scripts/did_finder.py (6 changes: 1 addition & 5 deletions)
@@ -48,10 +48,7 @@ def run_rucio_finder():

args = parser.parse_args()

prefix = args.prefix

logger.info("ServiceX DID Finder starting up. "
f"Prefix: {prefix}")
logger.info("ServiceX DID Finder starting up. ")

if args.report_logical_files:
logger.info("---- DID Finder Only Returning Logical Names, not replicas -----")
@@ -69,7 +66,6 @@ async def callback(did_name, info):
lookup_request = LookupRequest(
did=did_name,
rucio_adapter=rucio_adapter,
prefix=prefix,
request_id=info['request-id']
)
for file in lookup_request.lookup_files():
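The net effect on the finder's command line is that --prefix is gone; a hypothetical, reduced parser illustrating only the options visible in this diff and in runme.sh (the real script defines more than this):

import argparse

parser = argparse.ArgumentParser(description='ServiceX DID Finder (reduced illustration)')
parser.add_argument('--rabbit-uri', dest='rabbit_uri', required=True)
parser.add_argument('--report-logical-files', dest='report_logical_files', action='store_true')
# parser.add_argument('--prefix', ...)  # removed by this commit

args = parser.parse_args(['--rabbit-uri', 'amqp://user:pass@localhost:5672'])
print(args.rabbit_uri, args.report_logical_files)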
docs/deployment/reference.md (5 changes: 2 additions & 3 deletions)
@@ -50,7 +50,6 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn
| `didFinder.rucio.pullPolicy` | Rucio DID Finder image pull policy | `Always` |
| `didFinder.rucio.servicex_latitude` | Latitude of the computing center where ServiceX runs. Will be used by Rucio to return the closest input data replica. | 41.78 |
| `didFinder.rucio.servicex_longitude` | Longitude of the computing center where ServiceX runs. Will be used by Rucio to return the closest input data replica. | -87.7 |
| `didFinder.rucio.cachePrefix` | Prefix string to stick in front of file paths. Useful for XCache | |
| `didFinder.rucio.reportLogicalFiles` | For CMS xCache sites, we don't want the replicas, only logical names. Set to true to get this behavior | false |
| `didFinder.rucio.rucio_host` | URL for Rucio service to use | `https://voatlasrucio-server-prod.cern.ch:443` |
| `didFinder.rucio.auth _host` | URL to obtain Rucio authentication | `https://voatlasrucio-auth-prod.cern.ch:443` |
@@ -62,7 +61,6 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn
| `didFinder.CERNOpenData.image` | CERN OpenData DID Finder image name | `sslhep/servicex-did-finder` |
| `didFinder.CERNOpenData.tag` | CERN OpenData DID Finder image tag | `latest` |
| `didFinder.CERNOpenData.pullPolicy` | CERN OpenData DID Finder image pull policy | `Always` |
| `didFinder.rucio.cachePrefix` | Prefix string to stick in front of file paths. Useful for XCache | |
| `codeGen.enabled` | Enable deployment of code generator service? | `true` |
| `codeGen.image` | Code Gen image name | `sslhep/servicex_code_gen_funcadl_xaod` |
| `codeGen.tag` | Code Gen image tag | `latest` |
@@ -85,6 +83,7 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn
| `minio.auth.rootPassword` | Password key to log into minio | leftfoot1 |
| `minio.apiIngress.enabled` | Should minio chart deploy an ingress to the service? | false |
| `minio.apiIngress.hostname` | Hostname associate with ingress controller | nil |
| `transformer.cachePrefix` | Prefix string to stick in front of file paths. Useful for XCache | |
| `transformer.autoscaler.enabled` | Enable/disable horizontal pod autoscaler for transformers | True |
| `transformer.autoscaler.cpuScaleThreshold` | CPU percentage threshold for pod scaling | 30 |
| `transformer.autoscaler.minReplicas` | Minimum number of transformer pods per request | 1 |
@@ -103,7 +102,7 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn
| `minioCleanup.pullPolicy` | minioCleanup image pull policy | `Always` |
| `minioCleanup.threads` | Number of threads to use when processing S3 Storage | 6 |
| `minioCleanup.logLevel` | Log level to use for logging (e.g. DEBUG, INFO, WARN, ERROR, FATAL) | INFO |
| `minioCleanup.schedule` | Schedule for minioCleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | '* */8 * * *' (every 8 hours) |
| `minioCleanup.schedule` | Schedule for minioCleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | `* */8 * * *` (every 8 hours) |
| `minioCleanup.maxAge` | Max age in days before removing results | 30 |
| `minioCleanup.maxSize` | Start removing buckets when total space used reaches this number (can use G,M, T suffixes) | '1G' |
| `minioCleanup.normSize` | Size at which to stop removing buckets | '700M' | |
helm/servicex/templates/app/configmap.yaml (1 change: 1 addition & 0 deletions)
@@ -81,6 +81,7 @@ data:
TRANSFORMER_MANAGER_ENABLED = True
TRANSFORMER_CACHE_PREFIX = {{ .Values.transformer.cachePrefix }}
TRANSFORMER_AUTOSCALE_ENABLED = {{- ternary "True" "False" .Values.transformer.autoscaler.enabled }}
TRANSFORMER_CPU_LIMIT = {{ .Values.transformer.cpuLimit }}
TRANSFORMER_CPU_SCALE_THRESHOLD = {{ .Values.transformer.autoscaler.cpuScaleThreshold }}
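The configmap entries render to plain Python assignments that end up in the Flask app config; a minimal sketch of that plumbing (the file name and loading call are assumptions for illustration, not taken from this commit):

from flask import Flask

app = Flask(__name__)
# Hypothetical stand-in for the rendered configmap, which would contain a line such as:
#   TRANSFORMER_CACHE_PREFIX = "root://xcache.example.org//"
app.config.from_pyfile('app_config.py', silent=True)

# transformer_manager.create_job_object() only forwards the prefix when the key is present
if "TRANSFORMER_CACHE_PREFIX" in app.config:
    print("transformer pods will receive CACHE_PREFIX =", app.config["TRANSFORMER_CACHE_PREFIX"])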
@@ -27,10 +27,6 @@ spec:
env:
- name: INSTANCE_NAME
value: {{ .Release.Name }}
{{- if .Values.didFinder.CERNOpenData.cachePrefix }}
- name: CACHE_PREFIX
value: "{{ .Values.didFinder.CERNOpenData.cachePrefix }}"
{{- end }}
{{- if .Values.secrets }}
- name: RMQ_PASS
valueFrom:
helm/servicex/templates/did-finder-rucio/deployment.yaml (4 changes: 0 additions & 4 deletions)
@@ -35,10 +35,6 @@ spec:
- name: RMQ_URI
value: amqp://user:{{ .Values.rabbitmq.auth.password }}@{{ .Release.Name }}-rabbitmq:5672/?heartbeat=9000
{{- end }}
{{- if .Values.didFinder.rucio.cachePrefix }}
- name: CACHE_PREFIX
value: "{{ .Values.didFinder.rucio.cachePrefix }}"
{{- end }}
{{- if .Values.didFinder.rucio.reportLogicalFiles }}
- name: REPORT_LOGICAL_FILES
value: "--report-logical-files"
helm/servicex/values.yaml (6 changes: 3 additions & 3 deletions)
@@ -37,14 +37,12 @@ codeGen:
defaultScienceContainerTag: develop
didFinder:
CERNOpenData:
cachePrefix: null
enabled: true
image: sslhep/servicex-did-finder-cernopendata
pullPolicy: Always
tag: develop
rucio:
auth_host: https://voatlasrucio-auth-prod.cern.ch:443
cachePrefix: null
enabled: true
image: sslhep/servicex-did-finder
memcache:
@@ -109,6 +107,8 @@ rabbitmq:
rbacEnabled: true
secrets: null
transformer:
cachePrefix: null

autoscaler:
cpuScaleThreshold: 30
enabled: true
@@ -122,7 +122,7 @@
scienceContainerPullPolicy: Always

language: python
exec: # replace me
exec: # replace me
outputDir: /servicex/output

persistence:
servicex_app/servicex/transformer_manager.py (30 changes: 12 additions & 18 deletions)
@@ -113,20 +113,17 @@ def create_job_object(request_id, image, rabbitmq_uri, workers,
field_path="metadata.name"))
env += [client.V1EnvVar("POD_NAME", value_from=pod_name_value_from)]

# provide pods with level and logging server info
env += [
client.V1EnvVar("LOG_LEVEL", value=os.environ.get('LOG_LEVEL', 'INFO').upper()),
client.V1EnvVar("LOGSTASH_HOST", value=os.environ.get('LOGSTASH_HOST')),
client.V1EnvVar("LOGSTASH_PORT", value=os.environ.get('LOGSTASH_PORT'))
]

# Provide each pod with an environment var holding that instance name
if "INSTANCE_NAME" in current_app.config:
instance_name = current_app.config['INSTANCE_NAME']
env_var_instance_name = client.V1EnvVar("INSTANCE_NAME",
value=instance_name)
env = env + [env_var_instance_name]

# provide each pod with an environment var holding cache prefix path
if "TRANSFORMER_CACHE_PREFIX" in current_app.config:
env += [client.V1EnvVar("CACHE_PREFIX", value=os.environ.get('CACHE_PREFIX'))]

if result_destination == 'object-store':
env = env + [
client.V1EnvVar(name='MINIO_URL',
@@ -143,21 +140,18 @@
if result_destination == 'volume':
TransformerManager.create_posix_volume(volumes, volume_mounts)

science_command = " "
if x509_secret:
sidecar_command = " "
science_command = "until [ -f /servicex/output/scripts/proxy-exporter.sh ];" \
"do sleep 5;done &&" \
" /servicex/output/scripts/proxy-exporter.sh & sleep 5 && "
else:
sidecar_command = " "
science_command = " "

sidecar_command += "PYTHONPATH=/servicex/transformer_sidecar:$PYTHONPATH " + \
"python /servicex/transformer_sidecar/transformer.py " + \
" --request-id " + request_id + \
" --rabbit-uri " + rabbitmq_uri + \
" --result-destination " + result_destination + \
" --result-format " + result_format

sidecar_command = "PYTHONPATH=/servicex/transformer_sidecar:$PYTHONPATH " + \
"python /servicex/transformer_sidecar/transformer.py " + \
" --request-id " + request_id + \
" --rabbit-uri " + rabbitmq_uri + \
" --result-destination " + result_destination + \
" --result-format " + result_format

watch_path = os.path.join(current_app.config['TRANSFORMER_SIDECAR_VOLUME_PATH'],
request_id)
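The new pod-environment hookup in create_job_object reduces to one conditional V1EnvVar; a condensed sketch using the kubernetes Python client (the app-config stand-in is an example; as in the diff, the value itself is read from the web app's own CACHE_PREFIX environment variable):

import os
from kubernetes import client

env = []
app_config = {"TRANSFORMER_CACHE_PREFIX": "root://xcache.example.org//"}  # stand-in for current_app.config

# Only transformer pods of a prefix-enabled deployment get CACHE_PREFIX set.
if "TRANSFORMER_CACHE_PREFIX" in app_config:
    env += [client.V1EnvVar(name="CACHE_PREFIX",
                            value=os.environ.get('CACHE_PREFIX'))]

print(env)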
transformer_sidecar/src/transformer_sidecar/transformer.py (15 changes: 15 additions & 0 deletions)
@@ -146,11 +146,26 @@ def callback(channel, method, properties, body):
_request_id = transform_request['request-id']

# The transform can either include a single path, or a list of replicas

if 'file-path' in transform_request:
_file_paths = [transform_request['file-path']]

# make sure that paths starting with http are at the end of the list
_https = []
_roots = []
for _fp in _file_paths:
if _fp.startswith('http'):
_https.append(_fp)
else:
_roots.append(_fp)
_file_paths = _roots+_https
else:
_file_paths = transform_request['paths'].split(',')

# adding cache prefix
prefix = os.environ.get('CACHE_PREFIX', '')
_file_paths = [prefix+fp for fp in _file_paths]

_file_id = transform_request['file-id']
_server_endpoint = transform_request['service-endpoint']
logger.info("got transform request.", extra=transform_request)
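Taken together, the sidecar's new path handling condenses to a single helper; this sketch follows the added lines above loosely (the exact placement of the reordering relative to the two request branches is simplified here, and the payload is made up):

import os

def prepare_paths(transform_request):
    # Collect the replica list from either form of the request...
    if 'file-path' in transform_request:
        file_paths = [transform_request['file-path']]
    else:
        file_paths = transform_request['paths'].split(',')

    # ...keep xrootd (root://) replicas ahead of http(s) ones...
    roots = [p for p in file_paths if not p.startswith('http')]
    https = [p for p in file_paths if p.startswith('http')]
    file_paths = roots + https

    # ...and prepend whatever CACHE_PREFIX transformer_manager handed to the pod.
    prefix = os.environ.get('CACHE_PREFIX', '')
    return [prefix + p for p in file_paths]

# With CACHE_PREFIX unset the paths pass through unmodified, just reordered.
print(prepare_paths({'paths': 'http://server.example/f2.root,root://site.example//f1.root'}))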
