-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdijkstra_algorithm.py
145 lines (125 loc) · 7.2 KB
/
dijkstra_algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
''' Implementation of the Dijkstra algorithm. Code skeleton taken from www.datacamp.com which
I extended with the making of the graph (in __init__ funciton) and tracking of reactions.'''
#%%
import numpy as np
from heapq import heapify, heappop, heappush
class Graph:
def __init__(self, dat, n_layer):
# prepare variable used to construct matrix
self.spec_list = dat['variable']['species']
self.rea_dict = self.create_reaction_dict(dat)
# create the dictionary to hold the graph in which keys are the nodes (species)
# and items are edges (reactions) as dictionaries, such that if A is connected to B and C with
# weigths of 1 and 2 through reactions b and c and total reaction rate for all reaction contributing to this edge b_t and c_t,
# respectively then it is 'A': {'B':[b,b_t,1], 'C':[c,c_t,2]}
graph = {key: {} for key in self.spec_list}
g_reactions = {key: {} for key in self.spec_list}
# given the multiplicity of the network, we need to track which reactions give the weights
# and what percantage that reaction contribute to that edge by choosing the most prominent reaction (hence the need for the total reaction rate)
# iterate over reactions and fill dictionary value
for i,rea in self.rea_dict.items():
# first calculate reaction rate
rate = dat['variable']['k'][i][n_layer]
for reactant in rea[0]: # first get the reaction rate
if reactant != 'M':
rate *= dat['variable']['y'][n_layer, self.spec_list.index(reactant)]
if rate == 0: # in case there is a zero value change it so no calculation errors when later taking 1/rate
rate = 1e-99
# then fill up the dictionary
if rea[0] == rea[1]:
continue
for reactant in rea[0]:
if reactant != 'M':
for product in rea[1]:
if product != 'M' and product in graph[reactant]: # if there is already a value for this edge, could override
chosen_reaction = np.argmax([g_reactions[reactant][product][1], rate]) # decide whether rate of new reaction is greater than previously found
g_reactions[reactant][product][0] = [g_reactions[reactant][product][0], i][chosen_reaction] # use chosen reaction to be saved
g_reactions[reactant][product][1] += rate # adding to the total reaction rate, how to avoid duplication???
graph[reactant][product] = [graph[reactant][product], 1/rate][chosen_reaction]
elif product != 'M' and product not in graph[reactant]: # if first entry for this edge, simply assign:
graph[reactant][product] = 0
g_reactions[reactant][product] = [0, 0]
g_reactions[reactant][product][0] = i # reaction ID
g_reactions[reactant][product][1] = rate # total reaction rate
graph[reactant][product] = 1/rate # weight
self.graph = graph
self.g_reactions = g_reactions
def get_species(self, eq_side):
''' Returns the species in a given reaction in an array.'''
side_split = eq_side.split('+')
if len(side_split) == 1: # stripping them from white spaces
side_split = np.array([side_split[0].strip()]) # with array length 1 the other method fails so doing it separately
else:
side_split = np.array([r.strip() for r in side_split])
return side_split
def create_reaction_dict(self, dat):
re_dict_sim = {}
for re in dat['variable']['k'].keys():
if re % 2 == 0:
reagents_products = dat['variable']['Rf'][re-1].split('->') # separating production and destruction
reagents = list(self.get_species(reagents_products[1])) # reversed because reaction is reversed
products = list(self.get_species(reagents_products[0]))
else:
reagents_products = dat['variable']['Rf'][re].split('->') # separating production and destruction
reagents = list(self.get_species(reagents_products[0]))
products = list(self.get_species(reagents_products[1]))
re_dict_sim[re] = [reagents, products]
return re_dict_sim
def shortest_distances(self, source: str):
# Initialize the values of all nodes with infinity
distances = {node: float("inf") for node in self.graph}
distances[source] = 0 # Set the source value to 0
# Initialize a priority queue
pq = [(0, source)]
heapify(pq)
# Create a set to hold visited nodes
visited = set()
while pq: # While the priority queue isn't empty
current_distance, current_node = heappop(
pq
) # Get the node with the min distance
if current_node in visited:
continue # Skip already visited nodes
visited.add(current_node) # Else, add the node to visited set
for neighbour, weight in self.graph[current_node].items():
# Calculate the distance from current_node to the neighbor
tentative_distance = current_distance + weight
if tentative_distance < distances[neighbour]:
distances[neighbour] = tentative_distance
heappush(pq, (tentative_distance, neighbour))
predecessors = {node: None for node in self.graph}
for node, distance in distances.items():
for neighbour, weight in self.graph[node].items():
if distances[neighbour] == distance + weight:
predecessors[neighbour] = node
return distances, predecessors
def shortest_path(self, source: str, target: str):
# Generate the predecessors dict
_, predecessors = self.shortest_distances(source)
path = []
reactions = {}
current_node = target
# Backtrack from the target node using predecessors
while current_node:
path.append(current_node)
current_node = predecessors[current_node]
# Reverse the path and reactions and their contributions and rates (as dict -> id: [reaction, contribution, rate])
path.reverse()
for i,node in enumerate(path):
if i == len(path) - 1:
break
neighbour = path[i+1]
# reaction is zeroth element in the graph, contribution is rate/total_rate but I have 1/rate (2nd item) and total rate (first item)
# so it becomes 1/(total_rate * 1/rate)
reactions[self.g_reactions[node][neighbour][0]] = [self.rea_dict[self.g_reactions[node][neighbour][0]], 1/(self.g_reactions[node][neighbour][1] * self.graph[node][neighbour]), 1/self.graph[node][neighbour]]
return path, reactions
# %%
import pickle
vul_data = '/scratch/s2555875/output/archean.vul'
with open(vul_data, 'rb') as handle:
data = pickle.load(handle)
# %%
G = Graph(data, 16)
#%%
G.shortest_path('CH4', 'HCN')
# %%