Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ProxyService support for different contest RWS #1102

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ Kārlis Seņko <[email protected]>
Peyman Jabbarzade Ganje <[email protected]>
Valentin Rosca <[email protected]>
Alexander Kernozhitsky <[email protected]>
Pēteris Pakalns <[email protected]>
And many other people that didn't write code, but provided useful
comments, suggestions and feedback. :-)
2 changes: 1 addition & 1 deletion cms/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def __init__(self):
self.admin_cookie_duration = 10 * 60 * 60 # 10 hours

# ProxyService.
self.rankings = ["http://usern4me:passw0rd@localhost:8890/"]
self.rankings = [[1, "http://usern4me:passw0rd@localhost:8890/"]]
self.https_certfile = None

# PrintingService
Expand Down
200 changes: 113 additions & 87 deletions cms/service/ProxyService.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# Copyright © 2015 Luca Versari <[email protected]>
# Copyright © 2015 William Di Luigi <[email protected]>
# Copyright © 2016 Amir Keivan Mohtashami <[email protected]>
# Copyright © 2018 Pēteris Pakalns <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -100,16 +101,18 @@ def safe_put_data(ranking, resource, data, operation):


class ProxyOperation(QueueItem):
def __init__(self, type_, data):
def __init__(self, contest_id, type_, data):
self.contest_id = contest_id
self.type_ = type_
self.data = data

def __str__(self):
return "sending data of type %s to ranking" % (
self.type_)
return "sending data of type %s, contest id %d to ranking" % (
self.type_, self.contest_id)

