-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path daemon.py
90 lines (78 loc) · 3.16 KB
/
daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from config import DAEMON_PATH
import os
from watchdog.observers import Observer
import watchdog.events
from candigv2_logging.logging import initialize, CanDIGLogger
import json
from katsu_ingest import ingest_schemas
from htsget_ingest import htsget_ingest
# Value of the KATSU_URL environment variable, or None when it is unset
# (presumably the Katsu service URL).
# NOTE(review): not referenced anywhere else in this file — confirm whether
# another module imports it before removing.
KATSU_URL = os.getenv("KATSU_URL")

# Module-level logger for this daemon; initialize() comes from
# candigv2_logging and presumably configures CanDIG logging for the process.
logger = CanDIGLogger(__file__)
initialize()
def _ingest_programs(programs, ingest_one, results):
    """Ingest every program in ``programs`` independently.

    Each program's result, or a description of the exception it raised, is
    stored in ``results`` under its program ID; one failing program does not
    stop the others.

    Args:
        programs: mapping of program ID to that program's payload.
        ingest_one: callable taking one payload and returning
            ``(ingest_results, status_code)``.
        results: dict updated in place with per-program outcomes.

    Returns:
        The status code from the last ``ingest_one`` call that returned
        normally, or ``None`` if none did (including an empty ``programs``).
    """
    status_code = None
    for program_id, payload in programs.items():
        try:
            ingest_results, status_code = ingest_one(payload)
            results[program_id] = ingest_results
        except Exception as e:
            results[program_id] = f"Exception: {type(e)} {str(e)}"
    return status_code


def ingest_file(file_path):
    """Ingest one queued JSON request file and record the outcome.

    The file is expected to hold a JSON object with either a ``"katsu"`` key
    (program ID -> object whose ``"schemas"`` entry is passed to
    ``ingest_schemas``) or an ``"htsget"`` key (program ID -> payload passed
    to ``htsget_ingest`` together with the optional top-level
    ``"do_not_index"`` flag, default False).

    After processing, the request file is deleted. If reading or processing
    raises (e.g. invalid JSON), the file is left in place and the error is
    recorded under ``results["error"]``. A JSON summary of ``results`` is
    then written to ``DAEMON_PATH/results/<basename of file_path>``.

    Args:
        file_path: path of the request file to ingest.

    Returns:
        ``(results, status_code)``. ``results`` maps each program ID to its
        ingest result or an exception description, or holds an ``"error"``
        entry. ``status_code`` is the status from the last program ingest
        that returned normally, or 500 when none did or the file could not
        be processed.
    """
    results = {}
    # Default used when no program ingest reports its own status; previously
    # this variable could be unbound here, making the return raise.
    status_code = 500
    results_path = os.path.join(DAEMON_PATH, "results", os.path.basename(file_path))
    try:
        # RFC 8259 JSON is UTF-8; don't depend on the locale's default encoding.
        with open(file_path, encoding="utf-8") as f:
            json_data = json.load(f)
        if json_data is not None:
            logger.info(f"Ingesting {file_path}")
            program_status = None
            if "katsu" in json_data:
                program_status = _ingest_programs(
                    json_data["katsu"],
                    lambda payload: ingest_schemas(payload["schemas"]),
                    results,
                )
            elif "htsget" in json_data:
                do_not_index = json_data.get("do_not_index", False)
                program_status = _ingest_programs(
                    json_data["htsget"],
                    lambda payload: htsget_ingest(payload, do_not_index),
                    results,
                )
            else:
                # Previously this case silently wrote {} and then crashed on
                # the unbound status code; record why nothing was ingested.
                message = f"No 'katsu' or 'htsget' section in {file_path}"
                logger.error(message)
                results["error"] = message
            if program_status is not None:
                status_code = program_status
        os.remove(file_path)
    except Exception as e:
        message = f"Couldn't load data from {file_path}: {type(e)} {str(e)}"
        logger.error(message)
        results["error"] = message
        status_code = 500
    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(results, f)
    return results, status_code
class DaemonHandler(watchdog.events.FileSystemEventHandler):
    """Watchdog handler that ingests each file created in the watched directory."""

    def on_created(self, event):
        """Ingest the newly created file; log, never raise, on failure.

        Directory-creation events are ignored, since only files are ingest
        requests. Exceptions are logged, matching the backlog loop. Per
        watchdog's EventDispatcher.run, which only swallows ``queue.Empty``,
        an exception escaping a handler ends the observer thread, and the
        daemon's ``observer.is_alive()`` loop would then exit. Verify this
        against the pinned watchdog version.

        NOTE(review): on_created fires when the file appears, which may be
        before the producer has finished writing it. If producers do not
        write atomically (write-then-rename), consider handling on_closed or
        on_moved instead. Confirm against how files are dropped in.
        """
        if event.is_directory:
            return
        try:
            ingest_file(event.src_path)
        except Exception as e:
            logger.warning(str(e))
def drain_backlog(ingest_path, ingest=None):
    """Ingest every file already waiting in ``ingest_path``.

    The directory is re-listed after each file, so files that arrive while
    the backlog is being processed are picked up too. A file that is still
    present after its ingest attempt (for example because its JSON could not
    be parsed and so it was not deleted) is not retried during this run.
    Previously such a file was picked again forever. It stays in the
    directory and is attempted again on the next daemon start.

    Args:
        ingest_path: directory holding queued request files.
        ingest: callable taking a file path; defaults to ``ingest_file``.
    """
    if ingest is None:
        ingest = ingest_file
    left_behind = set()
    while True:
        pending = [name for name in os.listdir(ingest_path) if name not in left_behind]
        if not pending:
            return
        name = pending.pop()
        file_path = os.path.join(ingest_path, name)
        try:
            ingest(file_path)
        except Exception as e:
            logger.warning(str(e))
        if os.path.exists(file_path):
            left_behind.add(name)


if __name__ == "__main__":
    # Ingest any files queued while the daemon was down, then watch for new ones.
    ingest_path = os.path.join(DAEMON_PATH, "to_ingest")
    logger.info(f"ingesting started on {ingest_path}")
    to_ingest = os.listdir(ingest_path)
    logger.info(f"Finishing backlog: ingesting {to_ingest}")
    drain_backlog(ingest_path)
    # NOTE(review): a file created after drain_backlog's final listing but
    # before observer.start() is seen by neither; confirm whether producers
    # can write during startup.
    logger.info(f"listening for new files at {ingest_path}")
    event_handler = DaemonHandler()
    observer = Observer()
    observer.schedule(event_handler, ingest_path, recursive=False)
    observer.start()
    try:
        while observer.is_alive():
            observer.join(1)
    finally:
        observer.stop()
        observer.join()