[
Date Prev][
Date Next][
Thread Prev][
Thread Next][
Date Index][
Thread Index]
[
List Home]
[volttron-dev] Volttron 8.1: How to Pause the handle_publish
|
- From: "Thompson, Joe" <jthompson@xxxxxxxx>
- Date: Wed, 31 Jul 2024 19:20:23 +0000
- Accept-language: en-US
- Arc-authentication-results: i=1; mx.microsoft.com 1; spf=pass smtp.mailfrom=epri.com; dmarc=pass action=none header.from=epri.com; dkim=pass header.d=epri.com; arc=none
- Arc-message-signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=microsoft.com; s=arcselector10001; h=From:Date:Subject:Message-ID:Content-Type:MIME-Version:X-MS-Exchange-AntiSpam-MessageData-ChunkCount:X-MS-Exchange-AntiSpam-MessageData-0:X-MS-Exchange-AntiSpam-MessageData-1; bh=0vTat3jfeEANfJXFGVeZ1AUBsNApL9MsibZ/viBNNAc=; b=Ay3i6bjjNReeWMm9CFQbs5PrW7xqBZUrJzPfQ8VJ1ztgWfGeuImX70sAbhmsy9L3Z/Q404uypzGXK6q2NJ97QO45XHxzcymgNhI7s4OOoakhDGKmzvdMNCYKAN0cFCqVvXMBNuMMP6t5n4S9cMsTLnzr6QmdXjz3rX0bkQ5ehPGA9J7WaKPHsOYylEfz4F2Rldfj08w8PNBGKJKz7Ird36PCq+gTbXy4Sz1XpHZVo4r8c8mGZyg2zhRvLsP8TTd5huDderNuYiMp58SUt09RjCpgDj6p/oxncXsJrRQSjoSbXA+u85UG2fStRp/1pcqDe1oXPUWHPR2T9C8vAPaffA==
- Arc-seal: i=1; a=rsa-sha256; s=arcselector10001; d=microsoft.com; cv=none; b=LqolpWjjaWu1imrHO6xTaDogBWZN7fxHg62BIuDrQ5eKz8nhuYAOyIVX5oEx4wIbhyELkFtLp4rMM5mkOzNIAmRgllcJSTgHQLfxeVa369L6S0wUkZrQ/msdp1fMsIN1t0iDSyXVA7VWXiVyFq9aLaj/UF0Gc4J7UgJmnjZaJfG4Fe82KhpdXKxLjxvYRHkmRz+Cu/+EZW5D+LxTurhH1zvFpBywgQjZLEIFnQVpSzJ6jIq7u1D7uNH0ejVgRjMHjOFMODbNikvL/PJF1Arxa5Yo3Jrr2m/+FOJHKtSlTq/ALg7rd/B7ziXnlwXyo34DZhHF1N7q9gqPIIqC94HVxQ==
- Delivered-to: volttron-dev@xxxxxxxxxxx
- List-archive: <https://www.eclipse.org/mailman/private/volttron-dev/>
- List-help: <mailto:volttron-dev-request@eclipse.org?subject=help>
- List-subscribe: <https://www.eclipse.org/mailman/listinfo/volttron-dev>, <mailto:volttron-dev-request@eclipse.org?subject=subscribe>
- List-unsubscribe: <https://www.eclipse.org/mailman/options/volttron-dev>, <mailto:volttron-dev-request@eclipse.org?subject=unsubscribe>
- Thread-index: AQHa436xlgRpsM51fESF2TaH8ejAmQ==
- Thread-topic: Volttron 8.1: How to Pause the handle_publish
Hello Volttron Team,
The Issue:
We are running into a challenge that we need help with. Please see the attached agent.py for reference. We have a little 5 second control loop for orchestrating the operation of an inverter with a battery energy storage system.
Every 5 seconds:
- the platform driver reads from the inverter and published its data
- via a subscription to the inverter’s topics, the handle_publish callback is fired
- our control logic is all contained within handle_publish
- one loop of the controller finished in well under 5 seconds, so the next iteration commences on the next handle_publish call
This has worked well for a long time until recently introducing some more complex controls. Essentially we have added a new control step that is called whenever the controller notrices that the battery’s contactor has opened up. This new
step / procedure, “close_contactor”, is called and is tasked with running through some steps to attempt to close the contactor again so that the controller can continue like normal.
But the “close_contactor” procedure is guaranteed to take around 70 seconds to complete due to some gevent.sleep() calls. During this time, there seems to be many other control threads that pop up every 5 seconds as the handle_publish callback
continues to be called in each subsequent 5 second interval.
Desired Behavior:
We are looking for a way to temporarily pause the execution of subsequent handle_publish callback calls while an existing thread is already running handle_publish. Is there a way to do this in Volttron? Or is our approach to this controller
fundamentally flawed and we will need to refactor our control algorithm?
Thank you for your time!
Joe Thompson
Technical Leader
Electric Power Research Institute
Energy Storage and Distributed Generation
(912) 663-3407
*** This email message is for the sole use of the intended recipient(s) and may contain information that is confidential, privileged or exempt from disclosure under applicable law. Unless otherwise expressed in this message by the sender or except as may be
allowed by separate written agreement between EPRI and recipient or recipient’s employer, any review, use, distribution or disclosure by others of this message is prohibited and this message is not intended to be an electronic signature, instrument or anything
that may form a legally binding agreement with EPRI. If you are not the intended recipient, please contact the sender by reply email and permanently delete all copies of this message. Please be advised that the message and its contents may be disclosed, accessed
and reviewed by the sender's email system administrator and/or provider. ***
|
"""
Agent documentation goes here.
"""
__docformat__ = 'reStructuredText'
import logging
import sys
from volttron.platform.agent import utils
from volttron.platform.vip.agent import Agent, Core, RPC
from volttron.platform.agent.utils import format_timestamp, get_aware_utc_now
import gevent
import pandas as pd
_log = logging.getLogger(__name__)
utils.setup_logging()
__version__ = "0.1"
def enervenue_ctrl(config_path, **kwargs):
"""
Parses the Agent configuration and returns an instance of
the agent created using that configuration.
:param config_path: Path to a configuration file.
:type config_path: str
:returns: EnervenueCtrl
:rtype: EnervenueCtrl
"""
try:
config = utils.load_config(config_path) # config/enervenue_ctrl/config
except Exception:
config = {}
if not config:
_log.info("Using Agent defaults for starting configuration.")
sched_path = config['control_schedule_path']
system_strings = config['system_strings']
return EnervenueCtrl(sched_path, system_strings, config_path, **kwargs)
class EnervenueCtrl(Agent):
"""
Document agent constructor here.
"""
def __init__(self, sched_path, system_strings, config_path, **kwargs):
super(EnervenueCtrl, self).__init__(**kwargs)
_log.debug("vip_identity: " + self.core.identity)
self.config_path = config_path
self.sched_path = sched_path
self.bms_write_device = 'STAC/EnervenueBisonBMS'
self.inverter_write_device = 'STAC/DynapowerInverter'
self.inverter_rated_kw = 124.7
self.system_strings = system_strings # use the config file to adjust this number if the system needs to operate with only 1 or 2 strings instead of 3
# Configure the actual modbus writes and format the write request
self.dynapower_control = {
'Fault Reset': 0, # 0: No Action 1: Reset All Latched Alarms/Faults
'Operation Mode Select': 3, # 1: Idle State 3: Grid-Tied Mode
'Output Power Command': 0, # Percentage of rated power (124.7kW), negative = charge
'PCS Set Operation': 0, # 0: No Action 1: Start PCS Operation 2: Stop PCS Operation
}
self.default_config = {
"control_schedule_path": sched_path,
"system_strings": system_strings,
}
# Set a default configuration to ensure that self.configure is called immediately to setup
# the agent.
self.vip.config.set_default("test_config", self.default_config)
# Hook self.configure up to changes to the configuration file "config".
self.vip.config.subscribe(self.configure, actions=["NEW", "UPDATE"], pattern="test_config")
def configure(self, config_name, action, contents):
"""
Called after the Agent has connected to the message bus. If a configuration exists at startup
this will be called before onstart.
Is called every time the configuration in the store changes.
"""
_log.debug("Configuring Agent")
config = self.default_config.copy()
config.update(contents)
try:
sched_path = config["control_schedule_path"]
system_strings = config['system_strings']
except ValueError as e:
_log.error("ERROR PROCESSING CONFIGURATION: {}".format(e))
return
self.sched_path = sched_path
self.system_topic = "devices/STAC/DynapowerInverter"
self.sched = pd.read_excel(self.sched_path)
self.system_strings = system_strings
self.sched_step_ind = 0
self.parse_control_step()
self.timed_step_end_utc = None if pd.isna(self.duration) else pd.Timestamp.utcnow() + pd.Timedelta(seconds=self.duration)
### SUBSCRIBE TO BESS AND INVERTER POINTS. GO THROUGH CONTROL LOOP AT EACH READ
def _create_subscriptions(self, topic):
"""
Unsubscribe from all pub/sub topics and create a subscription to a topic in the configuration which triggers
the handle_publish callback
"""
self.vip.pubsub.unsubscribe("pubsub", None, None)
# subscribe to all read data from the BMS
self.vip.pubsub.subscribe(peer='pubsub',
prefix=topic,
callback=self.handle_publish)
def get_number_connected_strings(self):
relay_status_topics = [self.bms_write_device + "/" + 'Active String Count']
point_results = self.vip.rpc.call('platform.actuator', 'get_multiple_points', relay_status_topics).get(timeout=1)
self.connected_strings = sum([point_results[0][s] for s in relay_status_topics])
def parse_control_step(self):
# get details for the current control step
self.ctrl_step = self.sched.loc[self.sched_step_ind]
self.step_name = self.ctrl_step['Step']
self.target_soc = self.ctrl_step['Target SoC (%)']
self.sched_power_cmd = self.ctrl_step['Power (kW)\n+ = discharge']
self.duration = self.ctrl_step['Duration (s)']
self.action = 'discharge' if self.sched_power_cmd > 0 else 'charge' if self.sched_power_cmd < 0 else 'idle'
self.get_number_connected_strings()
self.power_command = self.sched_power_cmd * (self.connected_strings / self.system_strings)
self.power_command_pct = (self.power_command / self.inverter_rated_kw) * 100 # remove force to int() to write fractional values
def handle_publish(self, peer, sender, bus, topic, headers, message):
"""
Callback triggered by the subscription setup using the topic from the agent's config file
For now we can write out all the control logic here
"""
# current_soc = float(message[0]['EnerStation SOC'])
# read current SoC from battery system
current_soc = self.vip.rpc.call(
'platform.driver',
'get_point',
path=self.bms_write_device,
point_name='String 1 SOC'
).get(timeout=1)
# check how many strings are connected, attempt to close contactor
self.get_number_connected_strings()
if self.connected_strings != self.system_strings: # if self.connected_strings == 0:
#_log.info("No Strings are connected. Stopping the inverter.")
_log.info("A string has disconnected. Attempting to reclose contactor.")
self.close_contactor()
# derate total power based on number of strings available
self.power_command = self.sched_power_cmd * (self.connected_strings / self.system_strings)
# derate at low soc to stay under discharge current limit
string_voltage = self.vip.rpc.call(
'platform.driver',
'get_point',
path=self.bms_write_device,
point_name='String 1 String Voltage'
).get(timeout=1)
_log.info("String is at %d Volts." % string_voltage)
if ((1000*self.power_command)/string_voltage >= 77):
self.power_command = (77 * string_voltage)/1000
self.power_command_pct = (self.power_command / self.inverter_rated_kw) * 100 # remove force to int() to write fractional values
## adjust the power request given the number of strings online. Right now String 1 is disabled, so only 2 strings are available.
# for now change this to completely stop the controller if any string is not connected
# power_command = self.power_cmd # COMMENT THIS OUT WHEN STRINGS ARE BACK ONLINE
# if the current step name is not an integer then send an idle command and stop the agent
try:
int(self.step_name)
except ValueError as e:
_log.info("Schedule was completed. Stopping the inverter.")
self.core.stop()
# Schedule was completed, do not proceed with the rest of control logic
# need to find a way to shutdown the agent here
return
# check whether we are in a timed / duration step
if self.timed_step_end_utc is not None: # we are within a timed step
if pd.Timestamp.utcnow() < self.timed_step_end_utc:
# send write command that corresponds to this step
self.write_to_device(
device=self.inverter_write_device,
write_values={
'Output Power Command': int(self.power_command_pct * 10), # Percentage of rated power (124.7kW) with 10^-1 precision (e.g. 96.5% power command should be sent as 965). negative = charge
}
)
# do not proceed with the rest of control logic
# wait for next read from battery
return
else: # enough time has passed move on to the next step
self.timed_step_end_utc = None
self.sched_step_ind += 1
self.parse_control_step()
if pd.isna(self.duration): # then this is a power / SoC command
if (self.action == 'charge') & (current_soc < self.target_soc):
self.write_to_device(
device=self.inverter_write_device,
write_values={
'Output Power Command': int(self.power_command_pct * 10), # Percentage of rated power (124.7kW) with 10^-1 precision (e.g. 96.5% power command should be sent as 965). negative = charge
}
)
# _log.info("Sent charge command of %0.1fkW" % self.power_command)
elif (self.action == 'discharge') & (current_soc > self.target_soc):
# _log.info("Sending discharge command of %0.1fkW" % self.power_command)
self.write_to_device(
device=self.inverter_write_device,
write_values={
'Output Power Command': int(self.power_command_pct * 10), # Percentage of rated power (124.7kW) with 10^-1 precision (e.g. 96.5% power command should be sent as 965). negative = charge
}
)
else: # move on to the next step and execute that step
if self.step_name != "stop":
self.sched_step_ind += 1
self.parse_control_step()
self.timed_step_end_utc = None if pd.isna(self.duration) else pd.Timestamp.utcnow() + pd.Timedelta(seconds=self.duration)
_log.info("Moving on to next step with power of %0.1fkW" % self.power_command)
self.write_to_device(
device=self.inverter_write_device,
write_values={
'Output Power Command': int(self.power_command_pct * 10), # Percentage of rated power (124.7kW) with 10^-1 precision (e.g. 96.5% power command should be sent as 965). negative = charge
}
)
else: # this must be a duration command
self.timed_step_end_utc = None if pd.isna(self.duration) else pd.Timestamp.utcnow() + pd.Timedelta(seconds=self.duration)
self.write_to_device(
device=self.inverter_write_device,
write_values={
'Output Power Command': int(self.power_command_pct * 10), # Percentage of rated power (124.7kW) with 10^-1 precision (e.g. 96.5% power command should be sent as 965). negative = charge
}
)
def write_to_device(self, device, write_values):
"""
Write some points to a device
"""
# Create a start and end timestep to serve as the times we reserve to communicate with the device
_now = get_aware_utc_now()
str_now = format_timestamp(_now)
_end = _now + pd.Timedelta(seconds=5)
str_end = format_timestamp(_end)
# Wrap the timestamps and device topic (used by the Actuator to identify the device) into an actuator request
schedule_request = [[device, str_now, str_end]]
# Use a remote procedure call to ask the actuator to schedule us some time on the device
# _log.info("schedule_request {}".format(schedule_request))
result = self.vip.rpc.call(
'platform.actuator', 'request_new_schedule', self.core.identity, 'my_test', 'HIGH', schedule_request).get(
timeout=1)
# start by creating our topic_values
topic_values = []
for point, value in write_values.items():
# create a (topic, value) tuple and add it to our topic values
topic_values.append((device + '/' + point, value))
# Now use another RPC call to ask the actuator to set the point during the scheduled time
_log.info("Writing Modbus Points: {}".format(topic_values))
result = self.vip.rpc.call(
'platform.actuator', 'set_multiple_points', self.core.identity, topic_values).get(
timeout=1)
# _log.info("Modbus write response: {}".format(result))
def close_contactor(self): #, device, write_values):
"""
Close Contactors
Address any scenarious which caused them to open
"""
# 1 - stop method to change state of inverter and set 0 power command
_log.info("Starting close_contactor. Stopping inverter")
self.write_to_device(
device=self.inverter_write_device,
write_values={
'Output Power Command': 0, # Percentage of rated power (124.7kW), negative = charge
'PCS Set Operation': 2, # 0: No Action 1: Start PCS Operation 2: Stop PCS Operation
}
)
_log.info("Inverter stop message sent.")
# 2 - loop to track inverter voltage, tries to close contactor, write to log, wait minute-ish (while loop based on contactor closed)
_log.info("Starting loop to clear faults and close contactor.")
attempts = 0 #timeout tracker initialization
self.get_number_connected_strings()
while self.connected_strings != 1:
if attempts >= 15: #time out after 15 attempts
_log.info("Maximum attempts to close contactor reached. Stopping the controller.")
self.core.stop()
#2a - read voltage of inverter TODO
#2b - write to log stating contactor status and inverter voltage
_log.info("%d string(s) connected. Invertor voltage is [add]" % self.connected_strings)
_log.info("%d attempt(s) to close" % attempts)
#2c - wait minute
_log.info("sleeping 60 seconds.")
gevent.sleep(60)
_log.info("Attempting to clear faults.")
#2d - clear fault, sleep(5) and attempt to close contactor, sleep(5), read contactor
self.write_to_device(
device="STAC/EnervenueContactor",
write_values={
'Clear Faults': 11, # 11: Clear Alarms
}
)
_log.info("sleeping 5 seconds.")
gevent.sleep(5)
_log.info("Attempting to ckise contactor.")
self.write_to_device(
device="STAC/EnervenueContactor",
write_values={
'EnerStation All Contactors': 1, # 1:close 2: open
}
)
_log.info("sleeping 5 seconds.")
gevent.sleep(5)
attempts += 1
_log.info("Getting number of connected strings.")
self.get_number_connected_strings()
_log.info("Checking if contactor is closed.")
_log.info("Contactor was successfully closed. Moving on to get the inverter back into operating condition.")
# 3 - get inverter into running pq state TODO and time out
_log.info("Restarting the inverter. Waiting for the inverter to enter '11: Running PQ' state before continuing.")
inverter_state = self.get_inverter_state()
while inverter_state != 11:
_log.info("Inverter state is currently %d. Attempting to restart the inverter." % inverter_state)
self.write_to_device(
device=self.inverter_write_device,
write_values={
'PCS Set Operation': 1, # 0: No Action 1: Start PCS Operation 2: Stop PCS Operation
}
)
gevent.sleep(3)
inverter_state = self.get_inverter_state()
_log.info("All strings are connected and inverter is ready to receive commands.")
_log.info("Moving to next step and continuing schedule.")
self.sched_step_ind += 1
self.parse_control_step()
return
# #-------------------------------------------------------------
# #TODO add fault check
# #specifically pre charge error, bus bar low voltage (contactor open inverter continued to pull charge)
# #possibly code in the inverter reset and wait time
# self.get_number_connected_strings()
# if self.connected_strings != self.system_strings:
# self.write_to_device(
# device="STAC/EnervenueContactor",
# write_values={
# 'EnerStation All Contactors': 1, # 1:close 2: open
# }
# )
# gevent.sleep(5)
# self.get_number_connected_strings()
# if self.connected_strings != self.system_strings:
# #check and clear faults reclose contactor
# #TODO add fault check and clear
# self.write_to_device(
# device="STAC/EnervenueContactor",
# write_values={
# 'EnerStation All Contactors': 1, # 1:close 2: open
# }
# )
# gevent.sleep(5)
# self.get_number_connected_strings()
# if self.connected_strings != self.system_strings:
# self.write_to_device(
# device="STAC/EnervenueContactor",
# write_values={
# 'EnerStation All Contactors': 1, # 1:close 2: open
# }
# )
# gevent.sleep(5)
# self.get_number_connected_strings()
# if self.connected_strings != self.system_strings:
# _log.info("Attempts to Close Contactor Failed, Stopping Controller")
# self.core.stop()
# _log.info("Setting inverter operating mode to 'Grid Tied Mode'.") #not including as i haven't replaced the onstart block
# self.write_to_device( #doesn't hurt to have it check?
# device=self.inverter_write_device,
# write_values={
# 'Operation Mode Select': 3, # 0: No Action 1: Start PCS Operation 2: Stop PCS Operation
# }
# )
def get_inverter_state(self):
inverter_state = self.vip.rpc.call(
'platform.driver',
'get_point',
path=self.inverter_write_device,
point_name='Vendor Operating States'
).get(timeout=1)
return inverter_state
@Core.receiver("onstart")
def onstart(self, sender, **kwargs):
"""
This is method is called once the Agent has successfully connected to the platform.
This is a good place to setup subscriptions if they are not dynamic or
do any other startup activities that require a connection to the message bus.
Called after any configurations methods that are called at startup.
Usually not needed if using the configuration store.
"""
_log.debug("Importing config details.")
config = utils.load_config(self.config_path) # config/enervenue_ctrl/config
try:
sched_path = config["control_schedule_path"]
system_strings = config['system_strings']
except ValueError as e:
_log.error("ERROR PROCESSING CONFIGURATION: {}".format(e))
return
self.sched_path = sched_path
self.system_topic = "devices/STAC/DynapowerInverter"
self.sched = pd.read_excel(self.sched_path)
self.system_strings = system_strings
# Example publish to pubsub
# self.vip.pubsub.publish('pubsub', "some/random/topic", message="HI!")
#close contactor
self.write_to_device( #TODO change to close contactor function
device="STAC/EnervenueContactor",
write_values={
'EnerStation All Contactors': 1, # 1:close 2: open
}
)
gevent.sleep(10)
#voltage reading test block
string_voltage = self.vip.rpc.call(
'platform.driver',
'get_point',
path=self.bms_write_device,
point_name='String 1 String Voltage'
).get(timeout=1)
_log.info("String is at %d Volts." % string_voltage)
#check if contactor close was successful
_log.info("Running start up sequence for system. Checking if all strings are connected.")
self.get_number_connected_strings()
if self.connected_strings != self.system_strings:
_log.info("All %d strings are not connected. Try closing the contactors before proceeding. Shutting down the controller." % self.system_strings)
self.core.stop()
_log.info("All strings are connected.")
_log.info("Setting inverter operating mode to 'Grid Tied Mode'.")
self.write_to_device(
device=self.inverter_write_device,
write_values={
'Operation Mode Select': 3, # 0: No Action 1: Start PCS Operation 2: Stop PCS Operation
}
)
_log.info("Starting the inverter. Waiting for the inverter to enter '11: Running PQ' state before continuing.")
inverter_state = self.get_inverter_state()
while inverter_state != 11:
_log.info("Inverter state is currently %d. Attempting to start the inverter." % inverter_state)
self.write_to_device(
device=self.inverter_write_device,
write_values={
'PCS Set Operation': 1, # 0: No Action 1: Start PCS Operation 2: Stop PCS Operation
}
)
gevent.sleep(3)
inverter_state = self.get_inverter_state()
_log.info("All strings are connected and inverter is ready to receive commands.")
_log.info("Setting up subscription to modbus reads from the inverter.")
self._create_subscriptions(self.system_topic)
_log.info("Schedule Controller started!")
# Example RPC call
# self.vip.rpc.call("some_agent", "some_method", arg1, arg2)
pass
@Core.receiver("onstop")
def onstop(self, sender, **kwargs):
"""
This method is called when the Agent is about to shutdown, but before it disconnects from
the message bus.
"""
# send a 0 power setpoint command and stop inverter operation
self.write_to_device(
device=self.inverter_write_device,
write_values={
'Output Power Command': 0, # Percentage of rated power (124.7kW), negative = charge
'PCS Set Operation': 2, # 0: No Action 1: Start PCS Operation 2: Stop PCS Operation
}
)
pass
@RPC.export
def rpc_method(self, arg1, arg2, kwarg1=None, kwarg2=None):
"""
RPC method
May be called from another agent via self.core.rpc.call
"""
return self.setting1 + arg1 - arg2
def main():
"""Main method called to start the agent."""
utils.vip_main(enervenue_ctrl,
version=__version__)
if __name__ == '__main__':
# Entry point for script
try:
sys.exit(main())
except KeyboardInterrupt:
pass