diff --git a/.gitignore b/.gitignore index 5c5199b..8884186 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # Byte-compiled / optimized / DLL files secret.py data/*.h5 +data/*.pa +!data/*_processed.pa __pycache__/ *.py[cod] diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..9df4e90 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "python.linting.enabled": true, + "python.linting.pylintEnabled": true, + "python.pythonPath": "C:\\Users\\ilija\\AppData\\Local\\Programs\\Python\\Python38-32" +} \ No newline at end of file diff --git a/data/MWT2_processed.pa b/data/MWT2_processed.pa new file mode 100644 index 0000000..3561530 Binary files /dev/null and b/data/MWT2_processed.pa differ diff --git a/data/extract_data.py b/data/extract_data.py index 04516a7..741f312 100644 --- a/data/extract_data.py +++ b/data/extract_data.py @@ -1,5 +1,5 @@ -# from Rucio traces extract all the paths accessed by MWT2 ANALY jobs not running via VP. -# store all the data in hdf5 file +# from Rucio traces extract all the paths accessed by MWT2 jobs not running via VP. 
+# store all the data in a parquet file (pyarrow engine) from time import time from elasticsearch import Elasticsearch @@ -7,7 +7,7 @@ import pandas as pd from secret import es_auth -pq = 'ANALY_MWT2_UCORE' +pq = 'MWT2' # pq = 'ALL' es = Elasticsearch(hosts=['http://atlas-kibana.mwt2.org:9200'], http_auth=es_auth) @@ -28,12 +28,12 @@ } } } - +print("Query:\n",query) data = [] count = 0 -# es_response = scan(es, index='rucio-traces-2020*', query=query) -es_response = scan(es, index='rucio_traces', query=query, request_timeout=60) +es_response = scan(es, index='*rucio-traces-2020*', query=query) +# es_response = scan(es, index='rucio_traces', query=query, request_timeout=60) for item in es_response: sou = item['_source'] doc = [ @@ -55,4 +55,4 @@ all_accesses = pd.DataFrame(data).sort_values(4) all_accesses.columns = ['scope', 'dataset', 'filename', 'timeStart', 'filesize'] # all_accesses.set_index('filename', drop=True, inplace=True) -all_accesses.to_hdf(pq + '.h5', key='data', mode='w', complevel=1) +all_accesses.to_parquet(pq + '.pa', engine='pyarrow') diff --git a/data/process_data.py b/data/process_data.py index d3a5202..bd7e85e 100644 --- a/data/process_data.py +++ b/data/process_data.py @@ -1,11 +1,11 @@ +# reads raw data on data accesses. 
+# tokenizes scopes and filenames import numpy as np import pandas as pd -pq = 'ANALY_MWT2_UCORE' -with pd.HDFStore(pq + '.h5') as hdf: - print("keys:", hdf.keys()) - data = hdf.select('data') +pq = 'MWT2' +data= pd.read_parquet(pq + '.pa') print(data.head(10)) @@ -97,4 +97,4 @@ def indexTokens(toks): all_tokens.set_index('time', inplace=True) print(all_tokens.head(15)) -all_tokens.to_hdf(pq + '_processed.h5', key='data', mode='w', complevel=1) +all_tokens.to_parquet(pq + '_processed.pa', engine='pyarrow') diff --git a/results/100TB_LRU.h5 b/results/100TB_LRU.h5 deleted file mode 100644 index b03e5f7..0000000 Binary files a/results/100TB_LRU.h5 and /dev/null differ diff --git a/results/20TB_LRU.h5 b/results/20TB_LRU.h5 deleted file mode 100644 index ba97726..0000000 Binary files a/results/20TB_LRU.h5 and /dev/null differ diff --git a/results/InfiniteCache_LRU.h5 b/results/InfiniteCache_LRU.h5 deleted file mode 100644 index fd780a9..0000000 Binary files a/results/InfiniteCache_LRU.h5 and /dev/null differ diff --git a/results/plots.py b/results/plots.py index 239cb1b..c381ace 100644 --- a/results/plots.py +++ b/results/plots.py @@ -4,15 +4,13 @@ # import numpy as np TB = 1024 * 1024 * 1024 -df = None # name = 'InfiniteCache_LRU' # name = '20TB_LRU' name = '100TB_LRU' -with pd.HDFStore(name + '.h5') as hdf: - print("keys in file:", name, ':', hdf.keys()) - df = hdf.select(name) - print("data loaded:", df.shape[0]) + +df = pd.read_parquet(name + '.pa') +print("data loaded:", df.shape[0]) print(df) df['ch_files'] = df['cache hit'].cumsum()