moved away from HDF5 to Parquet with pyarrow
Ilija Vukotic committed Jul 30, 2020
1 parent 9027048 commit 786bed5
Showing 9 changed files with 22 additions and 17 deletions.
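
In essence, the pandas HDF5 calls in the scripts below are swapped for Parquet I/O through the pyarrow engine. A minimal sketch of the before/after, assuming pyarrow is installed; the DataFrame here is only illustrative and the file name mirrors the ones used in the scripts:

import pandas as pd

# stand-in for the traces/results data handled by the scripts below
df = pd.DataFrame({'filename': ['file_a', 'file_b'], 'filesize': [100, 200]})

# before: HDF5 via PyTables
# df.to_hdf('MWT2.h5', key='data', mode='w', complevel=1)

# after: Parquet via the pyarrow engine
df.to_parquet('MWT2.pa', engine='pyarrow')

# reading it back is a single call, no key or store handling needed
df = pd.read_parquet('MWT2.pa')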
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,6 +1,8 @@
# Byte-compiled / optimized / DLL files
secret.py
data/*.h5
data/*.pa
!data/*_processed.pa

__pycache__/
*.py[cod]
5 changes: 5 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,5 @@
{
"python.linting.enabled": true,
"python.linting.pylintEnabled": true,
"python.pythonPath": "C:\\Users\\ilija\\AppData\\Local\\Programs\\Python\\Python38-32"
}
Binary file added data/MWT2_processed.pa
14 changes: 7 additions & 7 deletions data/extract_data.py
@@ -1,13 +1,13 @@
# from Rucio traces extract all the paths accessed by MWT2 ANALY jobs not running via VP.
# store all the data in hdf5 file
# from Rucio traces extract all the paths accessed by MWT2 jobs not running via VP.
# store all the data in a parquet file (pyarrow engine)

from time import time
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import pandas as pd
from secret import es_auth

pq = 'ANALY_MWT2_UCORE'
pq = 'MWT2'
# pq = 'ALL'

es = Elasticsearch(hosts=['http://atlas-kibana.mwt2.org:9200'], http_auth=es_auth)
@@ -28,12 +28,12 @@
}
}
}

print("Query:\n",query)
data = []
count = 0

# es_response = scan(es, index='rucio-traces-2020*', query=query)
es_response = scan(es, index='rucio_traces', query=query, request_timeout=60)
es_response = scan(es, index='*rucio-traces-2020*', query=query)
# es_response = scan(es, index='rucio_traces', query=query, request_timeout=60)
for item in es_response:
sou = item['_source']
doc = [
@@ -55,4 +55,4 @@
all_accesses = pd.DataFrame(data).sort_values(4)
all_accesses.columns = ['scope', 'dataset', 'filename', 'timeStart', 'filesize']
# all_accesses.set_index('filename', drop=True, inplace=True)
all_accesses.to_hdf(pq + '.h5', key='data', mode='w', complevel=1)
all_accesses.to_parquet(pq + '.pa', engine='pyarrow')
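
The Parquet file written here can be checked directly before moving on to process_data.py. A minimal sketch, assuming the script above has been run and MWT2.pa exists in the working directory; the column names are the ones set on all_accesses above:

import pandas as pd

# load the traces extracted by extract_data.py and sanity-check the result
all_accesses = pd.read_parquet('MWT2.pa')
print(all_accesses.shape)
print(all_accesses.columns.tolist())  # ['scope', 'dataset', 'filename', 'timeStart', 'filesize']
print(all_accesses.head())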
10 changes: 5 additions & 5 deletions data/process_data.py
@@ -1,11 +1,11 @@
# reads raw data on data accesses.
# tokenizes scopes and filenames
import numpy as np
import pandas as pd


pq = 'ANALY_MWT2_UCORE'
with pd.HDFStore(pq + '.h5') as hdf:
print("keys:", hdf.keys())
data = hdf.select('data')
pq = 'MWT2'
data = pd.read_parquet(pq + '.pa')

print(data.head(10))

@@ -97,4 +97,4 @@ def indexTokens(toks):
all_tokens.set_index('time', inplace=True)
print(all_tokens.head(15))

all_tokens.to_hdf(pq + '_processed.h5', key='data', mode='w', complevel=1)
all_tokens.to_parquet(pq + '_processed.pa', engine='pyarrow')
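
Since pandas stores the DataFrame index in the Parquet file by default, the 'time' index set above survives the round trip. A minimal sketch of checking the processed file, assuming MWT2_processed.pa was produced by this script:

import pandas as pd

# load the tokenized accesses written by process_data.py
all_tokens = pd.read_parquet('MWT2_processed.pa')
print(all_tokens.index.name)  # 'time'
print(all_tokens.head(15))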
Binary file removed results/100TB_LRU.h5
Binary file removed results/20TB_LRU.h5
Binary file removed results/InfiniteCache_LRU.h5
8 changes: 3 additions & 5 deletions results/plots.py
@@ -4,15 +4,13 @@
# import numpy as np
TB = 1024 * 1024 * 1024

df = None
# name = 'InfiniteCache_LRU'
# name = '20TB_LRU'
name = '100TB_LRU'

with pd.HDFStore(name + '.h5') as hdf:
print("keys in file:", name, ':', hdf.keys())
df = hdf.select(name)
print("data loaded:", df.shape[0])

df = pd.read_parquet(name + '.pa')
print("data loaded:", df.shape[0])

print(df)
df['ch_files'] = df['cache hit'].cumsum()
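
The cached result files are removed in this commit, so existing HDF5 results have to be regenerated or converted before plots.py can read them. One possible one-off conversion, assuming a local copy of the old .h5 file laid out the way the removed code expected (the DataFrame stored under the run name as the key):

import pandas as pd

name = '100TB_LRU'

# read the old HDF5 results (stored under the run name, as the removed HDFStore code did)
df = pd.read_hdf(name + '.h5', key=name)

# write them in the Parquet format that plots.py now expects
df.to_parquet(name + '.pa', engine='pyarrow')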
