Skip to content

Commit

Permalink
pass, anonymizer tests for DataFog class
Browse files Browse the repository at this point in the history
  • Loading branch information
Sid Mohan authored and Sid Mohan committed Aug 19, 2024
1 parent 929aa3c commit 2bf5379
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 69 deletions.
59 changes: 24 additions & 35 deletions datafog/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,32 @@ class DataFog:

def __init__(
self,
image_service=ImageService(),
text_service=TextService(),
image_service=None,
text_service=None,
spark_service=None,
operations: List[OperationType] = [OperationType.SCAN],
hash_type: HashType = HashType.SHA256,
anonymizer_type: AnonymizerType = AnonymizerType.REPLACE,
):
self.image_service = image_service
self.text_service = text_service
self.image_service = image_service or ImageService()
self.text_service = text_service or TextService()
self.spark_service: SparkService = spark_service
self.operations: List[OperationType] = operations
self.anonymizer = Anonymizer()
self.anonymizer = Anonymizer(
hash_type=hash_type, anonymizer_type=anonymizer_type
)
self.logger = logging.getLogger(__name__)
self.logger.info(
"Initializing DataFog class with the following services and operations:"
)
self.logger.info(f"Image Service: {type(image_service)}")
self.logger.info(f"Text Service: {type(text_service)}")
self.logger.info(f"Image Service: {type(self.image_service)}")
self.logger.info(f"Text Service: {type(self.text_service)}")
self.logger.info(
f"Spark Service: {type(spark_service) if spark_service else 'None'}"
f"Spark Service: {type(self.spark_service) if self.spark_service else 'None'}"
)
self.logger.info(f"Operations: {operations}")
self.logger.info(f"Hash Type: {hash_type}")
self.logger.info(f"Anonymizer Type: {anonymizer_type}")

async def run_ocr_pipeline(self, image_urls: List[str]):
"""
Expand Down Expand Up @@ -151,19 +157,15 @@ async def _process_text(self, text_list: List[str]):
)
return text_list

def run_text_pipeline_sync(self, str_list: List[str]):
def run_text_pipeline_sync(self, str_list: List[str]) -> List[str]:
"""
Run the text pipeline synchronously on a list of input text.
This method processes a list of text strings in a synchronous manner, potentially
annotating them for personally identifiable information (PII) and applying
anonymization if enabled.
Args:
str_list (List[str]): A list of text strings to be processed.
Returns:
List: Processed text results based on the enabled operations.
List[str]: Processed text results based on the enabled operations.
Raises:
Exception: Any error encountered during the text processing.
Expand All @@ -176,29 +178,16 @@ def run_text_pipeline_sync(self, str_list: List[str]):
f"Text annotation completed with {len(annotated_text)} annotations."
)

