Skip to content

Commit

Permalink
otel init
Browse files Browse the repository at this point in the history
  • Loading branch information
Sid Mohan authored and Sid Mohan committed May 28, 2024
1 parent 4aeb2f0 commit 2facea8
Show file tree
Hide file tree
Showing 438 changed files with 24,467 additions and 30,991 deletions.
4 changes: 4 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
APPLICATIONINSIGHTS_CONNECTION_STRING="InstrumentationKey=00bea047-1836-46fa-9652-26d43d63a3fa;IngestionEndpoint=https://eastus-8.in.applicationinsights.azure.com/;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/;ApplicationId=959cc365-c112-491b-af69-b196d0943ca4"


# note this is an Azure specific implementation of the OpenTelemetry distro. for more information please see https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/monitor/azure-monitor-opentelemetry
85 changes: 69 additions & 16 deletions datafog/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,34 @@
from .services.image_service import ImageService
from .services.spark_service import SparkService
from .services.text_service import TextService
from .telemetry.open_telemetry import Telemetry

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
from azure.monitor.opentelemetry import configure_azure_monitor
import platform
from opentelemetry.trace import Status, StatusCode

# Use environment variable if available, otherwise fall back to hardcoded value
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from logging import INFO, getLogger
from dotenv import load_dotenv
import logging

load_dotenv() # Load environment variables from .env file
APPLICATIONINSIGHTS_CONNECTION_STRING = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
configure_azure_monitor(connection_string=APPLICATIONINSIGHTS_CONNECTION_STRING)
trace.set_tracer_provider(TracerProvider())
exporter = AzureMonitorTraceExporter(connection_string=APPLICATIONINSIGHTS_CONNECTION_STRING)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))
logger = logging.getLogger("datafog_logger")
logger.setLevel(INFO)

