# push_to_elasticsearch.py
# Read recent temperature/humidity telemetry from ThingsBoard and index it in Elasticsearch.
import os
import time
import sys
import json
import requests
import calendar
import datetime
import base64
# Load the ThingsBoard connection settings from the JSON config file.
with open('ThingsBoard-config.json', 'r') as f:
    config = json.load(f)

THINGSBOARD_HOST = config['THINGSBOARD_HOST']
THINGSBOARD_USERNAME = config['THINGSBOARD_USERNAME']
THINGSBOARD_PASSWORD = config['THINGSBOARD_PASSWORD']
THINGSBOARD_DEVICEID = config['THINGSBOARD_DEVICEID']
THINGSBOARD_KEYS = config['THINGSBOARD_KEYS']

# Get the JWT token.
# Content-Type/Accept are sent as real request headers; as query-string
# parameters (the previous form) they are not HTTP headers at all.
httpPostUrl = 'http://' + THINGSBOARD_HOST + '/api/auth/login'
httpPostBody = {"username": THINGSBOARD_USERNAME, "password": THINGSBOARD_PASSWORD}
httpPostHeaders = {"Content-Type": "application/json", "Accept": "application/json"}
# The timeout keeps an unattended run from blocking forever on a dead host.
r = requests.post(
    httpPostUrl,
    data=json.dumps(httpPostBody),
    headers=httpPostHeaders,
    timeout=30,
)
print("Status Code: " + str(r.status_code))
# Stop here with the HTTP error rather than a KeyError on 'token' below.
r.raise_for_status()
jwt_data = r.json()
jwt_token = jwt_data['token']
print("JWT Token:")
print("##########")
# The token is a credential, so confirm receipt without echoing its value.
print("received (" + str(len(jwt_token)) + " characters, value not shown)")
print("##########")
# Both telemetry requests hit the same endpoint with the same auth headers;
# only the query parameters differ. Passing them via `params` lets requests
# URL-encode the values instead of relying on hand-built query strings.
httpGetUrl = 'http://' + THINGSBOARD_HOST + '/api/plugins/telemetry/DEVICE/' + THINGSBOARD_DEVICEID + '/values/timeseries'
httpGetHeaders = {"Content-Type": "application/json", "X-Authorization": "Bearer " + jwt_token}

# Get the latest telemetry (printed for information only; not pushed).
r2 = requests.get(
    httpGetUrl,
    params={"keys": THINGSBOARD_KEYS},
    headers=httpGetHeaders,
    timeout=30,
)
print("Status Code: " + str(r2.status_code))
telemetry_data = r2.json()
print("Telemetry Data:")
print("##########")
print(telemetry_data)
print("##########")

# Get time: whole seconds since the epoch, plus the start of the 120 s
# look-back window. time.time() yields the same value the previous
# datetime.utcnow()/calendar.timegm() pair did, without the deprecated call.
current_dt_unix = int(time.time())
current_dt_unix_min = current_dt_unix - 120
print("Current Time: ")
print(current_dt_unix)
print("Current Time -120s")
print(current_dt_unix_min)
print("##########")

# Get Telemetry from Past Interval (startTs/endTs are sent in milliseconds).
r3 = requests.get(
    httpGetUrl,
    params={
        "keys": THINGSBOARD_KEYS,
        "startTs": current_dt_unix_min * 1000,
        "endTs": current_dt_unix * 1000,
        "interval": 60000,
        "limit": 100,
        "agg": "AVG",
    },
    headers=httpGetHeaders,
    timeout=30,
)
print("Status Code: " + str(r3.status_code))
# This response drives the push, so an HTTP error must stop the run here
# instead of surfacing later as a missing-key error.
r3.raise_for_status()
telemetry_data = r3.json()
print("Telemetry Data:")
print("##########")
print(telemetry_data)
print("##########")
# Second name for the same parsed payload; the push loop iterates over it.
number_telemetry = telemetry_data
def _read_json(path):
    """Parse the JSON document stored at *path* and return the result."""
    with open(path, 'r') as handle:
        return json.load(handle)


