From d579be2f54ed376bb68e18961360feaa6540b2e9 Mon Sep 17 00:00:00 2001 From: PPakalns Date: Tue, 22 Jan 2019 02:14:25 +0200 Subject: [PATCH 1/2] ProxyService support for different contest RWS --- AUTHORS.txt | 1 + cms/conf.py | 2 +- cms/service/ProxyService.py | 192 ++++++++++-------- cms/service/ResourceService.py | 13 +- cmstestsuite/programstarter.py | 2 +- cmstestsuite/testrunner.py | 2 +- .../unit_tests/service/ProxyServiceTest.py | 5 +- config/cms.conf.sample | 6 +- scripts/cmsProxyService | 3 +- 9 files changed, 119 insertions(+), 107 deletions(-) diff --git a/AUTHORS.txt b/AUTHORS.txt index aed3712bb..aff772574 100644 --- a/AUTHORS.txt +++ b/AUTHORS.txt @@ -44,5 +44,6 @@ Kārlis Seņko Peyman Jabbarzade Ganje Valentin Rosca Alexander Kernozhitsky +Pēteris Pakalns And many other people that didn't write code, but provided useful comments, suggestions and feedback. :-) diff --git a/cms/conf.py b/cms/conf.py index 0669bfc1c..5b79a91bb 100644 --- a/cms/conf.py +++ b/cms/conf.py @@ -149,7 +149,7 @@ def __init__(self): self.admin_cookie_duration = 10 * 60 * 60 # 10 hours # ProxyService. - self.rankings = ["http://usern4me:passw0rd@localhost:8890/"] + self.rankings = [[1, "http://usern4me:passw0rd@localhost:8890/"]] self.https_certfile = None # PrintingService diff --git a/cms/service/ProxyService.py b/cms/service/ProxyService.py index 8f16fa464..f9ee7f814 100644 --- a/cms/service/ProxyService.py +++ b/cms/service/ProxyService.py @@ -9,6 +9,7 @@ # Copyright © 2015 Luca Versari # Copyright © 2015 William Di Luigi # Copyright © 2016 Amir Keivan Mohtashami +# Copyright © 2018 Pēteris Pakalns # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -100,16 +101,18 @@ def safe_put_data(ranking, resource, data, operation): class ProxyOperation(QueueItem): - def __init__(self, type_, data): + def __init__(self, contest_id, type_, data): + self.contest_id = contest_id self.type_ = type_ self.data = data def __str__(self): - return "sending data of type %s to ranking" % ( - self.type_) + return "sending data of type %s, contest id %d to ranking" % ( + self.type_, self.contest_id) def to_dict(self): - return {"type": self.type_, + return {"contest_id": self.contest_id, + "type": self.type_, "data": self.data} @@ -154,9 +157,10 @@ class ProxyExecutor(Executor): # before trying again. FAILURE_WAIT = 60.0 - def __init__(self, ranking): + def __init__(self, contest_id, ranking): """Create a proxy for the ranking at the given URL. + contest_id (int): the ID of the contest to manage. ranking (bytes): a complete URL (containing protocol, username, password, hostname, port and prefix) where a ranking is supposed to listen. @@ -164,6 +168,7 @@ def __init__(self, ranking): """ super().__init__(batch_executions=True) + self.contest_id = contest_id self._ranking = ranking def execute(self, entries): @@ -189,6 +194,8 @@ def execute(self, entries): data = list(dict() for i in range(self.TYPE_COUNT)) for entry in entries: + if entry.item.contest_id != self.contest_id: + continue data[entry.item.type_].update(entry.item.data) try: @@ -232,7 +239,7 @@ class ProxyService(TriggeredService): """ - def __init__(self, shard, contest_id): + def __init__(self, shard): """Start the service with the given parameters. Create an instance of the ProxyService and make it listen on @@ -243,13 +250,10 @@ def __init__(self, shard, contest_id): corresponds to the shard-th entry in the list of addresses (hostname/port pairs) for this kind of service in the configuration file. - contest_id (int): the ID of the contest to manage. """ super().__init__(shard) - self.contest_id = contest_id - # Store what data we already sent to rankings, to avoid # sending it twice. self.scores_sent_to_rankings = set() @@ -257,8 +261,10 @@ def __init__(self, shard, contest_id): # Create one executor for each ranking. self.rankings = list() - for ranking in config.rankings: - self.add_executor(ProxyExecutor(ranking)) + self.contest_ids = set() + for contest_id, ranking in config.rankings: + self.add_executor(ProxyExecutor(contest_id, ranking)) + self.contest_ids.add(contest_id) # Enqueue the dispatch of some initial data to rankings. Needs # to be done before the sweeper is started, as otherwise RWS @@ -274,29 +280,29 @@ def _missing_operations(self): """ counter = 0 with SessionGen() as session: - submissions = get_submissions(session, contest_id=self.contest_id) \ - .filter(not_(Participation.hidden)) \ - .filter(Submission.official).all() - - for submission in submissions: - # The submission result can be None if the dataset has - # been just made live. - sr = submission.get_result() - if sr is None: - continue - - if sr.scored() and \ - submission.id not in self.scores_sent_to_rankings: - for operation in self.operations_for_score(submission): - self.enqueue(operation) - counter += 1 - - if submission.tokened() and \ - submission.id not in self.tokens_sent_to_rankings: - for operation in self.operations_for_token(submission): - self.enqueue(operation) - counter += 1 - + for contest_id in self.contest_ids: + submissions = get_submissions(session, contest_id=contest_id) \ + .filter(not_(Participation.hidden)) \ + .filter(Submission.official).all() + + for submission in submissions: + # The submission result can be None if the dataset has + # been just made live. + sr = submission.get_result() + if sr is None: + continue + + if sr.scored() and \ + submission.id not in self.scores_sent_to_rankings: + for operation in self.operations_for_score(submission): + self.enqueue(operation) + counter += 1 + + if submission.tokened() and \ + submission.id not in self.tokens_sent_to_rankings: + for operation in self.operations_for_token(submission): + self.enqueue(operation) + counter += 1 return counter def initialize(self): @@ -312,57 +318,58 @@ def initialize(self): logger.info("Initializing rankings.") with SessionGen() as session: - contest = Contest.get_from_id(self.contest_id, session) - - if contest is None: - logger.error("Received request for unexistent contest " - "id %s.", self.contest_id) - raise KeyError("Contest not found.") - - contest_id = encode_id(contest.name) - contest_data = { - "name": contest.description, - "begin": int(make_timestamp(contest.start)), - "end": int(make_timestamp(contest.stop)), - "score_precision": contest.score_precision} - - users = dict() - teams = dict() - - for participation in contest.participations: - user = participation.user - team = participation.team - if not participation.hidden: - users[encode_id(user.username)] = { - "f_name": user.first_name, - "l_name": user.last_name, - "team": team.code if team is not None else None, - } - if team is not None: - teams[encode_id(team.code)] = { - "name": team.name - } - - tasks = dict() - - for task in contest.tasks: - score_type = task.active_dataset.score_type_object - tasks[encode_id(task.name)] = { - "short_name": task.name, - "name": task.title, - "contest": encode_id(contest.name), - "order": task.num, - "max_score": score_type.max_score, - "extra_headers": score_type.ranking_headers, - "score_precision": task.score_precision, - "score_mode": task.score_mode, + for contest_id in self.contest_ids: + contest = Contest.get_from_id(contest_id, session) + + if contest is None: + logger.error("Received request for unexistent contest " + "id %s.", contest_id) + raise KeyError("Contest not found.") + + contests = {} + contests[encode_id(contest.name)] = { + "name": contest.description, + "begin": int(make_timestamp(contest.start)), + "end": int(make_timestamp(contest.stop)), + "score_precision": contest.score_precision, } - self.enqueue(ProxyOperation(ProxyExecutor.CONTEST_TYPE, - {contest_id: contest_data})) - self.enqueue(ProxyOperation(ProxyExecutor.TEAM_TYPE, teams)) - self.enqueue(ProxyOperation(ProxyExecutor.USER_TYPE, users)) - self.enqueue(ProxyOperation(ProxyExecutor.TASK_TYPE, tasks)) + users = dict() + teams = dict() + + for participation in contest.participations: + user = participation.user + team = participation.team + if not participation.hidden: + users[encode_id(user.username)] = { + "f_name": user.first_name, + "l_name": user.last_name, + "team": team.code if team is not None else None, + } + if team is not None: + teams[encode_id(team.code)] = { + "name": team.name + } + + tasks = dict() + + for task in contest.tasks: + score_type = task.active_dataset.score_type_object + tasks[encode_id(task.name)] = { + "short_name": task.name, + "name": task.title, + "contest": encode_id(contest.name), + "order": task.num, + "max_score": score_type.max_score, + "extra_headers": score_type.ranking_headers, + "score_precision": task.score_precision, + "score_mode": task.score_mode, + } + + self.enqueue(ProxyOperation(contest_id ,ProxyExecutor.CONTEST_TYPE, contests)) + self.enqueue(ProxyOperation(contest_id, ProxyExecutor.TEAM_TYPE, teams)) + self.enqueue(ProxyOperation(contest_id, ProxyExecutor.USER_TYPE, users)) + self.enqueue(ProxyOperation(contest_id, ProxyExecutor.TASK_TYPE, tasks)) def operations_for_score(self, submission): """Send the score for the given submission to all rankings. @@ -371,6 +378,10 @@ def operations_for_score(self, submission): queues for them to be sent to rankings. """ + contest_id = submission.task.contest_id + if contest_id not in self.contest_ids: + return [] + submission_result = submission.get_result() # Data to send to remote rankings. @@ -395,9 +406,9 @@ def operations_for_score(self, submission): self.scores_sent_to_rankings.add(submission.id) return [ - ProxyOperation(ProxyExecutor.SUBMISSION_TYPE, + ProxyOperation(contest_id, ProxyExecutor.SUBMISSION_TYPE, {submission_id: submission_data}), - ProxyOperation(ProxyExecutor.SUBCHANGE_TYPE, + ProxyOperation(contest_id, ProxyExecutor.SUBCHANGE_TYPE, {subchange_id: subchange_data})] def operations_for_token(self, submission): @@ -407,6 +418,10 @@ def operations_for_token(self, submission): queues for them to be sent to rankings. """ + contest_id = submission.task.contest_id + if contest_id not in self.contest_ids: + return [] + # Data to send to remote rankings. submission_id = "%d" % submission.id submission_data = { @@ -424,9 +439,9 @@ def operations_for_token(self, submission): self.tokens_sent_to_rankings.add(submission.id) return [ - ProxyOperation(ProxyExecutor.SUBMISSION_TYPE, + ProxyOperation(contest_id, ProxyExecutor.SUBMISSION_TYPE, {submission_id: submission_data}), - ProxyOperation(ProxyExecutor.SUBCHANGE_TYPE, + ProxyOperation(contest_id, ProxyExecutor.SUBCHANGE_TYPE, {subchange_id: subchange_data})] @rpc_method @@ -536,6 +551,9 @@ def dataset_updated(self, task_id): # max_score and/or extra_headers might have changed. self.reinitialize() + if task.contest_id not in self.contest_ids: + return + for submission in task.submissions: # Update RWS. if not submission.participation.hidden and \ diff --git a/cms/service/ResourceService.py b/cms/service/ResourceService.py index 47a981f47..da79da29f 100644 --- a/cms/service/ResourceService.py +++ b/cms/service/ResourceService.py @@ -179,10 +179,6 @@ def __init__(self, shard, contest_id=None, autorestart=False): self._prev_cpu_times = self._get_cpu_times() # Sorted list of ServiceCoord running in the same machine self._local_services = self._find_local_services() - if "ProxyService" in (s.name for s in self._local_services) and \ - self.contest_id is None: - logger.warning("Will not run ProxyService " - "since it requires a contest id.") # Dict service with bool to mark if we will restart them. self._will_restart = dict((service, None if not self.autorestart else True) @@ -222,10 +218,7 @@ def _restart_services(self): matcher = ProcessMatcher() for service in self._local_services: # We let the user start logservice and resourceservice. - if service.name == "LogService" or \ - service.name == "ResourceService" or \ - (self.contest_id is None and - service.name == "ProxyService"): + if service.name in ("LogService", "ResourceService"): continue # If the user specified not to restart some service, we @@ -463,10 +456,6 @@ def toggle_autorestart(self, service): logger.error("Unable to decode service string.") name = service[:idx] - # ProxyService requires contest_id - if self.contest_id is None and name == "ProxyService": - return None - try: shard = int(service[idx + 1:]) except ValueError: diff --git a/cmstestsuite/programstarter.py b/cmstestsuite/programstarter.py index 47dcf773c..bd691cdec 100644 --- a/cmstestsuite/programstarter.py +++ b/cmstestsuite/programstarter.py @@ -244,7 +244,7 @@ def _check_service(self): def _check_ranking_web_server(self): """Health checker for RWS.""" - url = urlsplit(self.cms_config["rankings"][0]) + url = urlsplit(self.cms_config["rankings"][0][1]) sock = socket.socket() sock.connect((url.hostname, url.port)) sock.close() diff --git a/cmstestsuite/testrunner.py b/cmstestsuite/testrunner.py index dcc4c2a09..6175c5c59 100644 --- a/cmstestsuite/testrunner.py +++ b/cmstestsuite/testrunner.py @@ -299,7 +299,7 @@ def submit_tests(self, concurrent_submit_and_eval=True): self.ps.start("EvaluationService", contest=self.contest_id) self.ps.wait() - self.ps.start("ProxyService", contest=self.contest_id) + self.ps.start("ProxyService") for shard in range(self.workers): self.ps.start("Worker", shard) diff --git a/cmstestsuite/unit_tests/service/ProxyServiceTest.py b/cmstestsuite/unit_tests/service/ProxyServiceTest.py index 246732e9e..91c50731a 100755 --- a/cmstestsuite/unit_tests/service/ProxyServiceTest.py +++ b/cmstestsuite/unit_tests/service/ProxyServiceTest.py @@ -33,6 +33,7 @@ # Needs to be first to allow for monkey patching the DB connection string. from cmstestsuite.unit_tests.databasemixin import DatabaseMixin +from cms.conf import config from cms.service.ProxyService import ProxyService from cmscommon.constants import SCORE_MODE_MAX @@ -76,6 +77,8 @@ def setUp(self): self.session.commit() + config.rankings[0][0] = self.contest.id + def new_sr_unscored(self): submission = self.add_submission(task=self.task, participation=self.participation) @@ -96,7 +99,7 @@ def new_sr_scored(self): def test_startup(self): """Test that data is sent in the right order at startup.""" - ProxyService(0, self.contest.id) + ProxyService(0) gevent.sleep(0.1) diff --git a/config/cms.conf.sample b/config/cms.conf.sample index a3c6770fd..9894e3dab 100644 --- a/config/cms.conf.sample +++ b/config/cms.conf.sample @@ -151,12 +151,14 @@ "_section": "ScoringService", - "_help": "List of URLs (with embedded username and password) of the", + "_help": "List of [contest id, URL] tuples.", + "_help": "Contest id specifies which contest scores to send.", + "_help": "URL (with embedded username and password) of the", "_help": "RWSs where the scores are to be sent. Don't include the", "_help": "load balancing proxy (if any), just the backends. If any", "_help": "of them uses HTTPS specify a file with the certificates", "_help": "you trust.", - "rankings": ["http://usern4me:passw0rd@localhost:8890/"], + "rankings": [[1, "http://usern4me:passw0rd@localhost:8890/"]], "https_certfile": null, diff --git a/scripts/cmsProxyService b/scripts/cmsProxyService index b7953873d..662e707e7 100755 --- a/scripts/cmsProxyService +++ b/scripts/cmsProxyService @@ -39,8 +39,7 @@ def main(): """ test_db_connection() success = default_argument_parser("Ranking relayer for CMS.", - ProxyService, - ask_contest=ask_for_contest).run() + ProxyService).run() return 0 if success is True else 1 From f311d0a0e9438763ee57e0ffbb9942bf63b41f68 Mon Sep 17 00:00:00 2001 From: PPakalns Date: Tue, 22 Jan 2019 10:06:42 +0200 Subject: [PATCH 2/2] Fix line length in ProxyService.py --- cms/service/ProxyService.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/cms/service/ProxyService.py b/cms/service/ProxyService.py index f9ee7f814..de6c605e6 100644 --- a/cms/service/ProxyService.py +++ b/cms/service/ProxyService.py @@ -366,10 +366,18 @@ def initialize(self): "score_mode": task.score_mode, } - self.enqueue(ProxyOperation(contest_id ,ProxyExecutor.CONTEST_TYPE, contests)) - self.enqueue(ProxyOperation(contest_id, ProxyExecutor.TEAM_TYPE, teams)) - self.enqueue(ProxyOperation(contest_id, ProxyExecutor.USER_TYPE, users)) - self.enqueue(ProxyOperation(contest_id, ProxyExecutor.TASK_TYPE, tasks)) + self.enqueue(ProxyOperation(contest_id, + ProxyExecutor.CONTEST_TYPE, + contests)) + self.enqueue(ProxyOperation(contest_id, + ProxyExecutor.TEAM_TYPE, + teams)) + self.enqueue(ProxyOperation(contest_id, + ProxyExecutor.USER_TYPE, + users)) + self.enqueue(ProxyOperation(contest_id, + ProxyExecutor.TASK_TYPE, + tasks)) def operations_for_score(self, submission): """Send the score for the given submission to all rankings.