Commit

cleanup. caching cleanup based on last action.
Ilija Vukotic committed Jul 31, 2020
1 parent 7bd35df commit 05806f5
Showing 7 changed files with 71 additions and 25 deletions.
22 changes: 20 additions & 2 deletions README.md
@@ -23,15 +23,33 @@ observation space has following variables:
There are two discrete action environments (*Cache-v0* and *Cache-large-v0*) and one continuous action environment (*Cache-continuous-v0*).
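
A minimal usage sketch for the discrete environment is shown below. It assumes the package registers the environments with default `InputData` and `CacheSize` kwargs, since the `CacheEnv` constructor requires both; if not, pass them to `gym.make` yourself.

```python
# Sketch only: exercising the discrete Cache-v0 environment with random actions.
# Assumes gym_cache is installed and registration supplies InputData/CacheSize defaults.
import gym
import gym_cache  # noqa: F401  (importing triggers the register() calls)

env = gym.make('Cache-v0')
state = env.reset()
for _ in range(10):
    action = env.action_space.sample()      # 0 or 1 in the discrete environments
    state, reward, done, info = env.step(action)
    if done:
        break
env.close()
```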



## Data extraction and preprocessing
This is a two-step procedure:
* *extract raw data* _data/extract_data.py_ - change PQ, date range
* *process raw data* _data/process_data.py_ - tokenizes filenames, generates unique fileIDs, sorts by access time.

Processed data should be copied to the directory where the actor runs.
It is an hdf5 file with one dataframe:
It is a parquet file (.pa) with one dataframe:
* index - access time (sorted)
* six tokens derived from the filename ('1', '2', '3', '4', '5', '6')
* filesize ('kB')
* unique file identifier ('fID')
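
As a reference, here is a small sketch of inspecting the processed file with pandas; the file name is a placeholder.

```python
import pandas as pd

# placeholder name for the processed accesses file described above
accesses = pd.read_parquet('processed_accesses.pa', engine='pyarrow')

# index: access time (sorted); columns: six filename tokens '1'..'6',
# the file size 'kB', and the unique file identifier 'fID'
print(accesses.columns.tolist())
print(accesses.head())
```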


## Rewards
* always negative and correspond to the cost of getting the file - if the file was cached the cost is smaller (a minimal sketch of this logic follows the list)
* files are cached irrespective of the action the actor performed for the file
* cleanup
    * discrete environment - removes the files least likely to be needed: entries are ordered by access time, files the actor chose to keep are shifted to much later effective times, and the oldest entries are evicted until the cache falls below the low water mark
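
A minimal sketch of the reward idea described above; the 5% cache-hit discount is an assumption taken from a comment in `cache_env.py`, and the sign handling there additionally folds in the actor's action, so treat this as illustrative only.

```python
# Illustrative only: reward as the negative cost of delivering a file,
# discounted when the file was already cached.
CACHE_VALUE_WEIGHT = 0.05  # assumed cache-hit discount ("cache hit - 5%")

def reward_for_access(filesize_kb: float, found_in_cache: bool) -> float:
    cost = filesize_kb * CACHE_VALUE_WEIGHT if found_in_cache else filesize_kb
    return -cost

print(reward_for_access(1000.0, found_in_cache=True))   # -50.0
print(reward_for_access(1000.0, found_in_cache=False))  # -1000.0
```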


## Technical implementation in the XCache server
* There are additional containers in the pod.
    * environment container
        * receives gstream pfc and disk info
        * recalculates the new state and reward, and tokenizes the received gstream info
        * memorizes the last state and the actor's action for each file
        * triggers cleanup at a lower HWM than XCache itself; it loops through the memorized paths and removes the ones least likely to be needed
    * redis db - used by the environment container to store actor responses (a hedged sketch follows the list)
    * actor container
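
A hedged sketch of how the environment container could store state for the actor and read its responses back through the redis db; the key names, encoding, and connection settings below are assumptions, not the actual protocol.

```python
import json
import redis  # redis-py

# connection settings are placeholders; inside the pod the host is likely a service name
r = redis.Redis(host='localhost', port=6379, db=0)

def publish_state(file_id: str, state: list) -> None:
    # hypothetical key layout: latest tokenized state per file
    r.set(f'state:{file_id}', json.dumps(state))

def read_actor_response(file_id: str):
    # the actor container is assumed to write its action under this key
    raw = r.get(f'action:{file_id}')
    return None if raw is None else json.loads(raw)
```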

1 change: 1 addition & 0 deletions gym_cache/__init__.py
@@ -28,6 +28,7 @@
nondeterministic=True,
)

# continuous action cache
register(
id='Cache-continuous-v0',
entry_point='gym_cache.envs:CacheContinousEnv',
73 changes: 50 additions & 23 deletions gym_cache/envs/cache_env.py
@@ -20,8 +20,8 @@ class CacheEnv(gym.Env):

def __init__(self, InputData, CacheSize):

# self.name = '100TB_LRU'
self.name = '100TB_DDQN'
self.name = '100TB_LRU'
# self.name = '100TB_DDQN'
# self.name = 'InfiniteCache_DDQN'

self.accesses_filename = InputData + '.pa'
@@ -35,14 +35,18 @@ def __init__(self, InputData, CacheSize):
self.cache_hwm = .95 * self.cache_size
self.cache_lwm = .90 * self.cache_size
self.cache_kbytes = 0
# tuples of fid: [access_no, filesize, decision]
self.cache_content = {}
self.files_processed = 0
self.data_processed = 0

self.monitoring = []

# from previous cycle
self.weight = 0 # delivered in next cycle.
self.found_in_cache = False # from previous cycle
self.found_in_cache = False
self.fID = None
#########

self.viewer = None

@@ -51,28 +55,36 @@ def __init__(self, InputData, CacheSize):
self.observation_space = spaces.Box(
# first 6 are tokens, 7th is filesize, 8th is how full is cache at the moment
low=np.array([0, 0, 0, 0, 0, 0, 0, 0]),
high=np.array([maxes[0], maxes[1], maxes[2], maxes[3], maxes[4], maxes[5], maxes[6], 100]),
high=np.array([maxes[0], maxes[1], maxes[2], maxes[3],
maxes[4], maxes[5], maxes[6], 100]),
dtype=np.int32
)
print('environment loaded! cache size [kB]:', self.cache_size)

def load_access_data(self):
# last variable is the fileID.

self.accesses= pd.read_parquet(self.accesses_filename)
print("accesses loaded:", self.accesses.shape[0])
self.accesses = pd.read_parquet(self.accesses_filename)
# self.accesses = self.accesses.head(50000)
self.total_accesses = self.accesses.shape[0]
print("accesses loaded:", self.total_accesses)

def save_monitoring_data(self):
mdata = pd.DataFrame(self.monitoring, columns=['kB', 'cache size', 'cache hit', 'reward'])
mdata = pd.DataFrame(self.monitoring, columns=[
'kB', 'cache size', 'cache hit', 'reward'])
mdata.to_parquet('results/' + self.name + '.pa', engine='pyarrow')

def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]

def _cache_cleanup(self):
# order files by access instance
acc = pd.DataFrame.from_dict(self.cache_content, orient='index', columns=['accNo', 'fs']).sort_values(by='accNo', axis=0)
# create dataframe from cache info
acc = pd.DataFrame.from_dict(self.cache_content, orient='index', columns=[
'accNo', 'fs', 'action'])
# move accesses that we want kept to much later times
acc['accNo'] = acc['accNo'] + acc['action']*1000000
# order files by access instance (equivalent of time)
acc.sort_values(by='accNo', axis=0, inplace=True)
# starting from lowest number remove from cache
counter = 0
while self.cache_kbytes > self.cache_lwm:
@@ -84,24 +96,35 @@ def _cache_cleanup(self):

def step(self, action):

# calculate reward from old weight, was it in cache and action
# calculate reward, this is for the access delivered in previous state
# takes into account:
# * weight (was remembered from previous step),
# * was it in cache (was remembered from previous step),
# * what action actor took
reward = self.weight
if (self.found_in_cache and action == 0) or (not self.found_in_cache and action == 1):
reward = -reward

# """ checks if cache hit HWM """
# print('cache filled:', self.cache_kbytes)
if self.cache_kbytes > self.cache_hwm:
# print('cache cleanup starting on access:', self.files_processed)
self._cache_cleanup()
if self.fID: # check that this is not the first access
# remember what the action was
prevCacheContent = self.cache_content[self.fID]
self.cache_content[self.fID] = (
prevCacheContent[0], prevCacheContent[1], action)

# checks if cache hit HWM
# print('cache filled:', self.cache_kbytes)
if self.cache_kbytes > self.cache_hwm:
# print('cache cleanup starting on access:', self.files_processed)
self._cache_cleanup()

# takes next access
row = self.accesses.iloc[self.files_processed, :]
fID = row['fID']
self.fID = row['fID']
fs = row['kB']
# print(row['1'], row['2'], row['3'], row['4'], row['5'], row['6'], row['kB'], row['fID'])

self.found_in_cache = fID in self.cache_content
# print('found in cache', self.found_in_cache, fID, self.cache_content)
self.found_in_cache = self.fID in self.cache_content
# print('found in cache', self.found_in_cache, self.fID, self.cache_content)
if self.found_in_cache:
# print('cache hit - 5%')
self.weight = fs * self.cache_value_weight
@@ -110,23 +133,28 @@ def step(self, action):
self.weight = fs
self.cache_kbytes += fs

self.cache_content[fID] = (self.files_processed, fs)
self.cache_content[self.fID] = (self.files_processed, fs, )

self.monitoring.append([fs, self.cache_kbytes, self.found_in_cache, int(reward)])
self.monitoring.append(
[fs, self.cache_kbytes, self.found_in_cache, int(reward)])

self.files_processed += 1
self.data_processed += fs

state = [row['1'], row['2'], row['3'], row['4'], row['5'], row['6'],
fs, self.cache_kbytes * 100 // self.cache_size]

return np.array(state), int(reward), False, {}
if self.files_processed == self.total_accesses:
self.save_monitoring_data()
self.done = True
return np.array(state), int(reward), self.done, {}

def reset(self):
self.files_processed = 0
self.cache_content = {}
self.cache_kbytes = 0
self.monitoring = []
self.done = False

return self.step(0)[0]

@@ -145,7 +173,6 @@ def render(self, mode='human'):
return

def close(self):
self.save_monitoring_data()
if self.viewer:
self.viewer.close()
self.viewer = None
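
For completeness, a sketch of driving `CacheEnv` directly with the constructor shown in this diff; the input name (the processed parquet file without its `.pa` suffix) and the cache size in kB are placeholders, and the always-keep policy is only for illustration.

```python
# Sketch only: run CacheEnv over the whole access trace with a trivial policy.
from gym_cache.envs.cache_env import CacheEnv

env = CacheEnv(InputData='processed_accesses', CacheSize=100 * 1024**3)  # ~100 TB expressed in kB
state = env.reset()
done = False
while not done:
    # hypothetical "keep" action for every file
    state, reward, done, info = env.step(1)
env.close()
```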
Binary file removed results/plots/100TB_LRU.png
Binary file removed results/plots/20TB_LRU.png
Binary file removed results/plots/InfiniteCache_LRU.png
Binary file removed results/plots/combinations/combination.png
