Skip to content

Commit

Permalink
Added API to add child spans to a span
Browse files Browse the repository at this point in the history
Signed-off-by: Eran Ifrah <[email protected]>
  • Loading branch information
eifrah-aws committed Jan 1, 2025
1 parent f5123e9 commit 57a3a17
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 5 deletions.
94 changes: 90 additions & 4 deletions glide-core/telemetry/src/open_telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use opentelemetry::global::ObjectSafeSpan;
use opentelemetry::trace::SpanKind;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::{global, trace::Tracer};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::TracerProvider;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

const SPAN_WRITE_LOCK_ERR: &str = "Failed to get span write lock";
const SPAN_READ_LOCK_ERR: &str = "Failed to get span read lock";
const TRACE_SCOPE: &str = "valkey_glide";

pub enum GlideSpanStatus {
Expand All @@ -16,7 +18,9 @@ pub enum GlideSpanStatus {

#[allow(dead_code)]
#[derive(Clone, Debug)]
/// Defines the tracing exporter between GLIDE -> collector
/// Defines the method that exporter connects to the collector. It can be:
/// gRPC or HTTP. The third type (i.e. "File") defines an exporter that does not connect to a collector
/// instead, it writes the collected signals to files.
pub enum GlideOpenTelemetryTraceExporter {
/// Collector is listening on grpc
Grpc(String),
Expand All @@ -33,6 +37,7 @@ struct GlideSpanInner {
}

impl GlideSpanInner {
/// Create new span with no parent.
pub fn new(name: &str) -> Self {
let tracer = global::tracer(TRACE_SCOPE);
let span = Arc::new(RwLock::new(
Expand All @@ -44,6 +49,28 @@ impl GlideSpanInner {
GlideSpanInner { span }
}

/// Create new span as a child of `parent`.
pub fn new_with_parent(name: &str, parent: &GlideSpanInner) -> Self {
let parent_span_ctx = parent
.span
.read()
.expect(SPAN_READ_LOCK_ERR)
.span_context()
.clone();

let parent_context =
opentelemetry::Context::new().with_remote_span_context(parent_span_ctx);

let tracer = global::tracer(TRACE_SCOPE);
let span = Arc::new(RwLock::new(
tracer
.span_builder(name.to_string())
.with_kind(SpanKind::Client)
.start_with_context(&tracer, &parent_context),
));
GlideSpanInner { span }
}

/// Attach event with name and list of attributes to this span.
pub fn add_event(&self, name: &str, attributes: Option<&Vec<(&str, &str)>>) {
let attributes: Vec<opentelemetry::KeyValue> = if let Some(attributes) = attributes {
Expand Down Expand Up @@ -80,6 +107,29 @@ impl GlideSpanInner {
}
}
}

/// Create new span, add it as a child to this span and return it
pub fn add_span(&self, name: &str) -> GlideSpanInner {
let child = GlideSpanInner::new_with_parent(name, self);
{
let child_span = child.span.read().expect(SPAN_WRITE_LOCK_ERR);
self.span
.write()
.expect(SPAN_WRITE_LOCK_ERR)
.add_link(child_span.span_context().clone(), Vec::default());
}
child
}

/// Return the span ID
pub fn id(&self) -> String {
self.span
.read()
.expect(SPAN_READ_LOCK_ERR)
.span_context()
.span_id()
.to_string()
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -107,6 +157,17 @@ impl GlideSpan {
pub fn set_status(&self, status: GlideSpanStatus) {
self.inner.set_status(status)
}

/// Add child span to this span and return it
pub fn add_span(&self, name: &str) -> GlideSpan {
GlideSpan {
inner: self.inner.add_span(name),
}
}

pub fn id(&self) -> String {
self.inner.id()
}
}

/// OpenTelemetry configuration object. Use `GlideOpenTelemetryConfigBuilder` to construct it:
Expand Down Expand Up @@ -210,31 +271,56 @@ impl GlideOpenTelemetry {
#[cfg(test)]
mod tests {
use super::*;

const SPANS_JSON: &str = "/tmp/spans.json";
#[test]
fn test_span_json_exporter() {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
let _ = std::fs::remove_file(SPANS_JSON);
let config = GlideOpenTelemetryConfigBuilder::default()
.with_flush_interval(std::time::Duration::from_millis(100))
.with_trace_exporter(GlideOpenTelemetryTraceExporter::File(PathBuf::from("/tmp")))
.build();
GlideOpenTelemetry::initialise(config);
let span = GlideOpenTelemetry::new_span("span_with_1_event");
let span = GlideOpenTelemetry::new_span("Root_Span_1");
span.add_event("Event1");
span.set_status(GlideSpanStatus::Ok);

let child1 = span.add_span("Network_Span");

// keep the ids for testing purposes
drop(child1); // close the child span
drop(span); // writes the span
let span = GlideOpenTelemetry::new_span("span_with_2_events");

let span = GlideOpenTelemetry::new_span("Root_Span_2");
span.add_event("Event1");
span.add_event("Event2");
span.set_status(GlideSpanStatus::Ok);
drop(span); // writes the span

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

// Read the file content
let file_content = std::fs::read_to_string(SPANS_JSON).unwrap();
let lines: Vec<&str> = file_content.split('\n').collect();
assert_eq!(lines.len(), 4);

let span_json: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
assert_eq!(span_json["name"], "Network_Span");
let network_span_id = span_json["span_id"].to_string();

let span_json: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
assert_eq!(span_json["name"], "Root_Span_1");
assert_eq!(span_json["links"].as_array().unwrap().len(), 1); // we expect 1 child

let child_span_id = span_json["links"][0]["span_id"].to_string();
assert_eq!(child_span_id, network_span_id);

let span_json: serde_json::Value = serde_json::from_str(lines[2]).unwrap();
assert_eq!(span_json["name"], "Root_Span_2");
});
}
}
17 changes: 16 additions & 1 deletion glide-core/telemetry/src/open_telemetry_exporter_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporterFile {

let spans = to_jsons(batch);
for span in &spans {
if let Ok(s) = serde_json::to_string_pretty(&span) {
if let Ok(s) = serde_json::to_string(&span) {
file_writeln!(data_file, s);
}
}
Expand Down Expand Up @@ -173,6 +173,21 @@ fn to_jsons(batch: Vec<export::trace::SpanData>) -> Vec<Value> {
events.push(Value::Object(evt));
}
map.insert("events".to_string(), Value::Array(events));

let mut links = Vec::<Value>::new();
for link in span.links.iter() {
let mut lk = Map::new();
lk.insert(
"trace_id".to_string(),
Value::String(link.span_context.trace_id().to_string()),
);
lk.insert(
"span_id".to_string(),
Value::String(link.span_context.span_id().to_string()),
);
links.push(Value::Object(lk));
}
map.insert("links".to_string(), Value::Array(links));
spans.push(Value::Object(map));
}
spans
Expand Down

0 comments on commit 57a3a17

Please sign in to comment.