Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(celery): stop closing prerun_span too soon to account for Celery chains scenario [backport 2.19] #12001

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions ddtrace/contrib/internal/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
if task_span:
task_span.set_exc_info(*sys.exc_info())

prerun_span = core.get_item("prerun_span")
if prerun_span:
prerun_span.set_exc_info(*sys.exc_info())

raise
finally:
task_span = core.get_item("task_span")
Expand All @@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
)
task_span.finish()

prerun_span = core.get_item("prerun_span")
if prerun_span:
log.debug(
"The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
)
prerun_span.finish()

return _traced_apply_async_inner
3 changes: 0 additions & 3 deletions ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ def trace_prerun(*args, **kwargs):
service = config.celery["worker_service_name"]
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)

# Store an item called "prerun span" in case task_postrun doesn't get called
core.set_item("prerun_span", span)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing(celery): Fixes an issue where ``celery.apply`` spans from Celery prerun got closed too soon leading to span tags being missing.
5 changes: 5 additions & 0 deletions tests/contrib/celery/run_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from tasks import fn_a
from tasks import fn_b


(fn_a.si() | fn_b.si()).delay()
14 changes: 14 additions & 0 deletions tests/contrib/celery/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from celery import Celery


app = Celery("tasks")


@app.task(name="tests.contrib.celery.tasks.fn_a")
def fn_a():
return "a"


@app.task(name="tests.contrib.celery.tasks.fn_b")
def fn_b():
return "b"
62 changes: 62 additions & 0 deletions tests/contrib/celery/test_chained_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import os
import re
import subprocess
import time

from celery import Celery


# Ensure that when we call Celery chains, the root span has celery specific span tags
# The test_integration.py setup doesn't perfectly mimic the condition of a worker process running.
# This test runs the worker as a side so we can check the tracer logs afterwards to ensure expected span results.
# See https://github.com/DataDog/dd-trace-py/issues/11479
def test_task_chain_task_call_task():
app = Celery("tasks")

celery_worker_cmd = "ddtrace-run celery -A tasks worker -c 1 -l DEBUG -n uniquename1 -P solo"
celery_task_runner_cmd = "ddtrace-run python run_tasks.py"

# The commands need to run from the directory where this test file lives
current_directory = str(os.path.dirname(__file__))

worker_process = subprocess.Popen(
celery_worker_cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
close_fds=True,
cwd=current_directory,
)

max_wait_time = 10
waited_so_far = 0
# {app.control.inspect().active() returns {'celery@uniquename1': []} when the worker is running}
while app.control.inspect().active() is None and waited_so_far < max_wait_time:
time.sleep(1)
waited_so_far += 1

# The task should only run after the Celery worker has sufficient time to start up
task_runner_process = subprocess.Popen(
celery_task_runner_cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
close_fds=True,
cwd=current_directory,
)

task_runner_process.wait()
# Kill the process so it starts to send traces to the Trace Agent
worker_process.kill()
worker_logs = worker_process.stderr.read()

# Check that the root span was created with one of the Celery specific tags, such as celery.correlation_id
# Some versions of python seem to require escaping when using `re.search`:
old_pattern_match = r"resource=\\'tests.contrib.celery.tasks.fn_a\\' type=\\'worker\\' .* tags=.*correlation_id.*"
new_pattern_match = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*"

pattern_exists = (
re.search(old_pattern_match, str(worker_logs)) is not None
or re.search(new_pattern_match, str(worker_logs)) is not None
)
assert pattern_exists is not None
Loading