From 8e363f668b9a9e6ba164665b771a90c4ecf650f2 Mon Sep 17 00:00:00 2001 From: Antoine Pouille Date: Tue, 26 Mar 2024 10:59:49 +0100 Subject: [PATCH] autoformat kappy using autopep8 --- kappy/kappa_common.py | 8 +- kappy/kappa_graph.py | 386 +++++++++++++++++++++++------------------- kappy/kappa_rest.py | 15 +- 3 files changed, 227 insertions(+), 182 deletions(-) diff --git a/kappy/kappa_common.py b/kappy/kappa_common.py index 3563a0c9e..ce0886911 100644 --- a/kappy/kappa_common.py +++ b/kappy/kappa_common.py @@ -148,8 +148,8 @@ class SimulationParameter(object): """ - def __init__(self, plot_period : float, pause_condition : str, - seed : int = None, store_trace : bool =False): + def __init__(self, plot_period: float, pause_condition: str, + seed: int = None, store_trace: bool = False): self.plot_period = plot_period self.pause_condition = pause_condition self.seed = seed @@ -199,6 +199,7 @@ def toJSON(self): class KappaError(Exception): """ Error returned from the Kappa server""" + def __init__(self, errors): Exception.__init__(self, errors) self.errors = errors @@ -206,6 +207,7 @@ def __init__(self, errors): class KappaApi(ABC): """General api for a kappa interface.""" + def __init__(self): self.__default_param = None return @@ -388,7 +390,7 @@ def simulation_plot(self, limit=None): """ @abc.abstractmethod - def simulation_snapshot(self, snapshot_id: str) -> KappaSnapshot : + def simulation_snapshot(self, snapshot_id: str) -> KappaSnapshot: """Returns a given generated snapshot """ diff --git a/kappy/kappa_graph.py b/kappy/kappa_graph.py index da47b2561..08cbbc6d9 100644 --- a/kappy/kappa_graph.py +++ b/kappy/kappa_graph.py @@ -2,10 +2,12 @@ import re -ident_re = r'[_~][a-zA-Z0-9_~+-]+|[a-zA-Z][a-zA-Z0-9_~+-]*' +ident_re = r'[_~][a-zA-Z0-9_~+-]+|[a-zA-Z][a-zA-Z0-9_~+-]*' line_comment_re = r'//[^\n]*\n' non_nested_block_comment_re = r'/\*(?:[^*]*|\*+[^/])*\*/' -whitespace_re = r'(?:' + line_comment_re + '|' + non_nested_block_comment_re + '|\s)+' +whitespace_re = r'(?:' + line_comment_re + '|' + \ + non_nested_block_comment_re + '|\s)+' + def smallest_non_empty(dic): out = None @@ -14,20 +16,22 @@ def smallest_non_empty(dic): l = len(va) if l > 0: if out is None: - out = (id,va) + out = (id, va) else: if len(out[1]) > l: - out = (id,va) + out = (id, va) return out + class KappaSyntaxError(ValueError): pass + class KappaSite: """class for representing one site of a kappa agent (in a complex)""" - def __init__(self,*,links=None,internals=None, - future_link=None,future_internal=None): + def __init__(self, *, links=None, internals=None, + future_link=None, future_internal=None): self._links = links self._internals = internals self._future_link = future_link @@ -55,14 +59,14 @@ def __repr__(self): else f"future_internal={self._future_internal!r}" ) - def is_more_specific_than(self,ref,*,mapping=None,todos=None): + def is_more_specific_than(self, ref, *, mapping=None, todos=None): if ref._internals is None or \ all(x in self._internals for x in ref._internals): if ref._links is None: return True elif ref._links is True: return not (self._links is None or self._links == []) - elif type(ref._links) is list : + elif type(ref._links) is list: if type(self._links) is list: if len(ref._links) == 0: return len(self._links) == 0 @@ -71,26 +75,29 @@ def is_more_specific_than(self,ref,*,mapping=None,todos=None): "Sigma graph compare not implemented" if len(self._links) == 1: assert mapping is not None, "Missing mapping" - (r_ag,r_si) = ref._links[0] - (ag,si) = self._links[0] - ag_dst=mapping.get(r_ag) + (r_ag, r_si) = ref._links[0] + (ag, si) = self._links[0] + ag_dst = mapping.get(r_ag) if not ag_dst: - ag_dst=ag + ag_dst = ag mapping[r_ag] = ag - todos.append((r_ag,ag)) + todos.append((r_ag, ag)) return si == r_si and ag == ag_dst - else: return False - else: return False + else: + return False + else: + return False else: site = ref._links["site_name"] ag = ref._links["agent_type"] - assert False,"Sorry, I can't deal with site_types." - else: return False + assert False, "Sorry, I can't deal with site_types." + else: + return False - def is_equal(self,ref,*,mapping=None,todos=None): + def is_equal(self, ref, *, mapping=None, todos=None): if (isinstance(ref._internals, type(self._internals)) - and ref._internals == self._internals): - if type(ref._links) is list : + and ref._internals == self._internals): + if type(ref._links) is list: if type(self._links) is list: if len(ref._links) == 0: return len(self._links) == 0 @@ -99,20 +106,23 @@ def is_equal(self,ref,*,mapping=None,todos=None): "Sigma graph compare not implemented" if len(self._links) == 1: assert mapping is not None, "Missing mapping" - (r_ag,r_si) = ref._links[0] - (ag,si) = self._links[0] - ag_dst=mapping.get(r_ag) + (r_ag, r_si) = ref._links[0] + (ag, si) = self._links[0] + ag_dst = mapping.get(r_ag) if not ag_dst: - ag_dst=ag + ag_dst = ag mapping[r_ag] = ag - todos.append((r_ag,ag)) + todos.append((r_ag, ag)) return si == r_si and ag == ag_dst - else: return False - else: return False + else: + return False + else: + return False else: - return (isinstance(ref._links,type(self._links)) + return (isinstance(ref._links, type(self._links)) and ref._links == self._links) - else: return False + else: + return False @property def internal_states(self): @@ -136,21 +146,21 @@ def has_link(self) -> bool: """:returns: whether the link state is neither free nor unspecified?""" return bool(self._links) - def neighbours_in_complex(self,complx): + def neighbours_in_complex(self, complx): """:returns: the list of ``KappaAgent`` connected to here in ``complx`` """ if type(self._links) is list: - return [ complx[a] for (a,s) in self._links ] + return [complx[a] for (a, s) in self._links] else: return [] @staticmethod def __str_link_in_complex(line, row, site, trailing, dst): - out = trailing.pop((((line,row),site),dst),None) + out = trailing.pop((((line, row), site), dst), None) if out is None: free = trailing[None] - trailing[(dst,((line,row),site))] = free + trailing[(dst, ((line, row), site))] = free trailing[None] = free+1 return str(free) else: @@ -158,33 +168,44 @@ def __str_link_in_complex(line, row, site, trailing, dst): def __str_internals(self): if self._internals is None: - if self._future_internal is None: out = "" - else: out = "{#/"+str(self._future_internal)+"}" + if self._future_internal is None: + out = "" + else: + out = "{#/"+str(self._future_internal)+"}" return out elif len(self._internals) == 0: - assert(self._future_internal is None) + assert (self._future_internal is None) return "" else: - head = "{"+ ", ".join(self._internals) - if self._future_internal is None: tail = "}" - else: tail = "/"+str(self._future_internal)+"}" + head = "{" + ", ".join(self._internals) + if self._future_internal is None: + tail = "}" + else: + tail = "/"+str(self._future_internal)+"}" return head+tail def _str_in_complex(self, line, row, site, trailing): - if self._future_link is None: mod = "]" - elif self._future_link is False: mod = "/.]" - else: mod = "/"+str(self.future_link)+"]" + if self._future_link is None: + mod = "]" + elif self._future_link is False: + mod = "/.]" + else: + mod = "/"+str(self.future_link)+"]" if self._links is None: - if self._future_link is None: return self.__str_internals() - else: return "[#"+mod+self.__str_internals() - elif self._links is True: return "[_"+mod+self.__str_internals() + if self._future_link is None: + return self.__str_internals() + else: + return "[#"+mod+self.__str_internals() + elif self._links is True: + return "[_"+mod+self.__str_internals() elif type(self._links) is list: - if len(self._links) == 0: return "[."+mod+self.__str_internals() + if len(self._links) == 0: + return "[."+mod+self.__str_internals() else: links = ", ".join( - [ self.__str_link_in_complex(line, row, site, trailing, x) - for x in self._links ] ) + [self.__str_link_in_complex(line, row, site, trailing, x) + for x in self._links]) return "["+links+mod+self.__str_internals() else: site = self._links["site_name"] @@ -192,37 +213,40 @@ def _str_in_complex(self, line, row, site, trailing): return "["+site+"."+ag+links+mod+self.__str_internals() @staticmethod - def __get_site_name(complx,x): - if type(x[0]) is list: ag = complx[x[0][0]][x[0][1]] - else: ag = complx[x[0]] + def __get_site_name(complx, x): + if type(x[0]) is list: + ag = complx[x[0][0]][x[0][1]] + else: + ag = complx[x[0]] return ag["node_sites"][x[1]]["site_name"] @classmethod - def from_JSONDecoder_in_complex(cls,data,complx,*,in_1d): + def from_JSONDecoder_in_complex(cls, data, complx, *, in_1d): if data[0] != "port": raise Exception("Can only handle port sites for now") raw_links = data[1]["port_links"] - if type(raw_links) is not list: links = raw_links + if type(raw_links) is not list: + links = raw_links else: if in_1d is None: if len(raw_links) > 0 and type(raw_links[0][0]) is list: - links = [ (tuple(x[0]),cls.__get_site_name(complx,x)) - for x in raw_links ] + links = [(tuple(x[0]), cls.__get_site_name(complx, x)) + for x in raw_links] else: - links = [ ((0,x[0]),cls.__get_site_name(complx,x)) - for x in raw_links ] + links = [((0, x[0]), cls.__get_site_name(complx, x)) + for x in raw_links] else: - links = [ ((0,x[0]),cls.__get_site_name(complx,x)) if in_1d - else (tuple(x[0]),cls.__get_site_name(complx,x)) - for x in raw_links ] - return cls(links=links,internals=data[1]["port_states"]) + links = [((0, x[0]), cls.__get_site_name(complx, x)) if in_1d + else (tuple(x[0]), cls.__get_site_name(complx, x)) + for x in raw_links] + return cls(links=links, internals=data[1]["port_states"]) @classmethod def from_string_in_complex(cls, expression: str, position, dangling, completed): # define patterns that make up a site int_state_pat = \ - r'(?: ?{ ?(' +ident_re+ r'|#) ?(?: ?(/ ?)(' +ident_re+ r') ?)?} ?)?' + r'(?: ?{ ?(' + ident_re + r'|#) ?(?: ?(/ ?)(' + ident_re + r') ?)?} ?)?' bnd_state_pat = r' ?\[ ?(.|_|#|\d+) ?(?:(/ ?)(.|\d+) ?)?\] ?' port_pat = r'^' + int_state_pat + bnd_state_pat + int_state_pat + r'$' # parse assuming full site declaration, with bond state declared @@ -242,24 +266,25 @@ def from_string_in_complex(cls, expression: str, links = True else: link = int(g.group(4)) - dst = dangling.pop(link,None) + dst = dangling.pop(link, None) if dst is None: links = link dangling[link] = position else: - links = [ dst ] + links = [dst] completed[dst] = position - #if g.group(5): # if there's an operation + # if g.group(5): # if there's an operation # self._future_link = g.group(6) # figure out what type of internal state operation is being performed internals = None if g.group(1) and not g.group(1) == '#': - internals = [ g.group(1) ] - #self._future_internal = g.group(3) if g.group(3) else '' + internals = [g.group(1)] + # self._future_internal = g.group(3) if g.group(3) else '' elif g.group(7) and not g.group(7) == '#': - internals = [ g.group(7) ] - #self._future_int_state = g.group(9) if g.group(9) else '' - return cls(links=links,internals=internals) + internals = [g.group(7)] + # self._future_int_state = g.group(9) if g.group(9) else '' + return cls(links=links, internals=internals) + class KappaAgent(abc.Sequence): """class for representing one kappa agent inside a complex @@ -273,7 +298,7 @@ class KappaAgent(abc.Sequence): """ - def __init__(self, typ : str, node_id, sites : dict): + def __init__(self, typ: str, node_id, sites: dict): self._type = typ self._node_id = node_id self._sites = sites @@ -288,27 +313,29 @@ def __repr__(self): def __len__(self): return len(self._sites) - def __getitem__(self,key : str): + def __getitem__(self, key: str): return self._sites[key] def __iter__(self): return iter(self._sites.items()) - def is_more_specific_than(self,ref,*,mapping=None,todos=None): - if ref._type==self._type: - return all (self._sites.get(na,KappaSite()).\ - is_more_specific_than(ref._sites[na], - mapping=mapping, - todos=todos) - for na in ref._sites) - else: return False - - def is_equal(self,ref,*,mapping=None,todos=None): - if ref._type==self._type: - return all (self._sites.get(na,KappaSite()).\ - is_equal(ref._sites[na], mapping=mapping, todos=todos) - for na in ref._sites) - else: return False + def is_more_specific_than(self, ref, *, mapping=None, todos=None): + if ref._type == self._type: + return all(self._sites.get(na, KappaSite()). + is_more_specific_than(ref._sites[na], + mapping=mapping, + todos=todos) + for na in ref._sites) + else: + return False + + def is_equal(self, ref, *, mapping=None, todos=None): + if ref._type == self._type: + return all(self._sites.get(na, KappaSite()). + is_equal(ref._sites[na], mapping=mapping, todos=todos) + for na in ref._sites) + else: + return False def get_type(self) -> str: """::returns: the type of the agent""" @@ -318,7 +345,7 @@ def get_node_id(self): """::returns: the id of the agent if present (else Null)""" return self._node_id - def get_neighbours_in_complex(self,complx): + def get_neighbours_in_complex(self, complx): """Destination of the edges. :param KappaComplex complx: Complex ``self`` is part of @@ -329,12 +356,12 @@ def get_neighbours_in_complex(self,complx): potential duplicates and self listing. """ - return [ el for (_,s) in self - for el in s.neighbours_in_complex(complx) ] + return [el for (_, s) in self + for el in s.neighbours_in_complex(complx)] def _str_in_complex(self, line, row, trailing): - sites = [ n + self._sites[n]._str_in_complex(line, row, n, trailing) - for n in self._sites ] + sites = [n + self._sites[n]._str_in_complex(line, row, n, trailing) + for n in self._sites] if self._node_id is not None: witness = "x"+str(self._node_id)+":" else: @@ -342,15 +369,16 @@ def _str_in_complex(self, line, row, trailing): return witness + self._type + "(" + " ".join(sites) + ")" @classmethod - def from_JSONDecoder_in_complex(cls,data,complx,*,in_1d): - if data is None: return None + def from_JSONDecoder_in_complex(cls, data, complx, *, in_1d): + if data is None: + return None else: sites = dict([ x["site_name"], KappaSite.from_JSONDecoder_in_complex(x["site_type"], complx, in_1d=in_1d) ] for x in data["node_sites"]) - return cls(data["node_type"],data.get("node_id"),sites) + return cls(data["node_type"], data.get("node_id"), sites) @classmethod def from_string_in_complex(cls, expression: str, @@ -363,7 +391,8 @@ def from_string_in_complex(cls, expression: str, agent_sign_pat = r'\(([^()]*)\)' agent_oper_pat = r'(\+|-)?' agent_pat = \ - r'^(' +agent_name_pat+ r')' +agent_sign_pat+agent_oper_pat+ r'$' + r'^(' + agent_name_pat + r')' + \ + agent_sign_pat+agent_oper_pat + r'$' matches = re.match(agent_pat, expression.strip()) if not matches: matches = re.match(agent_pat, expression.strip() + '()') @@ -378,19 +407,20 @@ def from_string_in_complex(cls, expression: str, if ag_signature == '': site_list = [] else: - site_block_re = r'('+ident_re+')((?:\[[^\]]+\]|{[^}]+})+) ?,? ?' - site_list = re.finditer(site_block_re,ag_signature) + site_block_re = r'('+ident_re + \ + ')((?:\[[^\]]+\]|{[^}]+})+) ?,? ?' + site_list = re.finditer(site_block_re, ag_signature) agent_signature = {} for id, match in enumerate(site_list): - name=match.group(1) + name = match.group(1) site = KappaSite.from_string_in_complex(match.group(2), - (position,name), + (position, name), dangling, completed) - agent_signature[name]=site + agent_signature[name] = site # process abundance operator, if present - #abundance_change = matches.group(3) if matches.group(3) else '' - return cls(agent_name,None,agent_signature) + # abundance_change = matches.group(3) if matches.group(3) else '' + return cls(agent_name, None, agent_signature) class KappaComplexIterator(abc.Iterator): @@ -411,13 +441,15 @@ def __next__(self): while o is None: o = next(self._row_iter) self._row += 1 - return ((self._line,self._row),o) if self._with_key else o + return ((self._line, self._row), o) if self._with_key else o except StopIteration: self._line += 1 self._row_iter = None self.__next__() -allocated_once_for_all_empty_list=[] + +allocated_once_for_all_empty_list = [] + class KappaComplex(abc.Sequence): """Class for representing a Kappa connected component @@ -442,12 +474,13 @@ class KappaComplex(abc.Sequence): def __init__(self, agents): self._agents = agents self._nodes_by_type = {} - i=0 + i = 0 for line in agents: - j=0 + j = 0 for el in line: if el: - self._nodes_by_type.setdefault(el.get_type(),[]).append((i,j)) + self._nodes_by_type.setdefault( + el.get_type(), []).append((i, j)) j += 1 i += 1 @@ -455,27 +488,29 @@ def __repr__(self): return "KappaComplex({})".format(repr(self._agents)) def __str__(self): - trailing = { None: 0 } - lines = [ [ - "." if e is None else e._str_in_complex(l,r,trailing) - for (r,e) in enumerate(line) - ] for (l,line) in enumerate(self._agents) ] - return "\\ ".join(map(", ".join,lines)) - - def __getitem__(self,key): + trailing = {None: 0} + lines = [[ + "." if e is None else e._str_in_complex(l, r, trailing) + for (r, e) in enumerate(line) + ] for (l, line) in enumerate(self._agents)] + return "\\ ".join(map(", ".join, lines)) + + def __getitem__(self, key): if type(key) is tuple: - (l,r) = key + (l, r) = key return self._agents[l][r] - else: return self._agents[0][key] + else: + return self._agents[0][key] def __iter__(self): - return KappaComplexIterator(self._agents,with_key=False) + return KappaComplexIterator(self._agents, with_key=False) def __len__(self): r = 0 for line in self._agents: for el in line: - if el: r +=1 + if el: + r += 1 return r def items(self): @@ -483,9 +518,9 @@ def items(self): :returns: an Iterator where the items are turples (coordinates,agent) """ - return KappaComplexIterator(self._agents,with_key=True) + return KappaComplexIterator(self._agents, with_key=True) - def agent_ids_of_type(self,type : str): + def agent_ids_of_type(self, type: str): """:returns: the list of coordinates of agents of type ``type``""" return self._nodes_by_type[type] @@ -496,73 +531,76 @@ def agent_ids_by_type(self): """ return self._nodes_by_type - def contains_at_pos(self,id,ref,ref_id): - mapping={ ref_id : id } - todos=[(ref_id,id)] + def contains_at_pos(self, id, ref, ref_id): + mapping = {ref_id: id} + todos = [(ref_id, id)] while len(todos) > 0: - (rag,ag)=todos.pop() + (rag, ag) = todos.pop() if not self[ag].is_more_specific_than(ref[rag], mapping=mapping, todos=todos): return None return mapping - def __eq_rooted(self,id,ref,ref_id): - mapping={ ref_id : id } - todos=[(ref_id,id)] + def __eq_rooted(self, id, ref, ref_id): + mapping = {ref_id: id} + todos = [(ref_id, id)] while len(todos) > 0: - (rag,ag)=todos.pop() + (rag, ag) = todos.pop() if not self[ag].is_equal(ref[rag], mapping=mapping, todos=todos): return None return mapping - def find_pattern(self,ref): + def find_pattern(self, ref): """:returns: a list of dictionnary \ ``coordinates -> coordinates``. Each dictionnary represents an \ embedding of ``ref`` in the complex. """ - ag_ty,ref_ids = smallest_non_empty(ref._nodes_by_type) - candidates = self._nodes_by_type.get(ag_ty,allocated_once_for_all_empty_list) - return [ x for x in [ self.contains_at_pos(cand,ref,ref_ids[0]) - for cand in candidates ] - if x ] - - def __contains__(self,pattern): + ag_ty, ref_ids = smallest_non_empty(ref._nodes_by_type) + candidates = self._nodes_by_type.get( + ag_ty, allocated_once_for_all_empty_list) + return [x for x in [self.contains_at_pos(cand, ref, ref_ids[0]) + for cand in candidates] + if x] + + def __contains__(self, pattern): return not len(self.find_pattern(pattern)) == 0 - def __same_sum_formula(a,b): - if len(a._nodes_by_type) != len(b._nodes_by_type): return False + def __same_sum_formula(a, b): + if len(a._nodes_by_type) != len(b._nodes_by_type): + return False for ty in a._nodes_by_type: - if (len(b._nodes_by_type.get(ty,allocated_once_for_all_empty_list)) - != len(a._nodes_by_type[ty])): + if (len(b._nodes_by_type.get(ty, allocated_once_for_all_empty_list)) + != len(a._nodes_by_type[ty])): return False return True - def __eq__(a,b): + def __eq__(a, b): if a.__same_sum_formula(b): - ag_ty,ref_ids = smallest_non_empty(a._nodes_by_type) - ref_id=ref_ids[0] + ag_ty, ref_ids = smallest_non_empty(a._nodes_by_type) + ref_id = ref_ids[0] for cand in b._nodes_by_type[ag_ty]: - if b.__eq_rooted(cand,a,ref_id): return True + if b.__eq_rooted(cand, a, ref_id): + return True return False @classmethod - def from_JSONDecoder(cls,data): + def from_JSONDecoder(cls, data): """ Build a KappaComplex from its JSON representation""" if len(data) > 0 and type(data[0]) is dict: - return cls([ [ - KappaAgent.from_JSONDecoder_in_complex(x,data,in_1d=True) + return cls([[ + KappaAgent.from_JSONDecoder_in_complex(x, data, in_1d=True) for x in data - ] ]) + ]]) else: - return cls([ [ - KappaAgent.from_JSONDecoder_in_complex(x,data,in_1d=False) + return cls([[ + KappaAgent.from_JSONDecoder_in_complex(x, data, in_1d=False) for x in line - ] for line in data ]) + ] for line in data]) @classmethod - def from_string(cls,expression): + def from_string(cls, expression): """Build a KappaComplex from its Kappa syntax""" # remove comments, new_lines, multiple spaces expression = re.sub(whitespace_re, ' ', expression) @@ -577,18 +615,19 @@ def from_string(cls,expression): dangling = {} completed = {} for id, item in enumerate(matches): - agent = KappaAgent.from_string_in_complex(item,(0,id), + agent = KappaAgent.from_string_in_complex(item, (0, id), dangling, completed) agent_list.append(agent) if len(dangling) != 0: raise KappaSyntaxError('Dangling link <' + str(dangling.popitem()) + '> in complex <' + expression + '>.') - for (((col,row),si),dst) in completed.items(): + for (((col, row), si), dst) in completed.items(): site = agent_list[row][si] agent_list[row]._sites[si] = KappaSite(links=[dst], internals=site.internal_states) - return cls([ agent_list ]) + return cls([agent_list]) + class KappaSnapshot: """class for representing a kappa snapshot @@ -596,8 +635,8 @@ class KappaSnapshot: The string representation is the corresponding Kappa code. """ - def __init__(self, *, time : float, event : int, - complexes : list = [], tokens : dict = {}): + def __init__(self, *, time: float, event: int, + complexes: list = [], tokens: dict = {}): self._time = time self._event = event self._complexes = complexes @@ -615,10 +654,11 @@ def __str__(self): event = "// Snapshot [Event: {0:d}]\n".format(self._event) time = '%def: "T0" "{0:f}"\n\n'.format(self._time) complexes = "".join( - [ "%init: {0:d} {1}\n".format(n,c) for (n,c) in self._complexes ] + ["%init: {0:d} {1}\n".format(n, c) for (n, c) in self._complexes] ) tokens = "".join( - [ "%init: {0:d} {1}\n".format(self._tokens[t],t) for t in self._tokens ] + ["%init: {0:d} {1}\n".format(self._tokens[t], t) + for t in self._tokens] ) return event+time+complexes+tokens @@ -652,9 +692,9 @@ def get_complexes_by_size(self): """ out = {} - for (abundance,complx) in self._complexes: + for (abundance, complx) in self._complexes: size = len(complx) - out.setdefault(size,[]).append((abundance,complx)) + out.setdefault(size, []).append((abundance, complx)) return out def get_complexes_by_abundance(self): @@ -664,8 +704,8 @@ def get_complexes_by_abundance(self): """ out = {} - for (abundance,complx) in self._complexes: - out.setdefault(abundance,[]).append(complx) + for (abundance, complx) in self._complexes: + out.setdefault(abundance, []).append(complx) return out def get_largest_complexes(self): @@ -697,15 +737,15 @@ def get_size_distribution(self): """ out = {} - for (abundance,complx) in self._complexes: + for (abundance, complx) in self._complexes: size = len(complx) - out[size] = out.get(size,0) + abundance + out[size] = out.get(size, 0) + abundance return out def get_total_mass(self) -> int: """Get the total number of agents""" out = 0 - for (abundance,complx) in self._complexes: + for (abundance, complx) in self._complexes: out += abundance * len(complx) return out @@ -716,16 +756,16 @@ def tokens(self): """ return self._tokens - def get_token_abundance(self,token : str) -> int: + def get_token_abundance(self, token: str) -> int: """Get the abundance of ``token`` """ return self._tokens[token] @classmethod - def from_JSONDecoder(cls,data): - complexes = [ (x[0], KappaComplex.from_JSONDecoder(x[1])) - for x in data["snapshot_agents"] ] - tokens = dict( [ x[1], x[0] ] for x in data["snapshot_tokens"] ) + def from_JSONDecoder(cls, data): + complexes = [(x[0], KappaComplex.from_JSONDecoder(x[1])) + for x in data["snapshot_agents"]] + tokens = dict([x[1], x[0]] for x in data["snapshot_tokens"]) return cls(time=data["snapshot_time"], event=data["snapshot_event"], complexes=complexes, tokens=tokens) diff --git a/kappy/kappa_rest.py b/kappy/kappa_rest.py index f5880ad50..a15923b99 100644 --- a/kappy/kappa_rest.py +++ b/kappy/kappa_rest.py @@ -10,10 +10,11 @@ from requests import exceptions, request from .kappa_common import KappaError, PlotLimit, KappaApi, File, \ - FileMetadata + FileMetadata from .kappa_graph import KappaSnapshot + @KappaApi._fix_docs class KappaRest(KappaApi): """Client to a Kappa tools driver run as a server. @@ -24,6 +25,7 @@ class KappaRest(KappaApi): endpoint -- The url to the kappa server. project_id -- An identifier for this particular project. """ + def __init__(self, endpoint, project_id=None): self.url = "{0}/v2".format(endpoint) if project_id is None: @@ -121,21 +123,22 @@ def _project_delete(self): # Standardized API methods. Docs are provided by parent. def project_parse(self, sharing_level="compatible_patterns", **kwargs): - overwrites = '&'.join('%s=%s' % (key, value) for (key, value) in kwargs.items()) + overwrites = '&'.join('%s=%s' % (key, value) + for (key, value) in kwargs.items()) return self._post(self.in_project('parse', '?'.join([sharing_level, overwrites]))) def project_overwrite(self, ast, file_id="model.ka"): - return self._post(self.in_project('overwrite',file_id),ast) + return self._post(self.in_project('overwrite', file_id), ast) def file_create(self, file_): - return self._put(self.in_project('files', file_.get_id(),'position', str(file_.get_position())), file_.get_content()) + return self._put(self.in_project('files', file_.get_id(), 'position', str(file_.get_position())), file_.get_content()) def file_delete(self, file_id): return self._delete(self.in_project('files', file_id)) def file_get(self, file_id): f = self._get(self.in_project('files', file_id)) - return File(FileMetadata(file_id,f[1]),f[0]) + return File(FileMetadata(file_id, f[1]), f[0]) def file_info(self): info = self._get(self.in_project('files')) @@ -173,7 +176,7 @@ def simulation_snapshots(self): def simulation_snapshot(self, snapshot_id): return KappaSnapshot.from_JSONDecoder( - self._get(self.in_project('simulation', 'snapshots',snapshot_id)) + self._get(self.in_project('simulation', 'snapshots', snapshot_id)) ) def simulation_delete(self):