From 7c83f6058a47e1494d41e4a2686ed75da46bb0a0 Mon Sep 17 00:00:00 2001 From: Robert Lutes Date: Wed, 23 Sep 2015 15:06:10 -0700 Subject: [PATCH 1/8] Initial commit of updated weather agent (3.x). --- .../core/WeatherAgent/weather/weatheragent.py | 192 +++++++++++------- .../WeatherAgent/weatheragent.launch.json | 7 +- 2 files changed, 124 insertions(+), 75 deletions(-) diff --git a/services/core/WeatherAgent/weather/weatheragent.py b/services/core/WeatherAgent/weather/weatheragent.py index 0e6b46ef8d..bb5246b70b 100644 --- a/services/core/WeatherAgent/weather/weatheragent.py +++ b/services/core/WeatherAgent/weather/weatheragent.py @@ -7,8 +7,8 @@ # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. @@ -24,9 +24,9 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # -# The views and conclusions contained in the software and documentation are those -# of the authors and should not be interpreted as representing official policies, -# either expressed or implied, of the FreeBSD Project. +# The views and conclusions contained in the software and documentation are +# those of the authors and should not be interpreted as representing official, +# policies either expressed or implied, of the FreeBSD Project. # # This material was prepared as an account of work sponsored by an @@ -51,38 +51,41 @@ # operated by BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY # under Contract DE-AC05-76RL01830 -#}}} - - +# }}} import sys import requests -import json import datetime import logging +from dateutil.parser import parse -from volttron.platform.agent import BaseAgent, PublishMixin, periodic from volttron.platform.agent.utils import jsonapi from volttron.platform.agent import utils -from volttron.platform.agent.matching import match_start -from volttron.platform.messaging import headers as headers_mod +from volttron.platform.agent import matching +from volttron.platform.messaging import headers as headers_mod, topics +from volttron.platform.vip.agent import Agent, Core +from volttron.platform.async import AsyncCall +from volttron.platform.agent import utils +from volttron.platform.vip.agent import * import settings utils.setup_logging() _log = logging.getLogger(__name__) +HEADER_NAME_DATE = headers_mod.DATE +HEADER_NAME_CONTENT_TYPE = headers_mod.CONTENT_TYPE +REQUESTS_EXHAUSTED = 'requests_exhausted' ''' -******* In order for this agent to retrieve data from Weather Underground, you must -get a developer's key and put that into the config file. +******* In order for this agent to retrieve data from Weather Underground, + you must get a developer's key and put that into the seetings.py file. http://www.wunderground.com/weather/api/ ******** ''' -topic_delim = '/' - +TOPIC_DELIM = '/' temperature = ["temperature_string", "temp_f", "temp_c", "feelslike_c", "feelslike_f", "feelslike_string", "windchill_c", "windchill_f", "windchill_string", "heat_index_c", "heat_index_f", "heat_index_string"] wind = ["wind_gust_kph", "wind_string", "wind_mph", "wind_dir", "wind_degrees", "wind_kph", "wind_gust_mph", "pressure_in"] @@ -92,46 +95,72 @@ precipitation = ["dewpoint_string", "precip_today_string", "dewpoint_f", "dewpoint_c", "precip_today_metric", "precip_today_in", "precip_1hr_in", "precip_1hr_metric", "precip_1hr_string"] pressure_humidity = ["pressure_trend", "pressure_mb", "relative_humidity"] -categories = {"temperature": temperature, "wind": wind, "location": location, "time": time_topics, "cloud_cover": cloud_cover, 'precipitation': precipitation, 'pressure_humidity': pressure_humidity} -REQUESTS_EXHAUSTED = 'requests_exhausted' +categories = {'temperature': temperature, 'wind': wind, + 'location': location, 'precipitation': precipitation, + 'pressure_humidity': pressure_humidity, + 'time': time_topics, 'cloud_cover': cloud_cover} class RequestCounter: def __init__(self, daily_threshold, minute_threshold, poll_time): - self.daily = 0 + dtime = datetime.datetime.now() + f = open('.count', 'w+') + line = f.readline() + if line: + saved_state = line.split(',') + text_date = saved_state[0] + saved_date = parse(text_date, fuzzy=True).date() + if saved_date == dtime.date() and saved_state[2] == settings.KEY: + self.daily = int(saved_state[1]) + else: + self.daily = 0 self.date = datetime.datetime.today().date() self.per_minute_requests = [] self.minute_reserve = 1 - self.daily_reserve = self.minute_reserve * (3600 / poll_time) * 24 - self.daily_threshold = daily_threshold - self.daily_reserve + self.daily_threshold = daily_threshold + if poll_time < 180: + _log.warning('May exceed number of calls to WeatherUnderground' + 'for free api plan limit is 500 queries per day. ' + 'poll_time should be greater than 3 minutes.') self.minute_threshold = minute_threshold - self.minute_reserve def request_available(self): + ''' + Keep track of request on WU API key for saving state. + ''' now = datetime.datetime.today() if (now.date() - self.date).days < 1: if self.daily >= self.daily_threshold: - return False + False else: self.date = now.date() self.daily = 0 - - while len(self.per_minute_requests) > 0 and (now - self.per_minute_requests[-1]).seconds > 60: + f = open('.count', 'w+') + _count = ''.join([str(self.date), ',', + str(self.daily), ',', + settings.KEY]) + f.write(_count) + f.close() + while (len(self.per_minute_requests) > 0 and + (now - self.per_minute_requests[-1]).seconds > 60): self.per_minute_requests.pop() if len(self.per_minute_requests) < self.minute_threshold: self.per_minute_requests.insert(0, now) self.daily += 1 - + f = open('.count', 'w+') + _count = ''.join([str(self.date), ',', + str(self.daily), ',', + settings.KEY]) + f.write(_count) + f.close() else: return False return True - def store_state(self): - pass - -def WeatherAgent(config_path, **kwargs): +def weather_service(config_path, **kwargs): config = utils.load_config(config_path) def get_config(name): @@ -144,6 +173,7 @@ def get_config(name): poll_time = get_config('poll_time') zip_code = get_config("zip") key = get_config('key') + on_request_only = get_config('on_request_only') state = '' country = '' @@ -153,22 +183,25 @@ def get_config(name): max_requests_per_day = get_config('daily_threshold') max_requests_per_minute = get_config('minute_threshold') - class Agent(PublishMixin, BaseAgent): + class WeatherAgent(Agent): """Agent for querying WeatherUndergrounds API""" def __init__(self, **kwargs): - super(Agent, self).__init__(**kwargs) + super(WeatherAgent, self).__init__(**kwargs) self.valid_data = False - def setup(self): - super(Agent, self).setup() - + @Core.receiver('onstart') + def setup(self, sender, **kwargs): + '''On start method''' self._keep_alive = True - self.requestCounter = RequestCounter(max_requests_per_day, max_requests_per_minute, poll_time) + self.requestCounter = RequestCounter(max_requests_per_day, + max_requests_per_minute, + poll_time) # TODO: get this information from configuration file instead - - self.baseUrl = "http://api.wunderground.com/api/" + (key if not key == '' else settings.KEY) + "/conditions/q/" + base = "http://api.wunderground.com/api/" + \ + (key if not key == '' else settings.KEY) + self.baseUrl = base + "/conditions/q/" self.requestUrl = self.baseUrl if(zip_code != ""): @@ -178,9 +211,13 @@ def setup(self): else: # Error Need to handle this print "No location selected" - - #Do a one time push when we start up so we don't have to wait for the periodic - self.timer(10, self.weather_push) + self.vip.pubsub.subscribe(peer='pubsub', + prefix=topics.WEATHER_REQUEST, + callback=self.handle_request) + if not on_request_only: + self.weather = self.core.periodic(poll_time, + self.weather_push, + wait=0) def build_url_with_zipcode(self, zip_code): return self.baseUrl + zip_code + ".json" @@ -199,36 +236,49 @@ def build_dictionary(self, observation): return weather_dict def publish_all(self, observation, topic_prefix="weather", headers={}): - self.publish_subtopic(self.build_dictionary(observation), topic_prefix, headers) + self.publish_subtopic(self.build_dictionary(observation), + topic_prefix, headers) def publish_subtopic(self, publish_item, topic_prefix, headers): - #TODO: Update to use the new topic templates - if type(publish_item) is dict: + # TODO: Update to use the new topic templates + now = str(datetime.datetime.now()) + if isinstance(publish_item, dict): # Publish an "all" property, converting item to json - - headers[headers_mod.CONTENT_TYPE] = headers_mod.CONTENT_TYPE.JSON - self.publish_json(topic_prefix + topic_delim + "all", headers, json.dumps(publish_item)) + headers = {HEADER_NAME_DATE: now} + _topic = topic_prefix + TOPIC_DELIM + "all" + self.vip.pubsub.publish(peer='pubsub', + topic=_topic, + message=publish_item, + headers=headers) # Loop over contents, call publish_subtopic on each for topic in publish_item.keys(): - self.publish_subtopic(publish_item[topic], topic_prefix + topic_delim + topic, headers) - + self.publish_subtopic(publish_item[topic], + topic_prefix + TOPIC_DELIM + topic, + headers) else: # Item is a scalar type, publish it as is headers[headers_mod.CONTENT_TYPE] = headers_mod.CONTENT_TYPE.PLAIN_TEXT - self.publish(topic_prefix, headers, str(publish_item)) + headers.update({HEADER_NAME_DATE: now}) + self.vip.pubsub.publish(peer='pubsub', + topic=topic_prefix, + message=str(publish_item), + headers=headers) - @periodic(poll_time) def weather_push(self): + ''' + Function called on periodic or request for weather information. + ''' (valid_data, observation) = self.request_data(self.requestUrl) if valid_data: + now = datetime.datetime.now() headers = {headers_mod.FROM: agent_id} + headers.update({HEADER_NAME_DATE: now}) self.publish_all(observation, headers=headers) else: _log.error("Invalid data, not publishing") - @match_start("weather/request") - def handle_request(self, topic, headers, message, matched): + def handle_request(self, peer, sender, bus, topic, headers, message): msg = jsonapi.loads(message[0]) request_url = self.baseUrl @@ -239,7 +289,8 @@ def handle_request(self, topic, headers, message, matched): elif ('region' in msg) and ('city' in msg): self.requestUrl += msg['region'] + "/" + msg['city'] + ".json" else: - _log.error('Invalid request, no zipcode or region/city in request') + _log.error('Invalid request, no zipcode ' + 'or region/city in request') # TODO: notify requester of error # Request data @@ -251,15 +302,17 @@ def handle_request(self, topic, headers, message, matched): resp_headers[headers_mod.TO] = headers[headers_mod.REQUESTER_ID] resp_headers['agentID'] = agent_id resp_headers[headers_mod.FROM] = agent_id - self.publish_all(observation, 'weather' + topic_delim + 'response', resp_headers) + _topic = 'weather' + TOPIC_DELIM + 'response' + self.publish_all(observation, _topic, resp_headers) else: if observation == REQUESTS_EXHAUSTED: _log.error('No requests avaliable') # TODO: report error to client - # Else, log that the data was invalid and report an error to the requester + # Else, log that the data was invalid and report an error + # to the requester else: _log.error('Weather API response was invalid') - #TODO: send invalid data error back to requester + # TODO: send invalid data error back to requester def request_data(self, requestUrl): if self.requestCounter.request_available(): @@ -276,8 +329,6 @@ def request_data(self, requestUrl): _log.error(e) valid_data = False return (valid_data, None) - #self.print_data() - else: _log.warning("No requests available") return (False, REQUESTS_EXHAUSTED) @@ -288,27 +339,24 @@ def print_data(self): print "{0:>25}: {1}".format(key, self.observation[key]) print "{0:*^40}".format(" ") - Agent.__name__ = 'WeatherAgent' - return Agent(**kwargs) + WeatherAgent.__name__ = 'weather_service' + return WeatherAgent(**kwargs) -def convert(input): - if isinstance(input, dict): - return {convert(key): convert(value) for key, value in input.iteritems()} - elif isinstance(input, list): - return [convert(element) for element in input] - elif isinstance(input, unicode): - return input.encode('utf-8') +def convert(_input): + if isinstance(_input, dict): + return {convert(key): convert(value) for key, value in _input.iteritems()} + elif isinstance(_input, list): + return [convert(element) for element in _input] + elif isinstance(_input, unicode): + return _input.encode('utf-8') else: - return input + return _input def main(argv=sys.argv): '''Main method called by the eggsecutable.''' - utils.default_main(WeatherAgent, - description='Weather Underground agent', - argv=argv) - + utils.vip_main(weather_service) if __name__ == '__main__': # Entry point for script diff --git a/services/core/WeatherAgent/weatheragent.launch.json b/services/core/WeatherAgent/weatheragent.launch.json index d349ef3584..6a20c643e7 100644 --- a/services/core/WeatherAgent/weatheragent.launch.json +++ b/services/core/WeatherAgent/weatheragent.launch.json @@ -1,7 +1,8 @@ { "agentid": "Weather1", - "poll_time": 3600, + "poll_time": 900, "minute_threshold" : 5, - "daily_threshold" : 200, - "zip" : "37918" + "daily_threshold" : 500, + "zip" : "37918", + "on_request_only": false } From b3f0e90af9480a252ba87f84c7aab9ce85fc3a8f Mon Sep 17 00:00:00 2001 From: Robert Lutes Date: Thu, 24 Sep 2015 00:33:28 -0700 Subject: [PATCH 2/8] Add weather topic to topics.py. --- volttron/platform/messaging/topics.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/volttron/platform/messaging/topics.py b/volttron/platform/messaging/topics.py index c125d654de..0243a5cbfa 100644 --- a/volttron/platform/messaging/topics.py +++ b/volttron/platform/messaging/topics.py @@ -179,4 +179,5 @@ DRIVER_CONFIG_REMOVE = _(_CONFIG_VALUE.replace('{category}', 'driver')) DRIVER_CONFIG_UPDATE = _(_CONFIG_VALUE.replace('{category}', 'driver')) - +WEATHER_BASE = 'weather' +WEATHER_REQUEST = 'weather/request' From be7b7b759bc8f2f25bba3bfebbd288f74b0fb55f Mon Sep 17 00:00:00 2001 From: "C. Allwardt" Date: Thu, 15 Oct 2015 12:56:08 -0700 Subject: [PATCH 3/8] Made lines smaller to be like pylint compliant. --- .../core/WeatherAgent/weather/weatheragent.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/services/core/WeatherAgent/weather/weatheragent.py b/services/core/WeatherAgent/weather/weatheragent.py index f3c418ba6e..7af34531ad 100644 --- a/services/core/WeatherAgent/weather/weatheragent.py +++ b/services/core/WeatherAgent/weather/weatheragent.py @@ -87,12 +87,22 @@ TOPIC_DELIM = '/' -temperature = ["temperature_string", "temp_f", "temp_c", "feelslike_c", "feelslike_f", "feelslike_string", "windchill_c", "windchill_f", "windchill_string", "heat_index_c", "heat_index_f", "heat_index_string"] -wind = ["wind_gust_kph", "wind_string", "wind_mph", "wind_dir", "wind_degrees", "wind_kph", "wind_gust_mph", "pressure_in"] -location = ["local_tz_long", "observation_location", "display_location", "station_id"] -time_topics = ["local_time_rfc822", "local_tz_short", "local_tz_offset", "local_epoch", "observation_time", "observation_time_rfc822", "observation_epoch"] -cloud_cover = ["weather", "solarradiation", "visibility_mi", "visibility_km", "UV"] -precipitation = ["dewpoint_string", "precip_today_string", "dewpoint_f", "dewpoint_c", "precip_today_metric", "precip_today_in", "precip_1hr_in", "precip_1hr_metric", "precip_1hr_string"] +temperature = ["temperature_string", "temp_f", "temp_c", "feelslike_c", + "feelslike_f", "feelslike_string", "windchill_c", + "windchill_f", "windchill_string", "heat_index_c", + "heat_index_f", "heat_index_string"] +wind = ["wind_gust_kph", "wind_string", "wind_mph", "wind_dir", + "wind_degrees", "wind_kph", "wind_gust_mph", "pressure_in"] +location = ["local_tz_long", "observation_location", "display_location", + "station_id"] +time_topics = ["local_time_rfc822", "local_tz_short", "local_tz_offset", + "local_epoch", "observation_time", "observation_time_rfc822", + "observation_epoch"] +cloud_cover = ["weather", "solarradiation", "visibility_mi", "visibility_km", + "UV"] +precipitation = ["dewpoint_string", "precip_today_string", "dewpoint_f", + "dewpoint_c", "precip_today_metric", "precip_today_in", + "precip_1hr_in", "precip_1hr_metric", "precip_1hr_string"] pressure_humidity = ["pressure_trend", "pressure_mb", "relative_humidity"] categories = {'temperature': temperature, 'wind': wind, @@ -269,6 +279,7 @@ def weather_push(self): ''' Function called on periodic or request for weather information. ''' + _log.debug("Requesting url: "+self.requestUrl) (valid_data, observation) = self.request_data(self.requestUrl) if valid_data: now = datetime.datetime.now() From 86ece678ed3091d220604a19da9b8d3fc927def3 Mon Sep 17 00:00:00 2001 From: "C. Allwardt" Date: Thu, 15 Oct 2015 14:03:21 -0700 Subject: [PATCH 4/8] Added the new key to the configuration file --- services/core/WeatherAgent/config | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/core/WeatherAgent/config b/services/core/WeatherAgent/config index 29dd52a02c..918cf01e42 100644 --- a/services/core/WeatherAgent/config +++ b/services/core/WeatherAgent/config @@ -3,5 +3,6 @@ "poll_time": 3600, "minute_threshold" : 5, "daily_threshold" : 500, - "zip" : "37918" + "zip" : "37918", + "on_request_only": false } From 6192af8193a01ac929b6c072e38f2310d55b5044 Mon Sep 17 00:00:00 2001 From: "C. Allwardt" Date: Thu, 15 Oct 2015 14:05:36 -0700 Subject: [PATCH 5/8] Renamed weather config file --- services/core/WeatherAgent/{config => weatheragent.config} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename services/core/WeatherAgent/{config => weatheragent.config} (100%) diff --git a/services/core/WeatherAgent/config b/services/core/WeatherAgent/weatheragent.config similarity index 100% rename from services/core/WeatherAgent/config rename to services/core/WeatherAgent/weatheragent.config From e4b3ef18467d9b0619d9eb0a04dee0b6a288a36f Mon Sep 17 00:00:00 2001 From: "C. Allwardt" Date: Thu, 15 Oct 2015 14:06:44 -0700 Subject: [PATCH 6/8] Removed older config files from the weather agent directory. --- services/core/WeatherAgent/weather-deploy.service | 10 ---------- services/core/WeatherAgent/weatheragent.launch.json | 8 -------- 2 files changed, 18 deletions(-) delete mode 100644 services/core/WeatherAgent/weather-deploy.service delete mode 100644 services/core/WeatherAgent/weatheragent.launch.json diff --git a/services/core/WeatherAgent/weather-deploy.service b/services/core/WeatherAgent/weather-deploy.service deleted file mode 100644 index e3a12f64e9..0000000000 --- a/services/core/WeatherAgent/weather-deploy.service +++ /dev/null @@ -1,10 +0,0 @@ -{ - "agent": { - "exec": "weatheragent-.1-py2.7.egg --config \"%c\" --sub \"%s\" --pub \"%p\"" - }, - "agentid": "Weather1", - "poll_time": 600, - "minute_threshold" : 5, - "daily_threshold" : 200, - "zip" : "99352" -} diff --git a/services/core/WeatherAgent/weatheragent.launch.json b/services/core/WeatherAgent/weatheragent.launch.json deleted file mode 100644 index 6a20c643e7..0000000000 --- a/services/core/WeatherAgent/weatheragent.launch.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "agentid": "Weather1", - "poll_time": 900, - "minute_threshold" : 5, - "daily_threshold" : 500, - "zip" : "37918", - "on_request_only": false -} From 5e5802321bb80254d7b5e30f08848542f5e3e73c Mon Sep 17 00:00:00 2001 From: "C. Allwardt" Date: Fri, 16 Oct 2015 10:41:27 -0700 Subject: [PATCH 7/8] Fixed so that both 2 and 3 agents can call the code through the compat layer --- services/core/WeatherAgent/weather/weatheragent.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/services/core/WeatherAgent/weather/weatheragent.py b/services/core/WeatherAgent/weather/weatheragent.py index 7af34531ad..b88ceb640a 100644 --- a/services/core/WeatherAgent/weather/weatheragent.py +++ b/services/core/WeatherAgent/weather/weatheragent.py @@ -290,7 +290,12 @@ def weather_push(self): _log.error("Invalid data, not publishing") def handle_request(self, peer, sender, bus, topic, headers, message): - msg = jsonapi.loads(message[0]) + + if sender == 'pubsub.compat': + msg = jsonapi.loads(message[0]) + else: + msg = message + request_url = self.baseUrl # Identify if a zipcode or region/city was sent From 420676221535152002df78def775604d9cf0f7f9 Mon Sep 17 00:00:00 2001 From: "C. Allwardt" Date: Fri, 16 Oct 2015 11:31:48 -0700 Subject: [PATCH 8/8] Added a test for the weatheragent --- services/core/WeatherAgent/weathertest.py | 30 +++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 services/core/WeatherAgent/weathertest.py diff --git a/services/core/WeatherAgent/weathertest.py b/services/core/WeatherAgent/weathertest.py new file mode 100644 index 0000000000..c04095609a --- /dev/null +++ b/services/core/WeatherAgent/weathertest.py @@ -0,0 +1,30 @@ +# Example file using the weather agent. +# +# Requirements +# - A VOLTTRON instance must be started +# - A weatheragnet must be running prior to running this code. +# +# Author: Craig Allwardt + +from volttron.platform.vip.agent import Agent +import gevent +from gevent.core import callback + + +def onmessage(peer, sender, bus, topic, headers, message): + print('received: peer=%r, sender=%r, bus=%r, topic=%r, headers=%r, message=%r' % ( + peer, sender, bus, topic, headers, message)) + +a = Agent() +gevent.spawn(a.core.run).join(0) +a.vip.pubsub.subscribe(peer='pubsub', + prefix='weather/response', + callback=onmessage).get(timeout=5) + +a.vip.pubsub.publish(peer='pubsub', + topic='weather/request', + headers={'requesterID': 'agent1'}, + message={'zipcode': '99336'}).get(timeout=5) + +gevent.sleep(5) +a.core.stop()