In this project I leveraged Socrata OPCV API data to build a pipeline of logs from a Docker container to the Elasticsearch and Kibana stack, where the data was collected, analyzed and transformed into visuals. Scripts were written in Python and polished to accept command-line arguments on UNIX/Linux operating systems. The scripts were tested for reproducibility by provisioning, configuring and launching an AWS EC2 instance that ran a Docker container, read in the Python script, parsed parameters from the command line and pulled the API JSON logs. Additionally, Git was used to maintain version control and prevent conflicts between concurrent work.
Table of Contents
{"plate": "27008JD", "state": "NY", "license_type": "COM", "summons_number": "1414761960", "issue_date": "10/17/2016", "violation_time": "02:53P", "violation": "DOUBLE PARKING", "fine_amount": "115", "penalty_amount": "0", "interest_amount": "0", "reduction_amount": "115", "payment_amount": "0", "amount_due": "0", "precinct": "041", "county": "BX", "issuing_agency": "POLICE DEPARTMENT", "violation_status": "HEARING HELD-NOT GUILTY", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VFZSUmVFNUVZekpOVkdzeVRVRTlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "86041MG", "state": "NY", "license_type": "COM", "summons_number": "1414761983", "issue_date": "10/17/2016", "violation_time": "03:04P", "violation": "DOUBLE PARKING", "fine_amount": "115", "penalty_amount": "0", "interest_amount": "0", "reduction_amount": "115", "payment_amount": "0", "amount_due": "0", "precinct": "041", "county": "BX", "issuing_agency": "POLICE DEPARTMENT", "violation_status": "HEARING HELD-NOT GUILTY", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VFZSUmVFNUVZekpOVkdzMFRYYzlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "68080JZ", "state": "NY", "license_type": "COM", "summons_number": "7054261800", "issue_date": "09/02/2016", "violation_time": "02:07P", "violation": "NO STANDING-DAY/TIME LIMITS", "fine_amount": "115", "penalty_amount": "0", "interest_amount": "0", "reduction_amount": "23", "payment_amount": "92", "amount_due": "0", "precinct": "013", "county": "NY", "issuing_agency": "TRAFFIC", "violation_status": "HEARING HELD-GUILTY REDUCTION", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VG5wQk1VNUVTVEpOVkdkM1RVRTlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "EKS2203", "state": "NY", "license_type": "PAS", "summons_number": "5032717149", "issue_date": "10/25/2008", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VGxSQmVrMXFZM2hPZWtVd1QxRTlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "YDY83T", "state": "NJ", "license_type": "PAS", "summons_number": "5032717150", "issue_date": "10/25/2008", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VGxSQmVrMXFZM2hPZWtVeFRVRTlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "WBD64W", "state": "NJ", "license_type": "PAS", "summons_number": "5032717290", "issue_date": "10/28/2008", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VGxSQmVrMXFZM2hPZWtrMVRVRTlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "XBSD23", "state": "NJ", "license_type": "PAS", "summons_number": "7054262268", "issue_date": "09/06/2016", "violation_time": "02:03P", "violation": "EXPIRED MUNI MTR-COMM MTR ZN", "fine_amount": "65", "penalty_amount": "10", "interest_amount": "0", "reduction_amount": "0", "payment_amount": "75", "amount_due": "0", "precinct": "013", "county": "NY", "issuing_agency": "TRAFFIC", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VG5wQk1VNUVTVEpOYWtreVQwRTlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "X4818A", "state": "NJ", "license_type": "PAS", "summons_number": "7081479911", "issue_date": "08/08/2016", "violation_time": "11:10A", "violation": "NO PARKING-STREET CLEANING", "fine_amount": "65", "penalty_amount": "0", "interest_amount": "0", "reduction_amount": "65", "payment_amount": "0", "amount_due": "0", "precinct": "019", "county": "NY", "issuing_agency": "TRAFFIC", "violation_status": "HEARING HELD-NOT GUILTY", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VG5wQk5FMVVVVE5QVkd0NFRWRTlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "HAM3170", "state": "NY", "license_type": "PAS", "summons_number": "1406722273", "issue_date": "03/15/2016", "violation_time": "09:02P", "violation": "FIRE HYDRANT", "judgment_entry_date": "06/30/2016", "fine_amount": "115", "penalty_amount": "60", "interest_amount": "0.1", "reduction_amount": "0.1", "payment_amount": "175", "amount_due": "0", "precinct": "052", "county": "BX", "issuing_agency": "POLICE DEPARTMENT", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VFZSUmQwNXFZM2xOYWtrelRYYzlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "483UKO", "state": "CT", "license_type": "999", "summons_number": "5032773141", "issue_date": "11/01/2008", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VGxSQmVrMXFZek5OZWtVd1RWRTlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "U177562", "state": "TN", "license_type": "PAS", "summons_number": "1405578191", "issue_date": "06/08/2016", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VFZSUmQwNVVWVE5QUkVVMVRWRTlQUT09&locationName=_____________________", "description": "View Summons"}}
{"plate": "GLL5464", "state": "PA", "license_type": "PAS", "summons_number": "7940452836", "issue_date": "05/24/2008", "summons_image": {"url": "http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VG5wck1FMUVVVEZOYW1kNlRtYzlQUT09&locationName=_____________________", "description": "View Summons"}}
plate state license_type summons_number issue_date violation_time violation judgment_entry_date fine_amount penalty_amount interest_amount reduction_amount payment_amount amount_due precinct county issuing_agency summons_image violation_status
0 DN9F942 TX PAS 7000233503 01/11/2016 08:43A NO PARKING-DAY/TIME LIMITS 04/28/2016 60 60 41.64 0 0 161.64 108 Q TRAFFIC {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VG5wQmQwMUVTWHBOZWxWM1RYYzlQUT09&locationName=_____________________', 'description': 'View Summons'}
1 GBJ3345 NY PAS 7000233801 01/11/2016 02:47P NO STANDING-DAY/TIME LIMITS 04/28/2016 115 60 59.49 0 0 234.49 111 Q TRAFFIC {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VG5wQmQwMUVTWHBOZW1kM1RWRTlQUT09&locationName=_____________________', 'description': 'View Summons'}
2 601030 NY PAS 4659389017 07/16/2019 08:18A PHTO SCHOOL ZN SPEED VIOLATION 50 0 0 0 50 0 000 QN DEPARTMENT OF TRANSPORTATION {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VGtSWk1VOVVUVFJQVkVGNFRuYzlQUT09&locationName=_____________________', 'description': 'View Summons'}
3 6HVZ40 MA PAS 7000234040 01/12/2016 07:06A NO PARKING-STREET CLEANING 04/28/2016 45 60 35.73 0 0 140.73 109 Q TRAFFIC {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VG5wQmQwMUVTWHBPUkVFd1RVRTlQUT09&locationName=_____________________', 'description': 'View Summons'}
4 C0YS82 FL PAS 4659388979 07/16/2019 08:16A PHTO SCHOOL ZN SPEED VIOLATION 50 0 0 0 50 0 000 QN DEPARTMENT OF TRANSPORTATION {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VGtSWk1VOVVUVFJQUkdzelQxRTlQUT09&locationName=_____________________', 'description': 'View Summons'}
5 VF16882 NY ORG 4659389054 07/16/2019 08:20A PHTO SCHOOL ZN SPEED VIOLATION 50 0 0 0 50 0 000 BK DEPARTMENT OF TRANSPORTATION {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VGtSWk1VOVVUVFJQVkVFeFRrRTlQUT09&locationName=_____________________', 'description': 'View Summons'}
6 PAP3813 MN PAS 7000234090 01/12/2016 08:24A NO STANDING-DAY/TIME LIMITS 04/28/2016 115 60 59.49 0 0 234.49 109 Q TRAFFIC {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VG5wQmQwMUVTWHBPUkVFMVRVRTlQUT09&locationName=_____________________', 'description': 'View Summons'}
7 T761267C NY OMT 4659388918 07/16/2019 08:15A PHTO SCHOOL ZN SPEED VIOLATION 50 0 0 0 50 0 000 BK DEPARTMENT OF TRANSPORTATION {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VGtSWk1VOVVUVFJQUkd0NFQwRTlQUT09&locationName=_____________________', 'description': 'View Summons'}
8 HBE8891 NY PAS 7000234325 01/15/2016 10:32A INSP. STICKER-EXPIRED/MISSING 05/05/2016 65 60 43.4 0 0 168.4 114 Q TRAFFIC {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VG5wQmQwMUVTWHBPUkUxNVRsRTlQUT09&locationName=_____________________', 'description': 'View Summons'}
9 1SPIDER3 NY SRF 8728804053 03/02/2019 01:05P INSP. STICKER-EXPIRED/MISSING 08/15/2019 65 60 0.37 0.09 125.28 0 025 NY TRAFFIC {'url': 'http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VDBSamVVOUVaM2RPUkVFeFRYYzlQUT09&locationName=_____________________', 'description': 'View Summons'} HEARING HELD-GUILTY
SOURCE CODE main.py
from src.APIcall import get_data, get_stats
if __name__ == '__main__':
    # Script entry point: read the CLI flags, echo them back, then run the download.
    options = get_stats()
    banner = (
        ('[+] Requested ', options.page_size, ' Records From The API Per Call'),
        ('[+] Requested To Run Query ', options.num_pages, 'x'),
        ('[+] Requested To Load Data As A ', options.output, ' File Type'),
    )
    for pieces in banner:
        # print() joins the pieces with single spaces, exactly as when they
        # are passed as separate arguments.
        print(*pieces)
    get_data(options)
SOURCE CODE APICall.py
import os
import json
import argparse
from sodapy import Socrata
# Host name handed to the Socrata client.
domain = 'data.cityofnewyork.us'
# Identifier passed as the first argument of every `data.get` call in this module.
# NOTE(review): despite the name this looks like a dataset id rather than a
# client id -- confirm before renaming.
client_id = 'nc67-uf89'
# API client; its second argument is read from the APP_KEY environment
# variable (os.getenv returns None when the variable is unset).
data = Socrata(domain, os.getenv("APP_KEY"))
# Runs at import time: issues a COUNT(*) query against the API.
data_count = data.get(client_id, select ='COUNT(*)')
# The count is taken from the 'COUNT' key of the first returned row.
count = int(data_count[0]['COUNT'])
def _positive_int(text):
    """argparse ``type=`` callback: parse *text* as an integer of at least 1."""
    try:
        number = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(
            "expected an integer, got {!r}".format(text))
    if number < 1:
        raise argparse.ArgumentTypeError(
            "must be at least 1, got {}".format(number))
    return number


def get_stats(argv=None):
    """Parse the command-line options that control the API download.

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads ``sys.argv[1:]``, which is what existing callers get.

    Returns:
        An ``argparse.Namespace`` with ``page_size`` (int, required, >= 1),
        ``num_pages`` (int or None) and ``output`` (str or None).

    Raises:
        SystemExit: raised by argparse (exit status 2) when ``--page_size`` is
            missing or is not a positive integer, or on any other usage error.
    """
    parser = argparse.ArgumentParser()
    # page_size is used as a divisor and as the API page limit, so it must be
    # present and positive; fail here with a usage message instead of a
    # TypeError/ZeroDivisionError later on.
    parser.add_argument('-p', '--page_size',
                        help="To input records to request API",
                        type=_positive_int,
                        required=True)
    # num_pages stays optional: a missing (or zero) value means "all pages".
    parser.add_argument('-n', '--num_pages',
                        help="The number of times to query for data",
                        type=int)
    parser.add_argument('-o', '--output', default=None,
                        help="To print as .json or stdout")
    return parser.parse_args(argv)
def _iter_records(options):
    """Yield API records page by page, stopping at the first empty page."""
    for page in range(options.num_pages):
        rows = data.get(client_id,
                        limit=options.page_size,
                        offset=options.page_size * page)
        if not rows:
            # Nothing came back for this offset, so later pages are skipped.
            break
        for row in rows:
            yield row


def get_data(options):
    """Download records from the API and print them or save them as JSON lines.

    Args:
        options: Parsed CLI namespace with ``page_size``, ``num_pages`` and
            ``output``. When ``num_pages`` is falsy it is replaced in place by
            ``count // page_size + 1`` so that every record is requested.

    When ``options.output`` contains ``'.json'`` the file is (re)created and
    each record is written as one JSON document per line; otherwise each
    record is printed to stdout.
    """
    if not options.num_pages:
        options.num_pages = count // options.page_size + 1

    write_json = bool(options.output) and '.json' in options.output
    if not write_json:
        for record in _iter_records(options):
            print(record)
        return

    # Open the file once for the whole run instead of truncating it and then
    # reopening it in append mode for every single record.
    with open(options.output, 'w') as results:
        for record in _iter_records(options):
            results.write(json.dumps(record) + '\n')
Dockerfile Requirements
1st.) Fundamental Instructions.
2nd.) Configuration Instructions.
3rd.) Execution Instructions.
Dockerfile Used to build Docker Image
# Base image tag; override with `--build-arg PYTHON_VERSION=...`.
ARG PYTHON_VERSION=3.7
FROM python:${PYTHON_VERSION}
# NOTE(review): the `docker run -v` and docker-compose volume entries in this
# document mount the host directory at /app, not /.app -- confirm this path.
WORKDIR /.app
# Copy only the dependency list first so the pip layer is reused from the
# build cache when just the source files change.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
Docker Commands Used to Build Image
and Run Python API Call
docker build -t {docker_image.name:image_version} .
docker run -v $(pwd):/app -it {docker_account}/{docker_image.name:image_version} /bin/bash
docker tag {docker_image_tag} {docker_image.name:image_desired_tagname}
docker push {docker_image.name:image_version}
docker run -v ${PWD}:/app/foo -e APP_KEY=$APP_KEY {docker_image.name:image_version} python -m main --num_pages {parameter} --page_size {parameter} --output={parameter}
Docker-Compose.yml
version: '3'
services:
  # Python image built from the Dockerfile in this directory.
  pyth:
    # Uses the host's network, so the published ports below are reachable
    # from this container on localhost.
    network_mode: host
    container_name: pyth
    build:
      context: .
    volumes:
      # Project directory mounted read-write at /app.
      - .:/app:rw
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.3.2
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      # Fixed 512 MB JVM heap.
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      # Unlimited locked memory, needed for bootstrap.memory_lock=true.
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    ports:
      - "9200:9200"
  kibana:
    image: docker.elastic.co/kibana/kibana:6.3.2
    ports:
      - "5601:5601"
Running Docker-Compose
docker-compose up -d
docker-compose build pyth
docker-compose run pyth /bin/bash
# Additional Commands {optional use}
docker exec -it [container] bash
docker run -it --link [container:version] --rm [] sh -c 'exec [] -h $ROOT_PRIVILEGES bash'
docker-compose config
docker-compose config --services
docker-compose images
docker-compose logs
docker-compose logs --tail=10
docker-compose top
docker-compose down
This project does not delve into Docker Swarm, but below are some noteworthy Docker Swarm commands:
docker-machine create --driver virtualbox {Swarm_Node}
docker-machine stop {Swarm_Manager}
docker-machine start {Swarm_Manager}
docker-machine ssh {Swarm_Node}
docker swarm init --advertise-addr {Ip_Address}
docker swarm join --token {docker_swarm_join_node_token} {Ip_Address}:2377
docker node ls
docker node inspect --pretty {Swarm_Node}
docker service create --name {Service_Name} -p {Host_Port}:{Container_Port} --replicas {Replica_Count} {docker_image}:{docker_image_tag}
docker service ls
docker service ps {docker_image}
docker node update --availability drain {Swarm_Node}
docker swarm leave
docker node rm {Swarm_Node}
SSH
into EC2
ssh -i file.pem ubuntu@{IP_address}
sudo apt install docker.io
sudo docker login --username={USERNAME}
sudo docker pull {docker_account}/{docker_image.name:image_version}
sudo docker run -it {docker_account}/{docker_image.name:image_version} /bin/bash
sudo docker run -it {docker_account}/{docker_image.name:image_version} python main.py --page_size {parameter} --num_pages {parameter} --output {parameter}
- Our initial main.py file is EC2 certified
Downloading ElasticSearch
on Ubuntu
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.1-amd64.deb
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.1-amd64.deb.sha512
shasum -a 512 -c elasticsearch-7.6.1-amd64.deb.sha512
sudo dpkg -i elasticsearch-7.6.1-amd64.deb
docker-compose logs elasticsearch
sudo sysctl -w vm.max_map_count=262144
from datetime import datetime
from elasticsearch import Elasticsearch
import os, sys
import json
import argparse
from sodapy import Socrata
# Host name handed to the Socrata client.
domain = 'data.cityofnewyork.us'
# Identifier passed as the first argument of every `plug.get` call in this script.
# NOTE(review): despite the name this looks like a dataset id rather than a
# client id -- confirm before renaming.
client_id = 'nc67-uf89'
# API client; its second argument is read from the APP_KEY environment
# variable (os.getenv returns None when the variable is unset).
plug = Socrata(domain, os.getenv("APP_KEY"))
# Runs at import time: issues a COUNT(*) query against the API.
data_count = plug.get(client_id, select ='COUNT(*)')
# The count is taken from the 'COUNT' key of the first returned row.
count = int(data_count[0]['COUNT'])
def _positive_int(text):
    """argparse ``type=`` callback: parse *text* as an integer of at least 1."""
    try:
        number = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(
            "expected an integer, got {!r}".format(text))
    if number < 1:
        raise argparse.ArgumentTypeError(
            "must be at least 1, got {}".format(number))
    return number


def get_stats(argv=None):
    """Parse the command-line options that control the API download.

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads ``sys.argv[1:]``, which is what existing callers get.

    Returns:
        An ``argparse.Namespace`` with ``page_size`` (int, required, >= 1),
        ``num_pages`` (int or None) and ``output`` (str or None).

    Raises:
        SystemExit: raised by argparse (exit status 2) when ``--page_size`` is
            missing or is not a positive integer, or on any other usage error.
    """
    parser = argparse.ArgumentParser()
    # page_size is used as a divisor and as the API page limit, so reject a
    # missing or non-positive value here with a usage message.
    parser.add_argument('-p', '--page_size', help="To input records to request API",
                        type=_positive_int, required=True)
    # num_pages stays optional: a missing (or zero) value means "all pages".
    parser.add_argument('-n', '--num_pages', help="The number of times to query for data",
                        type=int)
    parser.add_argument('-o', '--output', default=None, help="To print as .json or stdout")
    return parser.parse_args(argv)
# Parse the command-line flags once, at import time; the result is passed to
# get_data() at the bottom of this script.
options = get_stats()
def its_elastic():
    """Connect to Elasticsearch and make sure the ``bigdata`` index exists.

    The client is created with its default connection settings. Index
    creation is best-effort: an HTTP 400 reply (returned when the index is
    already there) is ignored, while any other failure propagates to the
    caller.

    Returns:
        The ``Elasticsearch`` client instance.
    """
    # Local import keeps this block self-contained; the elasticsearch package
    # is already a dependency of this script.
    from elasticsearch.exceptions import RequestError

    es = Elasticsearch()
    try:
        # Same index name that get_data() writes its documents to.
        es.indices.create(index='bigdata')
    except RequestError:
        # HTTP 400: the index already exists from an earlier run.
        pass
    return es


# Keep the returned client: the get_data(options, es) call at the bottom of
# this script needs it.
es = its_elastic()
# Placeholder stored when a record has neither a usable date nor a usable time.
_NO_TIMESTAMP = datetime(1, 1, 1, 1, 1)


def _parse_or_none(text, fmt):
    """Return ``datetime.strptime(text, fmt)``, or None if *text* is missing or malformed."""
    try:
        return datetime.strptime(text, fmt)
    except (TypeError, ValueError):
        return None


def _record_timestamp(record):
    """Combine a record's ``issue_date`` and ``violation_time`` into one datetime."""
    issued = _parse_or_none(record.get('issue_date'), '%m/%d/%Y')
    raw_time = record.get('violation_time')
    clock = None
    if isinstance(raw_time, str):
        # The API abbreviates the meridiem ("02:53P"); add the "M" that %p needs.
        clock = _parse_or_none(raw_time + 'M', '%I:%M%p')

    if issued is not None and clock is not None:
        return datetime.combine(issued.date(), clock.time())
    if issued is not None:
        # Date only: midnight of the issue date.
        return issued
    if clock is not None:
        # Time only: keep the time of day on the placeholder date so the
        # field is still a datetime.
        return datetime.combine(_NO_TIMESTAMP.date(), clock.time())
    return _NO_TIMESTAMP


def get_data(options, es):
    """Download records from the API and index each one into Elasticsearch.

    Args:
        options: Parsed CLI namespace with ``page_size``, ``num_pages`` and
            ``output``. When ``num_pages`` is falsy it is replaced in place by
            ``count // page_size + 1`` so that every record is requested.
        es: Elasticsearch client used to index the documents.

    For every record, each field whose name contains ``'amount'`` is
    converted to ``float`` and a ``timestamp`` field is added. The timestamp
    is built from ``issue_date`` and ``violation_time`` only, is always a
    ``datetime``, and falls back to a placeholder instead of raising when
    those values are missing or malformed. Documents go to the ``bigdata``
    index with sequential ids starting at 1.
    """
    if not options.num_pages:
        options.num_pages = count // options.page_size + 1

    if options.output and '.json' in options.output:
        # NOTE(review): kept from the original -- the file is only truncated
        # here; this version never writes records to it. Confirm whether the
        # output file should also receive the records.
        with open(options.output, 'w'):
            pass

    doc_id = 0
    for page in range(options.num_pages):
        rows = plug.get(client_id,
                        limit=options.page_size,
                        offset=options.page_size * page)
        if not rows:
            # Nothing came back for this offset, so later pages are skipped.
            break
        for record in rows:
            for key in list(record):
                if 'amount' in key:
                    record[key] = float(record[key])
            record['timestamp'] = _record_timestamp(record)
            doc_id += 1
            es.index(index="bigdata", id=doc_id, body=record)


get_data(options, es)
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.1-amd64.deb
shasum -a 512 kibana-7.6.1-amd64.deb
sudo dpkg -i kibana-7.6.1-amd64.deb
ps -p 1
sudo /bin/systemctl daemon-reload
sudo /bin/systemctl enable kibana.service
sudo systemctl start kibana.service
sudo systemctl stop kibana.service
docker-compose run -e APP_KEY=$APP_KEY pyth python -m _main --page_size 500 --num_pages 500 --output results.json