class DataFog:
def __init__(
Expand All @@ -28,25 +54,52 @@ def __init__(
self.text_service = text_service
self.spark_service: SparkService = spark_service
self.operations: List[OperationType] = operations
self.telemetry = Telemetry()
self.telemetry.set_tracer()
self.logger = logging.getLogger(__name__)
self.logger.info("Initializing DataFog class with the following services and operations:")
self.logger.info(f"Image Service: {type(image_service)}")
self.logger.info(f"Text Service: {type(text_service)}")
self.logger.info(f"Spark Service: {type(spark_service) if spark_service else 'None'}")
self.logger.info(f"Operations: {operations}")
self.tracer = trace.get_tracer(__name__)

async def run_ocr_pipeline(self, image_urls: List[str]):
"""Run the OCR pipeline asynchronously."""
extracted_text = await self.image_service.ocr_extract(image_urls)
if OperationType.ANNOTATE_PII in self.operations:
annotated_text = await self.text_service.batch_annotate_texts(
extracted_text
)
return annotated_text
return extracted_text

with self.tracer.start_as_current_span("run_ocr_pipeline") as span:
try:
extracted_text = await self.image_service.ocr_extract(image_urls)
self.logger.info(f"OCR extraction completed for {len(image_urls)} images.")
self.logger.debug(f"Total length of extracted text: {sum(len(text) for text in extracted_text)}")

if OperationType.ANNOTATE_PII in self.operations:
annotated_text = await self.text_service.batch_annotate_texts(extracted_text)
self.logger.info(f"Text annotation completed with {len(annotated_text)} annotations.")
return annotated_text

return extracted_text
except Exception as e:
self.logger.error(f"Error in run_ocr_pipeline: {str(e)}")
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
async def run_text_pipeline(self, texts: List[str]):
"""Run the text pipeline asynchronously."""
if OperationType.ANNOTATE_PII in self.operations:
annotated_text = await self.text_service.batch_annotate_texts(texts)
return annotated_text
return texts
with self.tracer.start_as_current_span("run_text_pipeline") as span:
try:
self.logger.info(f"Starting text pipeline with {len(texts)} texts.")
if OperationType.ANNOTATE_PII in self.operations:
annotated_text = await self.text_service.batch_annotate_texts(texts)
self.logger.info(f"Text annotation completed with {len(annotated_text)} annotations.")
return annotated_text

self.logger.info("No annotation operation found; returning original texts.")
return texts
except Exception as e:
self.logger.error(f"Error in run_text_pipeline: {str(e)}")
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
def _add_attributes(self, span, attributes: dict):
"""Add multiple attributes to a span."""
for key, value in attributes.items():
span.set_attribute(key, value)


class OCRPIIAnnotator:
Expand Down
131 changes: 45 additions & 86 deletions datafog/telemetry/open_telemetry.py
Original file line number Diff line number Diff line change
@@ -1,113 +1,72 @@
import asyncio
import json
import logging
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
from azure.monitor.opentelemetry import configure_azure_monitor
import platform
from typing import Any
from opentelemetry.trace import Status, StatusCode

import pkg_resources
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
# Use environment variable if available, otherwise fall back to hardcoded value
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode
from logging import INFO, getLogger
from dotenv import load_dotenv
from azure.monitor.opentelemetry import configure_azure_monitor
load_dotenv()

APPLICATIONINSIGHTS_CONNECTION_STRING = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")

class Telemetry:
"""A class to handle anonymous telemetry for the DataFog package."""

def __init__(self, instrumentation_key: str = os.getenv("INSTRUMENTATION_KEY")):
def __init__(self):
self.ready = False
self.trace_set = False
try:
self.resource = Resource(attributes={SERVICE_NAME: "datafog-python"})
self.provider = TracerProvider(resource=self.resource)
# Create a new TracerProvider and set it as the global trace provider
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Configure Azure Monitor with the connection string from environment variables
configure_azure_monitor(connection_string=APPLICATIONINSIGHTS_CONNECTION_STRING, logger_name="datafog_logger")

# Create an exporter that sends data to Application Insights
exporter = AzureMonitorTraceExporter(
connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
connection_string=APPLICATIONINSIGHTS_CONNECTION_STRING
)
processor = BatchSpanProcessor(exporter)
self.provider.add_span_processor(processor)

# Create a span processor and add it to the tracer provider
span_processor = BatchSpanProcessor(exporter)
tracer_provider.add_span_processor(span_processor)

# Get a tracer
self.tracer = trace.get_tracer(__name__)

self.ready = True
except BaseException as e:
if isinstance(
e,
(SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError),
):
raise
self.ready = False
self.trace_set = True

def set_tracer(self):
"""Sets the tracer for telemetry."""
if self.ready:
try:
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception:
self.trace_set = False
except Exception as e:
print(f"Error setting up Azure Monitor: {e}")

def log_system_info(self):
"""Logs system information."""
if self.ready:
try:
tracer = trace.get_tracer("datafog.telemetry")
with tracer.start_as_current_span("System Info") as span:
self._add_attribute(
span,
"datafog_version",
pkg_resources.get_distribution("datafog").version,
)
self._add_attribute(
span, "python_version", platform.python_version()
)
self._add_attribute(span, "os", platform.system())
self._add_attribute(span, "platform_version", platform.version())
self._add_attribute(span, "cpus", os.cpu_count())
span.set_status(Status(StatusCode.OK))
except Exception:
pass

def pipeline_execution(self, datafog, input_data, output_data):
"""Records the execution of a DataFog pipeline."""
if self.ready:
try:
tracer = trace.get_tracer("datafog.telemetry")
with tracer.start_as_current_span("Pipeline Execution") as span:
self._add_attribute(
span,
"datafog_version",
pkg_resources.get_distribution("datafog").version,
)
self._add_attribute(
span, "pipeline_type", datafog.__class__.__name__
)
self._add_attribute(span, "input_data", input_data)
self._add_attribute(span, "output_data", output_data)
span.set_status(Status(StatusCode.OK))
except Exception:
pass

def end_pipeline(self, datafog, output):
def datafog_creation(self, name: str):
if self.ready:
try:
tracer = trace.get_tracer("datafog.telemetry")
with tracer.start_as_current_span("Pipeline Ended") as span:
self._add_attribute(
span,
"datafog_version",
pkg_resources.get_distribution("datafog").version,
)
self._add_attribute(
span, "pipeline_type", datafog.__class__.__name__
)
self._add_attribute(span, "output", output)
span.set_status(Status(StatusCode.OK))
except Exception:
pass
tracer = trace.get_tracer(__name__)
span = tracer.start_span("datafog object created")
self._add_attribute(span, "datafog_name", name)
self._add_attribute(span, "datafog_version", platform.python_version())
span.set_status(Status(StatusCode.OK))
span.end()
except Exception as e:
print(f"Error starting span: {e}")
return None

def _add_attribute(self, span, key, value):
"""Add an attribute to a span."""
try:
span.set_attribute(key, value)
return span.set_attribute(key, value)
except Exception:
pass

1 change: 1 addition & 0 deletions env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
APPLICATIONINSIGHTS_CONNECTION_STRING="" # note this is an Azure specific implementation of the OpenTelemetry distro. for more information please see https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/monitor/azure-monitor-opentelemetry
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ spacy==3.4.4
# pyspark==3.4.1
pytest==8.0.2
Requests==2.31.0
setuptools==58.1.0
setuptools==70.0.0
pydantic==1.10.15
fastapi
pandas
Expand All @@ -18,5 +18,7 @@ asyncio
aiohttp
pytest-asyncio
azure-monitor-opentelemetry-exporter==1.0.0b25
azure-monitor-opentelemetry
opentelemetry-sdk
en_spacy_pii_fast==0.0.0
python-dotenv
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


def __version__():
return "3.2.1b3"
return "3.2.1b9"


project_urls = {
Expand Down Expand Up @@ -42,6 +42,10 @@ def __version__():
"pytesseract",
"aiohttp",
"pytest-asyncio",
"python-dotenv",
"azure-monitor-opentelemetry-exporter==1.0.0b25",
"opentelemetry-sdk",
"azure-monitor-opentelemetry"
],
python_requires=">=3.10",
classifiers=[
Expand Down
6 changes: 2 additions & 4 deletions tests/.datafog_env/bin/pip
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
# -*- coding: utf-8 -*-
import re
import sys

from pip._internal.cli.main import main

if __name__ == "__main__":
sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0])
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())
6 changes: 2 additions & 4 deletions tests/.datafog_env/bin/pip3
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
# -*- coding: utf-8 -*-
import re
import sys

from pip._internal.cli.main import main

if __name__ == "__main__":
sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0])
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())
6 changes: 2 additions & 4 deletions tests/.datafog_env/bin/pip3.11
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
# -*- coding: utf-8 -*-
import re
import sys

from pip._internal.cli.main import main

if __name__ == "__main__":
sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0])
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())
Loading

0 comments on commit 2facea8

Please sign in to comment.