# Elasticsearch connection settings, read from their own JSON file.
config_es = _read_json('ES-config.json')
ES_HOST, ES_USER, ES_PASSWORD, ES_INDEX, ES_TYPE = (
    config_es[key]
    for key in ('ES_HOST', 'ES_USER', 'ES_PASSWORD', 'ES_INDEX', 'ES_TYPE')
)

# Sensor identity attached to every document that gets indexed.
config_sensor = _read_json('Sensor-config.json')
sensorID = config_sensor['sensorID']
sensorLocation = config_sensor['sensorLocation']
# GET ES Token
# es_httpTokenUrl = 'https://'+ES_HOST+'/_xpack/security/oauth2/token'
# es_httpTokenBody = {"grant_type" : "password", "username" : ES_USER, "password" : ES_PASSWORD}
# es_httpTokenHeaders = {"Content-Type": "application/json"}
# r4 = requests.post(es_httpTokenUrl, data=json.dumps(es_httpTokenBody), headers=es_httpTokenHeaders)
# print("Status Code: " + str(r4.status_code))
# esToken_data = r4.json()
# print("ES Token Data:")
# print("##########")
# print(esToken_data)
# print("##########")
# esToken_data = json.loads(r4.text)
# esToken = esToken_data['access_token']
# Encode "user:password" for the HTTP Basic Authorization header.
# On Python 3, b64encode() accepts only bytes and returns bytes, so the text
# is encoded first and the result decoded back to str before it is joined
# onto "Basic " below.
credentials_string = ES_USER + ":" + ES_PASSWORD
credentials = base64.b64encode(credentials_string.encode("utf-8")).decode("ascii")

# The target URL and headers are identical for every document: build them once.
es_httpIndexUrl = 'https://' + ES_HOST + '/' + ES_INDEX + '/' + ES_TYPE
es_httpIndexHeaders = {"Content-Type": "application/json", "Authorization": "Basic " + credentials}

# .get() tolerates a missing series instead of raising KeyError.
# NOTE(review): presumably a key is absent when the interval holds no samples
# for it - confirm against the ThingsBoard API.
temperature_points = telemetry_data.get('temperature', [])
humidity_points = telemetry_data.get('humidity', [])
if not temperature_points or not humidity_points:
    print("No telemetry to push")
elif len(temperature_points) != len(humidity_points):
    # zip() below stops at the shorter series, so the surplus is skipped
    # rather than crashing the run part-way through with an IndexError.
    print("Temperature/humidity sample counts differ; pushing matching pairs only")

# Iterate and push each telemetry node to ES
for temperature_point, humidity_point in zip(temperature_points, humidity_points):
    telemetry_temperature = temperature_point['value']
    telemetry_temperature_timestamp = temperature_point['ts']
    print("**********")
    # str() keeps the message working whether the value arrives as text or a number.
    print("Temperature to push: " + str(telemetry_temperature))
    telemetry_humidity = humidity_point['value']
    telemetry_humidity_timestamp = humidity_point['ts']
    print("Humidity to push: " + str(telemetry_humidity))
    print("**********")

    # Push to Elasticsearch
    print("**********")
    print("Pushing to Elasticsearch")
    print("**********")

    # Index Content
    es_httpIndexBody = {
        "sensorID": sensorID,
        "sensorLocation": sensorLocation,
        "temperature": telemetry_temperature,
        "humidity": telemetry_humidity,
        "temperatureTimestamp": telemetry_temperature_timestamp,
        "humidityTimestamp": telemetry_humidity_timestamp,
    }
    # The timeout keeps one unresponsive request from stalling the whole run.
    r5 = requests.post(
        es_httpIndexUrl,
        data=json.dumps(es_httpIndexBody),
        headers=es_httpIndexHeaders,
        timeout=30,
    )
    print("Status Code:" + str(r5.status_code))
    print("Message: ")
    try:
        print(r5.json())
    except ValueError:
        # The body was not JSON (for example an HTML error page); show it raw.
        print(r5.text)