Commit: rewire evaluator

joel99 committed Apr 3, 2024
1 parent ae268e9 commit f3db585
Showing 3 changed files with 230 additions and 67 deletions.
9 changes: 6 additions & 3 deletions falcon_challenge/config.py
@@ -1,4 +1,5 @@
import enum
from pathlib import Path
from dataclasses import dataclass, field

from hydra.core.config_store import ConfigStore
@@ -25,7 +26,7 @@ class FalconConfig:
task: FalconTask = FalconTask.h1
# n_channels: int = 176
bin_size_ms: int = 20
dataset_handles: list[str] = field(default_factory=lambda: []) # Compute with evaluator.get_eval_handles
# dataset_handles: list[str] = field(default_factory=lambda: []) # Compute with evaluator.get_eval_handles

@property
def n_channels(self):
@@ -51,11 +52,13 @@ def out_dim(self):
return 2
raise NotImplementedError(f"Task {self.task} not implemented.")

def hash_dataset(self, handle: str):
def hash_dataset(self, handle: str | Path):
r"""
handle - path.stem of a datafile.
Convenience function to help identify what "session" a datafile belongs to.
Convenience function to help identify what "session" a datafile belongs to. If there are multiple files per session in real-world time, this may _not_ uniquely identify the runfile.
"""
if isinstance(handle, Path):
handle = handle.stem
if self.task == FalconTask.h1:
handle = handle.replace('-', '_')
# dandi-like atm but not quite determined; e.g. S0_set_1_calib
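For reference, a minimal sketch of how the updated hash_dataset might be used (hypothetical handles; the h1 branch shown above replaces dashes with underscores, and any further normalization is elided in this hunk):

from pathlib import Path
from falcon_challenge.config import FalconConfig, FalconTask

cfg = FalconConfig(task=FalconTask.h1)
# str handle: h1 normalizes dashes to underscores, e.g. toward 'S0_set_1_calib'
print(cfg.hash_dataset('S0-set-1-calib'))
# Path handle: .stem is taken first, so sibling files of one session hash alike
print(cfg.hash_dataset(Path('data/S0_set_1_calib.nwb')))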
240 changes: 176 additions & 64 deletions falcon_challenge/evaluator.py
@@ -1,6 +1,7 @@
from typing import List
import os
import pickle
from collections import defaultdict
import logging
import numpy as np
from pathlib import Path
@@ -14,6 +15,22 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

DATASET_HELDINOUT_MAP = {
'h1': {
'held_in': ['S0', 'S1', 'S2', 'S3', 'S4', 'S5'],
'held_out': ['S6', 'S7', 'S8', 'S9', 'S10', 'S11', 'S12'],
},
'm1': {
'held_in': ['20120924', '20120926', '20120927', '20120928'],
'held_out': ['20121004', '20121017', '20121022', '20121024'],
},
'h2': {},
'm2': {},
}

HELD_IN_KEYS = {
FalconTask.h1: ['S0_', 'S1_', 'S2_', 'S3_', 'S4_', 'S5_'],
FalconTask.m1: ['L_20120924', 'L_20120926', 'L_20120927', 'L_20120928'],
@@ -34,58 +51,145 @@
'held_out': "Held Out",
}
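
Taken together, these maps drive the bucketing and display labels used in evaluate() below. A small illustrative lookup (hypothetical dataset hash):

from falcon_challenge.evaluator import DATASET_HELDINOUT_MAP, HELDIN_OR_OUT_MAP

datasplit, dataset = 'h1', 'S6'
if dataset in DATASET_HELDINOUT_MAP[datasplit]['held_in']:
    bucket = 'held_in'
elif dataset in DATASET_HELDINOUT_MAP[datasplit]['held_out']:
    bucket = 'held_out'
else:
    raise ValueError(f"Dataset {dataset} not in either list for split {datasplit}.")
print(HELDIN_OR_OUT_MAP[bucket])  # 'Held Out' for S6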

# def evaluate(
# test_annotation_file: str, # The annotation file for the phase - but our labels are pulled from eval data.
# user_submission_file: str, # * JY: This appears to always be /submission/submission.csv on EvalAI. No matter - load it as a pickle.
# phase_codename: str, # e.g. minival or test
# **kwargs
# ):
# r"""
# Evaluate payloads with potentially multiple splits worth of data
# - Low pri: can I provide all results or just one split's worth entry? Currently providing 1, examples just provide 1, but in general would be nice to provide all. User shouldn't be able to submit more than 1, though.
# """
# # ! Want: Locally, test_annotation should be somewhere safe (tmp)
# # ! Remotely, it should be /submission/submission.csv exactly.
# # Ignore explicit annotations provided and directly search for concatenated answers
# logger.info(f"Evaluation: Docker side")
# logger.info(f"Loading GT from {test_annotation_file}")
# logger.info(f"Loading submission from {user_submission_file}")
# logger.info(f"Phase: {phase_codename}")

# result = []
# # Load pickles
# with open(test_annotation_file, 'rb') as test_annotation_file, open(user_submission_file, 'rb') as user_submission_file:
# test_annotations = pickle.load(test_annotation_file)
# user_submission = pickle.load(user_submission_file)
# for datasplit in user_submission: # datasplit e.g. h1, m1
# if datasplit not in test_annotations:
# raise ValueError(f"Missing {datasplit} in GT labels.")
# split_annotations = test_annotations[datasplit]
# split_result = {}
# split_result["Normalized Latency"] = user_submission[datasplit]["normalized_latency"]
# for in_or_out in split_annotations.keys():
# if f'{in_or_out}_pred' in user_submission[datasplit]:
# pred = user_submission[datasplit][f'{in_or_out}_pred']
# mask = user_submission[datasplit][f'{in_or_out}_eval_mask']
# # User submission should be in an expected format because we force predictions through our codepack interface... right? They could hypothetically spoof. But we see dockerfile.
# eval_fn = FalconEvaluator.compute_metrics_classification if 'h2' in datasplit else FalconEvaluator.compute_metrics_regression
# metrics_held_in = eval_fn(pred, split_annotations[in_or_out], mask)
# for k in metrics_held_in:
# split_result[f'{HELDIN_OR_OUT_MAP[in_or_out]} {k}'] = metrics_held_in[k]
# result.append({datasplit: split_result})

# print(f"Returning result from phase: {phase_codename}: {result}")
# # Out struct according to https://evalai.readthedocs.io/en/latest/evaluation_scripts.html
# return {"result": result, 'submission_result': result[0]}


def evaluate(
test_annotation_file: str, # The annotation file for the phase - but our labels are pulled from eval data.
test_annotation_file: str, # The annotation file for the phase
user_submission_file: str, # * JY: This appears to always be /submission/submission.csv on EvalAI. No matter - load it as a pickle.
phase_codename: str, # e.g. minival or test
**kwargs
):
r"""
Test struct:
{
'h1': {
'hash': {
'data': tgt,
'mask': mask,
}
}
}
User submission struct:
{
'h1': {
'hash': pred,
'normalized_latency': 1,
}
}
Evaluate payloads with potentially multiple splits worth of data
- Low pri: can I provide all results or just one split's worth entry? Currently providing 1, examples just provide 1, but in general would be nice to provide all. User shouldn't be able to submit more than 1, though.
"""
# ! Want: Locally, test_annotation should be somewhere safe (tmp)
# ! Remotely, it should be /submission/submission.csv exactly.
# Ignore explicit annotations provided and directly search for concatenated answers
test_annotation_file = os.environ.get("GT_PATH", './local_gt.pkl')
logger.info(f"Evaluation: Docker side")
logger.info(f"Loading GT from {test_annotation_file}")
logger.info(f"Loading submission from {user_submission_file}")
logger.info(f"Phase: {phase_codename}")

result = []
# Load pickles
with open(test_annotation_file, 'rb') as test_annotation_file, open(user_submission_file, 'rb') as user_submission_file:
test_annotations = pickle.load(test_annotation_file)
user_submission = pickle.load(user_submission_file)
try:
with open(test_annotation_file, 'rb') as test_annotation_file, open(user_submission_file, 'rb') as user_submission_file:
test_annotations = pickle.load(test_annotation_file)
user_submission = pickle.load(user_submission_file)
except Exception as e:
logger.error(f"Checking root: {os.listdir('/')}")
raise ValueError(f"Failed to load submission pickles: {e}. dir is {os.getcwd()}; contents {os.listdir()}. \nChecking tmp: {os.listdir('/tmp')}\n Checking root {os.listdir('/')}")
for datasplit in user_submission: # datasplit e.g. h1, m1
if datasplit not in test_annotations:
raise ValueError(f"Missing {datasplit} in GT labels.")
split_annotations = test_annotations[datasplit]
split_result = {}
split_result["Normalized Latency"] = user_submission[datasplit]["normalized_latency"]
for in_or_out in split_annotations.keys():
if f'{in_or_out}_pred' in user_submission[datasplit]:
pred = user_submission[datasplit][f'{in_or_out}_pred']
mask = user_submission[datasplit][f'{in_or_out}_eval_mask']
# User submission should be in an expected format because we force predictions through our codepack interface... right? They could hypothetically spoof. But we see dockerfile.
eval_fn = FalconEvaluator.compute_metrics_classification if 'h2' in datasplit else FalconEvaluator.compute_metrics_regression
metrics_held_in = eval_fn(pred, split_annotations[in_or_out], mask)
for k in metrics_held_in:
split_result[f'{HELDIN_OR_OUT_MAP[in_or_out]} {k}'] = metrics_held_in[k]
del user_submission[datasplit]["normalized_latency"]
pred_dict = defaultdict(list)
tgt_dict = defaultdict(list)
mask_dict = defaultdict(list)
for dataset in user_submission[datasplit]:
dataset_pred = user_submission[datasplit][dataset]
dataset_tgt = split_annotations[dataset]['data']
dataset_mask = split_annotations[dataset]['mask']
if dataset in DATASET_HELDINOUT_MAP[datasplit]['held_in']:
pred_dict['held_in'].append(dataset_pred)
tgt_dict['held_in'].append(dataset_tgt)
mask_dict['held_in'].append(dataset_mask)
elif dataset in DATASET_HELDINOUT_MAP[datasplit]['held_out']:
pred_dict['held_out'].append(dataset_pred)
tgt_dict['held_out'].append(dataset_tgt)
mask_dict['held_out'].append(dataset_mask)
else:
raise ValueError(f"Dataset {dataset} submitted but not found in held-in or held-out list of split {datasplit}.")
for in_or_out in pred_dict:
if len(pred_dict[in_or_out]) < len(DATASET_HELDINOUT_MAP[datasplit][in_or_out]):
raise ValueError(f"Missing predictions for {datasplit} {in_or_out}. User submitted: {user_submission[datasplit].keys()}. Expecting more like: {HELDIN_OR_OUT_MAP[datasplit][in_or_out]}.")
pred = np.concatenate(pred_dict[in_or_out])
tgt = np.concatenate(tgt_dict[in_or_out])
mask = np.concatenate(mask_dict[in_or_out])
eval_fn = FalconEvaluator.compute_metrics_classification if 'h2' in datasplit else FalconEvaluator.compute_metrics_regression
try:
metrics = eval_fn(pred, tgt, mask)
except Exception as e:
raise ValueError(f"Failed to compute metrics for {datasplit} {in_or_out}: {e}")
for k in metrics:
split_result[f'{HELDIN_OR_OUT_MAP[in_or_out]} {k}'] = metrics[k]
result.append({datasplit: split_result})

print(f"Returning result from phase: {phase_codename}: {result}")
# Out struct according to https://evalai.readthedocs.io/en/latest/evaluation_scripts.html
return {"result": result, 'submission_result': result[0]}
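
For local testing, a pair of pickles matching the structs documented above can be sketched as follows (toy shapes and random arrays, so the resulting metric values are meaningless; on EvalAI the ground-truth pickle is compiled server-side):

import pickle
import numpy as np
from falcon_challenge.evaluator import evaluate, DATASET_HELDINOUT_MAP

T, D = 100, 2  # timesteps x out_dim; illustrative only
tgt, pred = np.random.randn(T, D), np.random.randn(T, D)
mask = np.ones(T, dtype=bool)

handles = DATASET_HELDINOUT_MAP['h1']['held_in'] + DATASET_HELDINOUT_MAP['h1']['held_out']
truth = {'h1': {h: {'data': tgt, 'mask': mask} for h in handles}}  # GT struct
inner = {h: pred for h in handles}  # one prediction per expected dataset hash
inner['normalized_latency'] = 1
submission = {'h1': inner}  # submission struct

with open('./local_gt.pkl', 'wb') as f:  # default GT_PATH target
    pickle.dump(truth, f)
with open('./submission.pkl', 'wb') as f:
    pickle.dump(submission, f)

# Note the first argument is overridden by $GT_PATH inside evaluate().
print(evaluate('unused.pkl', './submission.pkl', phase_codename='test'))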


class FalconEvaluator:

def __init__(self, eval_remote=False, split='h1'):
self.eval_remote = eval_remote
assert split in ['h1', 'h2', 'm1', 'm2'], "Split must be h1, h2, m1, or m2."
self.dataset: FalconTask = getattr(FalconTask, split)
self.cfg = FalconConfig(self.dataset)

@staticmethod
def get_eval_handles(is_remote: bool, dataset: FalconTask, phase: str = 'minival'):
@@ -113,11 +217,13 @@ def get_eval_files(self, phase: str = 'minival'):
return handles

def predict_files(self, decoder: BCIDecoder, eval_files: List):
all_preds = []
all_targets = []
all_eval_mask = []
# Returns a triple of dicts, keyed by datafile hash, containing preds, targets, and eval_mask respectively.
# TODO this does not return uniquely identifiable data if eval_files is partial, e.g. if we only have set 2 of a day with 2 sets, we'll happily just provide partial predictions.
all_preds = defaultdict(list)
all_targets = defaultdict(list)
all_eval_mask = defaultdict(list)

for datafile in tqdm(eval_files):
for datafile in tqdm(sorted(eval_files)):
if not datafile.exists():
raise FileNotFoundError(f"File {datafile} not found.")
neural_data, decoding_targets, trial_change, eval_mask = load_nwb(datafile, dataset=self.dataset)
@@ -130,26 +236,33 @@ def predict_files(self, decoder: BCIDecoder, eval_files: List):
trial_preds.append(decoder.predict(neural_observations))
else:
decoder.observe(neural_observations)
trial_preds.append(np.full(FalconConfig(self.dataset).out_dim, np.nan))
all_preds.append(np.stack(trial_preds))
all_targets.append(decoding_targets)
all_eval_mask.append(eval_mask)
all_preds = np.concatenate(all_preds)
all_targets = np.concatenate(all_targets)
all_eval_mask = np.concatenate(all_eval_mask)
trial_preds.append(np.full(self.cfg.out_dim, np.nan))
all_preds[self.cfg.hash_dataset(datafile)].append(np.stack(trial_preds))
all_targets[self.cfg.hash_dataset(datafile)].append(decoding_targets)
all_eval_mask[self.cfg.hash_dataset(datafile)].append(eval_mask)
for k in all_preds:
all_preds[k] = np.concatenate(all_preds[k])
all_targets[k] = np.concatenate(all_targets[k])
all_eval_mask[k] = np.concatenate(all_eval_mask[k])
return all_preds, all_targets, all_eval_mask
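
predict_files only relies on the decoder interface visible in this loop: observe() on non-eval timesteps, predict() on eval timesteps, one sample per step. A stub satisfying that contract (the reset hook is an assumption; any per-file setup happens in the lines elided from this hunk):

import numpy as np

class ZeroDecoder:
    # Hypothetical stand-in for BCIDecoder: ignores neural data, predicts zeros.
    def __init__(self, out_dim: int):
        self.out_dim = out_dim

    def reset(self):
        pass  # assumed per-file hook; not shown in this hunk

    def observe(self, neural_observations: np.ndarray):
        pass  # consume a timestep without predicting

    def predict(self, neural_observations: np.ndarray) -> np.ndarray:
        return np.zeros(self.out_dim)  # one sample per timestep, stacked per trial by predict_files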

def evaluate_files(self, decoder: BCIDecoder, eval_files: List):
all_preds, all_targets, all_eval_mask = self.predict_files(decoder, eval_files)
metrics = self.compute_metrics(all_preds, all_targets, all_eval_mask)
return metrics

def evaluate(self, decoder: BCIDecoder, phase: str):
def evaluate(self, decoder: BCIDecoder, phase: str, held_out_only: bool = False, specific_keys: List = []):
r"""
Note: Locally, this can produce metrics, but locally and remotely it should also write a submission file
that the actual evaluator on remote uses. The evaluation is done separately on remote.
held_out_only: Only run predictions on held-out data.
specific_keys: Overrides held_out_only. Only run predictions on datafiles with specific keys.
"""
assert phase in ['minival', 'test'], "Phase must be minival or test."
if phase == 'minival' and (held_out_only or specific_keys):
logger.warning("Ignoring held_out_only and specific_keys for minival phase.")
held_out_only = False

np.random.seed(0)
# ! TODO ideally seed other libraries as well...? Is that our responsibility?
@@ -168,45 +281,44 @@ def evaluate(self, decoder: BCIDecoder, phase: str):
eval_files_held_in = [f for f in eval_files if any(k in f.name for k in HELD_IN_KEYS[self.dataset])]
eval_files_held_out = [f for f in eval_files if any(k in f.name for k in HELD_OUT_KEYS[self.dataset])]
assert len(eval_files) == len(eval_files_held_in) + len(eval_files_held_out), f"Mismatch in extracted eval #: Eval file state is not consistent with benchmark creation settings. Found {len(eval_files)} files, {len(eval_files_held_in)} held in, {len(eval_files_held_out)} held out."
all_preds_held_in, all_targets_held_in, all_eval_mask_held_in = self.predict_files(decoder, eval_files_held_in)
all_preds_held_out, all_targets_held_out, all_eval_mask_held_out = self.predict_files(decoder, eval_files_held_out)

# Indirect remote setup to satisfy EvalAI interface. Save metrics / comparison to file.
if USE_PKLS:
pred_payload = {self.dataset.name: {
'held_in_pred': all_preds_held_in,
'held_in_eval_mask': all_eval_mask_held_in,
'held_out_pred': all_preds_held_out,
'held_out_eval_mask': all_eval_mask_held_out,
'normalized_latency': 1, # TODO - CW insert timing code
}}
truth_payload = {self.dataset.name: {
'held_in': all_targets_held_in,
'held_out': all_targets_held_out,
}}
else:
metrics_held_in = self.compute_metrics(all_preds_held_in, all_targets_held_in, all_eval_mask_held_in)
metrics_held_out = self.compute_metrics(all_preds_held_out, all_targets_held_out, all_eval_mask_held_out)
for k, v in metrics_held_in.items():
metrics[f'{HELDIN_OR_OUT_MAP["held_in"]} {k}'] = v
for k, v in metrics_held_out.items():
metrics[f'{HELDIN_OR_OUT_MAP["held_out"]} {k}'] = v

if specific_keys:
raise NotImplementedError("not sure what metrics to compute for specific keys yet.")
elif held_out_only:
eval_files_held_in = []

all_preds, all_targets, all_eval_mask = self.predict_files(decoder, eval_files_held_out)
if eval_files_held_in:
all_preds_held_in, all_targets_held_in, all_eval_mask_held_in = self.predict_files(decoder, eval_files_held_in)
all_preds.update(all_preds_held_in)
all_targets.update(all_targets_held_in)
all_eval_mask.update(all_eval_mask_held_in)

else:
all_preds, all_targets, all_eval_mask = self.predict_files(decoder, eval_files)
if USE_PKLS:
pred_payload = {self.dataset.name: {
'held_in_pred': all_preds,
'held_in_eval_mask': all_eval_mask,
'normalized_latency': 1, # TODO - CW insert timing code
}}
truth_payload = {self.dataset.name: {
'held_in': all_targets,
}}
else:
metrics_minival = self.compute_metrics(all_preds, all_targets, all_eval_mask)
for k, v in metrics_minival.items():
metrics[f'{HELDIN_OR_OUT_MAP["held_in"]} {k}'] = v

# Indirect remote setup to satisfy EvalAI interface. Save metrics / comparison to file.
if USE_PKLS:
inner_pred = {**all_preds}
inner_tgt_spoof = { # Spoof for local mirror of the EvalAI path; in reality, targets are already compiled on the EvalAI side.
k: {
'data': all_targets[k],
'mask': all_eval_mask[k],
} for k in all_targets
}
inner_pred['normalized_latency'] = 1 # TODO - CW insert timing code
pred_payload = {self.dataset.name: inner_pred}
truth_payload = {self.dataset.name: inner_tgt_spoof}
else:
pass
# TODO restore
# metrics_held_in = self.compute_metrics(all_preds_held_in, all_targets_held_in, all_eval_mask_held_in)
# metrics_held_out = self.compute_metrics(all_preds_held_out, all_targets_held_out, all_eval_mask_held_out)
# for k, v in metrics_held_in.items():
# metrics[f'{HELDIN_OR_OUT_MAP["held_in"]} {k}'] = v
# for k, v in metrics_held_out.items():
# metrics[f'{HELDIN_OR_OUT_MAP["held_out"]} {k}'] = v

if USE_PKLS:
with open(prediction_path, 'wb') as f:
pickle.dump(pred_payload, f)
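End to end, the rewired evaluator is driven roughly as below (ZeroDecoder as sketched earlier; with USE_PKLS set, evaluate() dumps prediction/truth pickles for the EvalAI-side evaluate() rather than computing metrics locally):

from falcon_challenge.evaluator import FalconEvaluator

evaluator = FalconEvaluator(eval_remote=False, split='h1')
decoder = ZeroDecoder(out_dim=evaluator.cfg.out_dim)
evaluator.evaluate(decoder, phase='minival')
evaluator.evaluate(decoder, phase='test', held_out_only=True)  # skip held-in predictions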