def to_dict(self):
return {"type": self.type_,
return {"contest_id": self.contest_id,
"type": self.type_,
"data": self.data}


Expand Down Expand Up @@ -154,16 +157,18 @@ class ProxyExecutor(Executor):
# before trying again.
FAILURE_WAIT = 60.0

def __init__(self, ranking):
def __init__(self, contest_id, ranking):
"""Create a proxy for the ranking at the given URL.

contest_id (int): the ID of the contest to manage.
ranking (bytes): a complete URL (containing protocol, username,
password, hostname, port and prefix) where a ranking is
supposed to listen.

"""
super().__init__(batch_executions=True)

self.contest_id = contest_id
self._ranking = ranking

def execute(self, entries):
Expand All @@ -189,6 +194,8 @@ def execute(self, entries):
data = list(dict() for i in range(self.TYPE_COUNT))

for entry in entries:
if entry.item.contest_id != self.contest_id:
continue
data[entry.item.type_].update(entry.item.data)

try:
Expand Down Expand Up @@ -232,7 +239,7 @@ class ProxyService(TriggeredService):

"""

def __init__(self, shard, contest_id):
def __init__(self, shard):
"""Start the service with the given parameters.

Create an instance of the ProxyService and make it listen on
Expand All @@ -243,22 +250,21 @@ def __init__(self, shard, contest_id):
corresponds to the shard-th entry in the list of addresses
(hostname/port pairs) for this kind of service in the
configuration file.
contest_id (int): the ID of the contest to manage.

"""
super().__init__(shard)

self.contest_id = contest_id

# Store what data we already sent to rankings, to avoid
# sending it twice.
self.scores_sent_to_rankings = set()
self.tokens_sent_to_rankings = set()

# Create one executor for each ranking.
self.rankings = list()
for ranking in config.rankings:
self.add_executor(ProxyExecutor(ranking))
self.contest_ids = set()
for contest_id, ranking in config.rankings:
self.add_executor(ProxyExecutor(contest_id, ranking))
self.contest_ids.add(contest_id)

# Enqueue the dispatch of some initial data to rankings. Needs
# to be done before the sweeper is started, as otherwise RWS
Expand All @@ -274,29 +280,29 @@ def _missing_operations(self):
"""
counter = 0
with SessionGen() as session:
submissions = get_submissions(session, contest_id=self.contest_id) \
.filter(not_(Participation.hidden)) \
.filter(Submission.official).all()

for submission in submissions:
# The submission result can be None if the dataset has
# been just made live.
sr = submission.get_result()
if sr is None:
continue

if sr.scored() and \
submission.id not in self.scores_sent_to_rankings:
for operation in self.operations_for_score(submission):
self.enqueue(operation)
counter += 1

if submission.tokened() and \
submission.id not in self.tokens_sent_to_rankings:
for operation in self.operations_for_token(submission):
self.enqueue(operation)
counter += 1

for contest_id in self.contest_ids:
submissions = get_submissions(session, contest_id=contest_id) \
.filter(not_(Participation.hidden)) \
.filter(Submission.official).all()

for submission in submissions:
# The submission result can be None if the dataset has
# been just made live.
sr = submission.get_result()
if sr is None:
continue

if sr.scored() and \
submission.id not in self.scores_sent_to_rankings:
for operation in self.operations_for_score(submission):
self.enqueue(operation)
counter += 1

if submission.tokened() and \
submission.id not in self.tokens_sent_to_rankings:
for operation in self.operations_for_token(submission):
self.enqueue(operation)
counter += 1
return counter

def initialize(self):
Expand All @@ -312,57 +318,66 @@ def initialize(self):
logger.info("Initializing rankings.")

with SessionGen() as session:
contest = Contest.get_from_id(self.contest_id, session)

if contest is None:
logger.error("Received request for unexistent contest "
"id %s.", self.contest_id)
raise KeyError("Contest not found.")

contest_id = encode_id(contest.name)
contest_data = {
"name": contest.description,
"begin": int(make_timestamp(contest.start)),
"end": int(make_timestamp(contest.stop)),
"score_precision": contest.score_precision}

users = dict()
teams = dict()

for participation in contest.participations:
user = participation.user
team = participation.team
if not participation.hidden:
users[encode_id(user.username)] = {
"f_name": user.first_name,
"l_name": user.last_name,
"team": team.code if team is not None else None,
}
if team is not None:
teams[encode_id(team.code)] = {
"name": team.name
}

tasks = dict()

for task in contest.tasks:
score_type = task.active_dataset.score_type_object
tasks[encode_id(task.name)] = {
"short_name": task.name,
"name": task.title,
"contest": encode_id(contest.name),
"order": task.num,
"max_score": score_type.max_score,
"extra_headers": score_type.ranking_headers,
"score_precision": task.score_precision,
"score_mode": task.score_mode,
for contest_id in self.contest_ids:
contest = Contest.get_from_id(contest_id, session)

if contest is None:
logger.error("Received request for unexistent contest "
"id %s.", contest_id)
raise KeyError("Contest not found.")

contests = {}
contests[encode_id(contest.name)] = {
"name": contest.description,
"begin": int(make_timestamp(contest.start)),
"end": int(make_timestamp(contest.stop)),
"score_precision": contest.score_precision,
}

self.enqueue(ProxyOperation(ProxyExecutor.CONTEST_TYPE,
{contest_id: contest_data}))
self.enqueue(ProxyOperation(ProxyExecutor.TEAM_TYPE, teams))
self.enqueue(ProxyOperation(ProxyExecutor.USER_TYPE, users))
self.enqueue(ProxyOperation(ProxyExecutor.TASK_TYPE, tasks))
users = dict()
teams = dict()

for participation in contest.participations:
user = participation.user
team = participation.team
if not participation.hidden:
users[encode_id(user.username)] = {
"f_name": user.first_name,
"l_name": user.last_name,
"team": team.code if team is not None else None,
}
if team is not None:
teams[encode_id(team.code)] = {
"name": team.name
}

tasks = dict()

for task in contest.tasks:
score_type = task.active_dataset.score_type_object
tasks[encode_id(task.name)] = {
"short_name": task.name,
"name": task.title,
"contest": encode_id(contest.name),
"order": task.num,
"max_score": score_type.max_score,
"extra_headers": score_type.ranking_headers,
"score_precision": task.score_precision,
"score_mode": task.score_mode,
}

self.enqueue(ProxyOperation(contest_id,
ProxyExecutor.CONTEST_TYPE,
contests))
self.enqueue(ProxyOperation(contest_id,
ProxyExecutor.TEAM_TYPE,
teams))
self.enqueue(ProxyOperation(contest_id,
ProxyExecutor.USER_TYPE,
users))
self.enqueue(ProxyOperation(contest_id,
ProxyExecutor.TASK_TYPE,
tasks))

def operations_for_score(self, submission):
"""Send the score for the given submission to all rankings.
Expand All @@ -371,6 +386,10 @@ def operations_for_score(self, submission):
queues for them to be sent to rankings.

"""
contest_id = submission.task.contest_id
if contest_id not in self.contest_ids:
return []

submission_result = submission.get_result()

# Data to send to remote rankings.
Expand All @@ -395,9 +414,9 @@ def operations_for_score(self, submission):
self.scores_sent_to_rankings.add(submission.id)

return [
ProxyOperation(ProxyExecutor.SUBMISSION_TYPE,
ProxyOperation(contest_id, ProxyExecutor.SUBMISSION_TYPE,
{submission_id: submission_data}),
ProxyOperation(ProxyExecutor.SUBCHANGE_TYPE,
ProxyOperation(contest_id, ProxyExecutor.SUBCHANGE_TYPE,
{subchange_id: subchange_data})]

def operations_for_token(self, submission):
Expand All @@ -407,6 +426,10 @@ def operations_for_token(self, submission):
queues for them to be sent to rankings.

"""
contest_id = submission.task.contest_id
if contest_id not in self.contest_ids:
return []

# Data to send to remote rankings.
submission_id = "%d" % submission.id
submission_data = {
Expand All @@ -424,9 +447,9 @@ def operations_for_token(self, submission):
self.tokens_sent_to_rankings.add(submission.id)

return [
ProxyOperation(ProxyExecutor.SUBMISSION_TYPE,
ProxyOperation(contest_id, ProxyExecutor.SUBMISSION_TYPE,
{submission_id: submission_data}),
ProxyOperation(ProxyExecutor.SUBCHANGE_TYPE,
ProxyOperation(contest_id, ProxyExecutor.SUBCHANGE_TYPE,
{subchange_id: subchange_data})]

@rpc_method
Expand Down Expand Up @@ -536,6 +559,9 @@ def dataset_updated(self, task_id):
# max_score and/or extra_headers might have changed.
self.reinitialize()

if task.contest_id not in self.contest_ids:
return

for submission in task.submissions:
# Update RWS.
if not submission.participation.hidden and \
Expand Down
13 changes: 1 addition & 12 deletions cms/service/ResourceService.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,6 @@ def __init__(self, shard, contest_id=None, autorestart=False):
self._prev_cpu_times = self._get_cpu_times()
# Sorted list of ServiceCoord running in the same machine
self._local_services = self._find_local_services()
if "ProxyService" in (s.name for s in self._local_services) and \
self.contest_id is None:
logger.warning("Will not run ProxyService "
"since it requires a contest id.")
# Dict service with bool to mark if we will restart them.
self._will_restart = dict((service,
None if not self.autorestart else True)
Expand Down Expand Up @@ -222,10 +218,7 @@ def _restart_services(self):
matcher = ProcessMatcher()
for service in self._local_services:
# We let the user start logservice and resourceservice.
if service.name == "LogService" or \
service.name == "ResourceService" or \
(self.contest_id is None and
service.name == "ProxyService"):
if service.name in ("LogService", "ResourceService"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be more conventional to use a list here.

continue

# If the user specified not to restart some service, we
Expand Down Expand Up @@ -463,10 +456,6 @@ def toggle_autorestart(self, service):
logger.error("Unable to decode service string.")
name = service[:idx]

# ProxyService requires contest_id
if self.contest_id is None and name == "ProxyService":
return None

try:
shard = int(service[idx + 1:])
except ValueError:
Expand Down
2 changes: 1 addition & 1 deletion cmstestsuite/programstarter.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def _check_service(self):

def _check_ranking_web_server(self):
"""Health checker for RWS."""
url = urlsplit(self.cms_config["rankings"][0])
url = urlsplit(self.cms_config["rankings"][0][1])
sock = socket.socket()
sock.connect((url.hostname, url.port))
sock.close()
Expand Down
2 changes: 1 addition & 1 deletion cmstestsuite/testrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def submit_tests(self, concurrent_submit_and_eval=True):
self.ps.start("EvaluationService", contest=self.contest_id)
self.ps.wait()

self.ps.start("ProxyService", contest=self.contest_id)
self.ps.start("ProxyService")
for shard in range(self.workers):
self.ps.start("Worker", shard)

Expand Down
Loading