if OperationType.REDACT in self.operations:
return [
self.anonymizer.anonymize(
text, annotations, AnonymizerType.REDACT
).anonymized_text
for text, annotations in zip(
str_list, annotated_text, strict=True
)
]
elif OperationType.REPLACE in self.operations:
return [
self.anonymizer.anonymize(
text, annotations, AnonymizerType.REPLACE
).anonymized_text
for text, annotations in zip(
str_list, annotated_text, strict=True
)
if any(
op in self.operations
for op in [
OperationType.REDACT,
OperationType.REPLACE,
OperationType.HASH,
]
elif OperationType.HASH in self.operations:
):
return [
self.anonymizer.anonymize(
text, annotations, AnonymizerType.HASH
).anonymized_text
self.anonymizer.anonymize(text, annotations).anonymized_text
for text, annotations in zip(
str_list, annotated_text, strict=True
)
Expand Down
2 changes: 1 addition & 1 deletion datafog/models/annotator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class AnnotationResult(BaseModel):

start: int
end: int
score: float
score: Optional[float]
entity_type: str
recognition_metadata: Optional[AnnotatorMetadata]

Expand Down
42 changes: 13 additions & 29 deletions datafog/models/anonymizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,36 +64,21 @@ def replace_pii(
) -> AnonymizationResult:
"""Replace PII in text with anonymized values."""
replacements = []
print(f"Entities to anonymize: {self.entities}")
for annotation in sorted(annotations, key=lambda x: x.start, reverse=True):
print(f"Processing annotation: {annotation}")
if not self.entities or annotation.entity_type in self.entities:
print(f"Matched entity type: {annotation.entity_type}")
if self.anonymizer_type == AnonymizerType.REPLACE:
replacement = f"[{annotation.entity_type}_{len(replacements)}]"
replacements.append(
{
"original": text[annotation.start : annotation.end],
"replacement": replacement,
"entity_type": annotation.entity_type,
}
)
print(f"Added replacement: {replacements[-1]}")

print(f"Final replacements: {replacements}")
anonymized_text = text
for replacement in reversed(replacements):
start = text.index(replacement["original"])
end = start + len(replacement["original"])
anonymized_text = (
anonymized_text[:start]
+ replacement["replacement"]
+ anonymized_text[end:]
)
replacement = self._generate_replacement(
text[annotation.start : annotation.end], annotation.entity_type
)
replacements.append(
{
"original": text[annotation.start : annotation.end],
"replacement": replacement,
"entity_type": annotation.entity_type,
}
)
text = text[: annotation.start] + replacement + text[annotation.end :]

return AnonymizationResult(
anonymized_text=anonymized_text, replaced_entities=replacements
)
return AnonymizationResult(anonymized_text=text, replaced_entities=replacements)

def _generate_replacement(self, original: str, entity_type: EntityTypes) -> str:
"""Generate a replacement for the given entity."""
Expand All @@ -119,7 +104,7 @@ def hash_pii(

start, end = annotation.start, annotation.end
original = text[start:end]
replacement = self._hash_text(original)[: len(original)]
replacement = self._hash_text(original)

text = text[:start] + replacement + text[end:]
replacements.append(
Expand All @@ -145,7 +130,6 @@ def _hash_text(self, text: str) -> str:
def redact_pii(
self, text: str, annotations: List[AnnotationResult]
) -> AnonymizationResult:
"""Redact PII in text."""
replacements = []
for annotation in sorted(annotations, key=lambda x: x.start, reverse=True):
if self.entities and annotation.entity_type not in self.entities:
Expand Down
7 changes: 3 additions & 4 deletions tests/test_anonymizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,12 @@ def test_anonymizer_hash(sample_text, sample_annotations, hash_type):
for replacement in result.replaced_entities:
assert replacement["original"] in sample_text
assert replacement["replacement"] not in sample_text
assert len(replacement["replacement"]) == len(replacement["original"])

# assert len(replacement["replacement"]) == len(replacement["original"])
# Check hash type-specific properties
if hash_type == HashType.MD5:
assert len(replacement["replacement"]) <= 32
assert len(replacement["replacement"]) == 32
elif hash_type in [HashType.SHA256, HashType.SHA3_256]:
assert len(replacement["replacement"]) <= 64
assert len(replacement["replacement"]) == 64


def test_anonymizer_with_specific_entities(sample_text, sample_annotations):
Expand Down
75 changes: 75 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import asyncio
import json
import logging
import re
from unittest.mock import AsyncMock, patch

import pytest

from datafog.config import OperationType
from datafog.main import DataFog
from datafog.models.annotator import AnnotationResult
from datafog.models.anonymizer import AnonymizerType, HashType
from datafog.processing.text_processing.spacy_pii_annotator import (
SpacyPIIAnnotator as TextPIIAnnotator,
)
Expand Down Expand Up @@ -164,3 +168,74 @@ def test_run_text_pipeline_sync_no_annotation():
result = datafog.run_text_pipeline_sync(["Sample text"])

assert result == ["Sample text"]


@pytest.mark.parametrize(
    "operation, hash_type, expected_pattern",
    [
        # REDACT: the PERSON span is replaced by the literal token [REDACTED].
        (
            OperationType.REDACT,
            None,
            r"\[REDACTED\] tries one more time to save his \$56 billion pay package",
        ),
        # REPLACE: the PERSON span becomes a placeholder such as [PERSON_...]
        # (optional hex suffix allowed by the pattern).
        (
            OperationType.REPLACE,
            None,
            r"\[PERSON(_[A-F0-9]+)?\] tries one more time to save his \$56 billion pay package",
        ),
        # HASH variants: the PERSON span is replaced by a full hex digest —
        # 32 hex chars for MD5, 64 for SHA-256 and SHA3-256.
        (
            OperationType.HASH,
            HashType.MD5,
            r"([a-f0-9]{32}) tries one more time to save his \$56 billion pay package",
        ),
        (
            OperationType.HASH,
            HashType.SHA256,
            r"([a-f0-9]{64}) tries one more time to save his \$56 billion pay package",
        ),
        (
            OperationType.HASH,
            HashType.SHA3_256,
            r"([a-f0-9]{64}) tries one more time to save his \$56 billion pay package",
        ),
    ],
)
def test_run_text_pipeline_anonymization(
    mock_text_service, operation, hash_type, expected_pattern
):
    """End-to-end check of DataFog.run_text_pipeline_sync anonymization.

    For each (operation, hash_type) combination, the text service is mocked to
    report a single PERSON entity covering characters 0-9 ("Elon Musk") of the
    input sentence, and the pipeline output is matched against a regex that
    pins the expected anonymized form of that span.

    Args (parametrized / fixture):
        mock_text_service: fixture providing a mocked text service whose
            batch_annotate_text_sync return value is set below.
        operation: the anonymizing OperationType under test (REDACT, REPLACE,
            or HASH), run alongside OperationType.SCAN.
        hash_type: HashType for the HASH cases; None otherwise.
        expected_pattern: regex the anonymized sentence must match from its
            start (checked with re.match).
    """
    logging.basicConfig(level=logging.INFO)
    # NOTE(review): `operation` (an OperationType) is also passed as
    # anonymizer_type (an AnonymizerType) — presumably both are str enums
    # sharing member values (REDACT/REPLACE/HASH); confirm against the models.
    datafog = DataFog(
        text_service=mock_text_service,
        operations=[OperationType.SCAN, operation],
        hash_type=hash_type,
        anonymizer_type=operation,
    )
    # One input string -> one annotation list containing a single PERSON span
    # over characters [0, 9), i.e. exactly "Elon Musk".
    mock_text_service.batch_annotate_text_sync.return_value = [
        [
            AnnotationResult(
                start=0,
                end=9,
                entity_type="PERSON",
                text="Elon Musk",
                score=0.9,
                # Passed as a plain dict; presumably coerced into
                # AnnotatorMetadata by the model — verify against annotator.py.
                recognition_metadata={"confidence": "high"},
            )
        ]
    ]

    result = datafog.run_text_pipeline_sync(
        ["Elon Musk tries one more time to save his $56 billion pay package"]
    )

    logging.info(f"Result: {result}")
    assert len(result) == 1, "Expected a single result"
    assert re.match(
        expected_pattern, result[0]
    ), f"Result {result[0]!r} does not match pattern {expected_pattern!r}"

    # Extra digest-length check for the HASH cases. NOTE(review): this compares
    # an OperationType against AnonymizerType.HASH — it only fires if the two
    # enums compare equal by value (str enums); confirm intended.
    if operation == AnonymizerType.HASH:
        hashed_part = result[0].split()[0]
        if hash_type == HashType.MD5:
            assert len(hashed_part) == 32
        elif hash_type in [HashType.SHA256, HashType.SHA3_256]:
            assert len(hashed_part) == 64

0 comments on commit 2bf5379

Please sign in to comment.