Skip to content

Commit

Permalink
Merge branch 'before-checkpoint1' into releases/before-checkpoint1
Browse files Browse the repository at this point in the history
  • Loading branch information
craig8 committed Nov 13, 2023
2 parents 4362bc4 + 6fe2b3b commit 58b6184
Show file tree
Hide file tree
Showing 14 changed files with 11,379 additions and 30 deletions.
69 changes: 39 additions & 30 deletions scripts/pycharm-launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
input box put services/core/VolttronCentral/volttroncentral/agent.py.
"""
import argparse
import shutil
import string
import sys
import os
import runpy
import shutil
import string
import subprocess
import sys

from volttron.platform import jsonapi

__author__ = 'Craig Allwardt<[email protected]>'
Expand All @@ -31,16 +32,23 @@
parser = argparse.ArgumentParser()

parser.add_argument("agent", help="Path to the agent file to be executed.")
parser.add_argument("-s", "--silence", const=True, dest="silence", nargs="?",
parser.add_argument("-s",
"--silence",
const=True,
dest="silence",
nargs="?",
help="Silence the help message.")
parser.add_argument("-n", "--no-config", action="store_true",
help="Don't include the default config in the agent directory.")
parser.add_argument(
"-n",
"--no-config",
action="store_true",
help="Don't include the default config in the agent directory.")
parsed = parser.parse_args()

mod_name = [os.path.basename(parsed.agent)]
if not os.path.isfile(parsed.agent):
sys.stdout.write("Passed argument must be a python file! {}".
format(parsed.agent))
sys.stdout.write("Passed argument must be a python file! {}".format(
parsed.agent))
sys.exit()

abspath = os.path.abspath(os.path.join(parsed.agent, os.pardir))
Expand All @@ -60,14 +68,12 @@


def write_required_statement(out=sys.stderr):
out.write(
"""Required Environment Variables
out.write("""Required Environment Variables
AGENT_VIP_IDENTITY - Required
Optional Environmental Variables
AGENT_CONFIG - Set to <agent directory>/config by default
VOLTTRON_HOME - Set to ~/.volttron by default
"""
)
""")


sys.path.insert(0, abspath)
Expand All @@ -80,18 +86,17 @@ def write_required_statement(out=sys.stderr):
path_found = os.path.join(abspath, cfg)
break
if not path_found:
sys.stderr.write('AGENT_CONFIG variable not set. Either set it or '
'put a config file in the root of the agent dir.')
sys.stderr.write(
'AGENT_CONFIG variable not set. Either set it or '
'put a config file in the root of the agent dir.')
sys.exit()
os.environ['AGENT_CONFIG'] = path_found

volttron_home = os.environ.get('VOLTTRON_HOME')

if not volttron_home:
os.environ['VOLTTRON_HOME'] = os.path.abspath(
os.path.expandvars(
os.path.join(
os.path.expanduser("~"), '.volttron')))
os.path.expandvars(os.path.join(os.path.expanduser("~"), '.volttron')))
volttron_home = os.environ.get('VOLTTRON_HOME')

# Now register the
Expand All @@ -116,11 +121,12 @@ def write_required_statement(out=sys.stderr):
os.makedirs(new_dir)
try:
output = subprocess.check_output(['vctl', 'auth', 'keypair'],
env=os.environ.copy(), universal_newlines=True, stderr=subprocess.STDOUT)
env=os.environ.copy(),
universal_newlines=True,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
sys.stderr.write("Couldn't get key pair for identity: {}\n".format(
agent_identity
))
agent_identity))
sys.stderr.write("Call was:\n\tvctl auth keypair\n")
sys.stderr.write("Output of command: {}".format(e.output))
sys.stderr.write("Your environment might not be setup correctly!")
Expand All @@ -135,19 +141,22 @@ def write_required_statement(out=sys.stderr):

pubkey = json_obj['public']
try:
params = ['vctl', 'auth', 'add',
'--credentials', "{}".format(pubkey), '--user_id', agent_identity,
'--capabilities', "edit_config_store",
'--comments', "Added from pycharm-launch.py script."
]
output = subprocess.check_output(params, env=os.environ.copy(), universal_newlines=True)
params = [
'vctl', 'auth', 'add', '--credentials', "{}".format(pubkey),
'--user_id', agent_identity, '--capabilities',
"edit_config_store", '--comments',
"Added from pycharm-launch.py script."
]
output = subprocess.check_output(params,
env=os.environ.copy(),
universal_newlines=True)
except subprocess.CalledProcessError as e:
sys.stderr.write(str(e))
sys.stderr.write("Command returned following output: {}".format(e.output))
sys.stderr.write("Command returned following output: {}".format(
e.output))
shutil.rmtree(new_dir)
sys.stderr.write("Couldn't authenticate agent id: {}\n".format(
agent_identity
))
sys.stderr.write(
"Couldn't authenticate agent id: {}\n".format(agent_identity))
sys.stderr.write("Call was: {}\n".format(params))
sys.stderr.write("Your environment might not be setup correctly!")
write_required_statement()
Expand Down
15 changes: 15 additions & 0 deletions services/core/IEEE_2030_5/example.config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# required parameters
cacertfile: ~/tls/certs/ca.pem
keyfile: ~/tls/private/dev1.pem
certfile: ~/tls/certs/dev1.pem
server_hostname: localhost
# the pin number is used to verify the server is the correct server
pin: 12345

# SSL defaults to 443
server_ssl_port: 8443
# http port defaults to none
server_http_port: 8080

subscriptions:
- devices/inverter1/all
28 changes: 28 additions & 0 deletions services/core/IEEE_2030_5/ieee_2030_5/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class AllPoints:
points: Dict = field(default_factory=dict)
meta: Dict = field(default_factory=dict)

def add(self, name: str, value: Any, meta: Dict = {}):
self.points[name] = value
self.meta[name] = meta

def forbus(self) -> List:
return [self.points, self.meta]

@staticmethod
def frombus(message: List) -> AllPoints:
assert len(message) == 2, "Message must have a length of 2"

points = AllPoints()

for k, v in message[0].items():
points.add(name=k, value=v, meta=message[1].get(k))

return points
179 changes: 179 additions & 0 deletions services/core/IEEE_2030_5/ieee_2030_5/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# Copyright 2022 Battelle Memorial Institute
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import datetime
import logging
import sys
from pathlib import Path
from pprint import pformat

from ieee_2030_5 import AllPoints
from ieee_2030_5.client import IEEE2030_5_Client

try: # for modular
from volttron import utils
from volttron.client.messaging.health import STATUS_GOOD
from volttron.client.vip.agent import RPC, Agent, Core, PubSub
from volttron.client.vip.agent.subsystems.query import Query
from volttron.utils.commands import vip_main
except ImportError:
from volttron.platform.agent import utils
from volttron.platform.agent.utils import vip_main
from volttron.platform.vip.agent import RPC, Agent, Core, PubSub
from volttron.platform.vip.agent.subsystems.query import Query

# from . import __version__
__version__ = "0.1.0"

# Setup logging so that it runs within the platform
utils.setup_logging()

# The logger for this agent is _log and can be used throughout this file.
_log = logging.getLogger(__name__)


class IEEE_2030_5_Agent(Agent):
"""
IEEE_2030_5_Agent
"""

def __init__(self, config_path: str, **kwargs):
super().__init__(**kwargs)
_log.debug("vip_identity: " + self.core.identity)

config = utils.load_config(config_path)

self._cacertfile = Path(config['cacertfile']).expanduser()
self._keyfile = Path(config['keyfile']).expanduser()
self._certfile = Path(config['certfile']).expanduser()
self._subscriptions = config["subscriptions"]
self._server_hostname = config["server_hostname"]
self._server_ssl_port = config.get("server_ssl_port", 443)
self._server_http_port = config.get("server_http_port", None)
self._default_config = {"subscriptions": self._subscriptions}

self._client = IEEE2030_5_Client(cafile=self._cacertfile,
server_hostname=self._server_hostname,
keyfile=self._keyfile,
certfile=self._certfile,
server_ssl_port=self._server_ssl_port)

# Set a default configuration to ensure that self.configure is called immediately to setup
# the agent.
self.vip.config.set_default("config", self._default_config)
# Hook self.configure up to changes to the configuration file "config".
self.vip.config.subscribe(self.configure,
actions=["NEW", "UPDATE"],
pattern="config")

def configure(self, config_name, action, contents):
"""
Called after the Agent has connected to the message bus. If a configuration exists at startup
this will be called before onstart.
Is called every time the configuration in the store changes.
"""
config = self._default_config.copy()
config.update(contents)

_log.debug("Configuring Agent")

try:
subscriptions = config['subscriptions']
except ValueError as e:
_log.error("ERROR PROCESSING CONFIGURATION: {}".format(e))
return

for sub in self._subscriptions:
_log.info(f"Removing subscription: {sub}")
self.vip.pubsub.unsubscribe(peer="pubsub",
prefix=sub,
callback=self._data_published)

self._subscriptions = subscriptions

for sub in self._subscriptions:
_log.info(f"Subscribing to: {sub}")
self.vip.pubsub.subscribe(peer="pubsub",
prefix=sub,
callback=self._data_published)

def _data_published(self, peer, sender, bus, topic, headers, message):
"""
Callback triggered by the subscription setup using the topic from the agent's config file
"""
points = AllPoints.frombus(message)
_log.debug(points.__dict__)

@Core.receiver("onstart")
def onstart(self, sender, **kwargs):
"""
This is method is called once the Agent has successfully connected to the platform.
This is a good place to setup subscriptions if they are not dynamic or
do any other startup activities that require a connection to the message bus.
Called after any configurations methods that are called at startup.
Usually not needed if using the configuration store.
"""
# Example publish to pubsub
# self.vip.pubsub.publish('pubsub', "some/random/topic", message="HI!")

# Example RPC call
# self.vip.rpc.call("some_agent", "some_method", arg1, arg2)
pass

@Core.receiver("onstop")
def onstop(self, sender, **kwargs):
"""
This method is called when the Agent is about to shutdown, but before it disconnects from
the message bus.
"""
pass

@RPC.export
def rpc_method(self, arg1, arg2, kwarg1=None, kwarg2=None):
"""
RPC method
May be called from another agent via self.vip.rpc.call
"""
return self.setting1 + arg1 - arg2

@PubSub.subscribe('pubsub', '', all_platforms=True)
def on_match(self, peer, sender, bus, topic, headers, message):
"""Use match_all to receive all messages and print them out."""
_log.debug(
"Peer: {0}, Sender: {1}:, Bus: {2}, Topic: {3}, Headers: {4}, "
"Message: \n{5}".format(peer, sender, bus, topic, headers,
pformat(message)))


def main():
"""
Main method called during startup of agent.
:return:
"""
try:
vip_main(IEEE_2030_5_Agent, version=__version__)
except Exception as e:
_log.exception('unhandled exception')


if __name__ == '__main__':
# Entry point for script
try:
sys.exit(main())
except KeyboardInterrupt:
pass
Loading

0 comments on commit 58b6184

Please sign in to comment.