Push feed¶
This page shows how to load curve events from Energy Quantified’s WebSocket API
streamed in real-time. The examples below expect you to have an initialized
instance of the EnergyQuantified
API client called eq
.
Operations described here are available under eq.events.*
.
Terminology¶
Curve events¶
Class reference: eq.events.CurveUpdateEvent
Energy Quantified creates curve events whenever data in a
Curve
changes. Events provide information
about what kind of operation was done (e.g., delete, update) on the curve,
timestamps of the first and last affected values, and the total number of
affected values. It also includes an
Instance
where relevant.
Example: If values were updated at a 15-minute frequency for Germany’s consumption normal at
2023-01-01 01:15
and 2023-01-01 01:45
, the following event would be produced:
<CurveUpdateEvent:
event_id="1234567890123-0",
curve="DE Consumption MWh/h 15min Normal",
event_type=EventType.CURVE_UPDATE,
begin="2023-01-01 01:15",
end="2023-01-01 02:00",
num_values=2>
The event_id
attribute uniquely identifies curve events. Event IDs are strings
of two numbers separated by a dash (“-“). The first number is exactly 13 digits
long and represents the creation timestamp for the event.
The second number is a serial, which increments when multiple events are
within the same millisecond.
Event types¶
All events have an EventType
at the event_type
attribute. Event types describe what an event means.
EventType.CURVE_UPDATE
: Data in a curve is updated
EventType.CURVE_DELETE
: Some data in a curve or a whole instance is removed
EventType.CURVE_TRUNCATE
: All data in a curve is removed
EventType.DISCONNECTED
: Not connected to the push feed
EventType.TIMEOUT
: Filler event enabling users to act between events during quiet times. Timeout events occur when the user provides thetimeout
parameter ineq.events.get_next()
.
Filters¶
To receive curve events, one must subscribe by providing a list of filters. You will receive events matching any of your filters.
A filter matches if all set variables match the event. For instance, a filter with
areas=[Area.DE, Area.FR]
and data_types=[DataType.ACTUAL, DataType.FORECAST]
matches curves for France or Germany, with data type Actual or Forecast.
Due to WebSockets’ bidirectional communication protocol, you can re-subscribe with new filters on the fly while already listening to the stream.
There are two different filter types for curve events:
CurveNameFilter
: Filter by curves/curve names
CurveAttributeFilter
: Filter by curve attributes similar to the curve search
Quickstart¶
First, we must connect to the WebSockets endpoint:
eq.events.connect()
Once connected, we can specify our filters and subscribe to them. In this example, we create filters for Actual and Forecast data in Germany, France or Great Britain:
my_filter = CurveAttributeFilter(
areas=[Area.DE, Area.FR, Area.GB],
data_types=[DataType.ACTUAL, DataType.FORECAST],
)
Subscribe to curve events with the filters:
# Single filter
eq.events.subscribe_curve_events(filters=my_filter)
# Multiple filters
eq.events.subscribe_curve_events(filters=[
my_filter,
another_filter,
third_filter,
])
Then you can loop over incoming events forever:
# Loop over incoming events (blocking)
for event in eq.events.get_next():
if event.event_type == EventType.CURVE_UPDATE:
# A curve is updated, so we can load its data
data = event.load_data()
# Store it in your database?
continue
if event.event_type == EventType.DISCONNECTED:
# Not connected and no more events to process
break
Putting it all together, this is how you connect, subscribe, and start listening for curve events:
import time
from energyquantified import EnergyQuantified
from energyquantified.events import EventType, CurveAttributeFilter
from energyquantified.metadata import Area, DataType
# Initialize the client
eq = EnergyQuantified(api_key="aaaa-bbbb-cccc-dddd")
# Connect to the WebSocket endpoint
eq.events.connect()
# Create filters for ACTUAL and FORECAST events in DE, FR and GB
filters = CurveAttributeFilter(
areas=[Area.DE, Area.FR, Area.GB],
data_types=[DataType.ACTUAL, DataType.FORECAST],
)
# Subscribe to curve events
eq.events.subscribe_curve_events(filters=filters)
# Loop over incoming events (blocking)
for event in eq.events.get_next():
if event.event_type == EventType.CURVE_UPDATE:
# A curve is updated, so we can load its data
print("Curve updated: ", event)
# Load data
data = event.load_data()
# Store it in your database?
continue
if event.event_type == EventType.DISCONNECTED:
# Not connected and no more events
break
Connecting¶
Method reference: eq.events.connect()
Connect to the stream by calling
connect()
.
Note that this temporarily blocks program execution while trying to connect.
eq.events.connect()
The client tries to reconnect on network errors automatically. You can override
the number of reconnect attempts by providing the reconnect_attempts
parameter.
The number of attempts reset once a connection is re-established.
eq.events.connect(reconnect_attempts=5)
Disconnecting¶
Method reference: eq.events.disconnect()
Disconnect from the stream by calling
disconnect()
or
close()
. The
get_next()
method still returns
all events received before disconnecting.
eq.events.disconnect()
Subscribing¶
Method reference: eq.events.subscribe_curve_events()
To receive curve events, one must subscribe by providing a list of filters.
Note that the subscribe method temporarily blocks program execution while waiting
for a response and raises an exception if anything goes wrong.
CurvesSubscribeResponse
is returned from a successful subscribe. The response includes the filters
and event ID (if provided) subscribed with, confirmed by the server.
# Subscribe (raises exception on fail)
response = eq.events.subscribe_curve_events(filters=[...])
# Check the response
print("Subscribed with filters:", response.filters)
if response.last_id is None:
print("Subscribed to new events")
else:
print("Subscribed to events since id:", response.last_id)
You can specify how long to wait for a response by supplying the timeout
parameter with the maximum number of seconds to wait. The default is 30 seconds.
You can update your filters while already subscribed by calling
subscribe_curve_events()
with the new filters.
Providing filters¶
There are two different filter types for curve events:
CurveNameFilter
: Filter by curves/curve names
CurveAttributeFilter
: Filter by curve attributes similar to the curve search
However, both filter types support filtering on event_types
, begin
and end
.
You can subscribe with a combination of both
CurveNameFilter
and
CurveAttributeFilter
.
The maximum number of filters allowed is limited to ten (10). You will receive
events for curves that match any of your filters, (a filter matches if
all set variables match the event).
Subscribe to curve events with one or more filters:
# Single filter
eq.events.subscribe_curve_events(filters=filter_1)
# Multiple filters
eq.events.subscribe_curve_events(filters=[
filter_1,
filter_2,
filter_3,
])
Creating a filter¶
Set filter variables in the constructor or via the set_<variable>()
methods:
from datetime import datetime
from energyquantified.events import CurveAttributeFilter, EventType
from energyquantified.metadata import Area
# Provide values to the Filter constructor
my_filter = CurveAttributeFilter(
event_types=EventType.CURVE_UPDATE,
begin=datetime(2023, 9, 1),
areas=Area.DE,
)
# Provide values via set_*-methods (fluently)
my_filter = (
CurveAttributeFilter()
.set_event_types(EventType.CURVE_UPDATE)
.set_begin(datetime(2023, 9, 1))
.set_areas(Area.DE),
)
Set multiple values by providing a list, either to the constructor or to
each set_<variable>()
method:
from energyquantified.events import CurveAttributeFilter, EventType
from energyquantified.metadata import Area, DataType
# Provide a list of values in the constructor
my_filter = CurveAttributeFilter(
event_types=[EventType.CURVE_UPDATE, EventType.CURVE_DELETE],
areas=[Area.DE, Area.FR],
data_types=[DataType.ACTUAL, DataType.FORECAST],
)
# Provide a list of values via set_*-methods (fluently)
my_filter = (
CurveAttributeFilter()
.set_event_types([EventType.CURVE_UPDATE, EventType.CURVE_DELETE])
.set_areas([Area.DE, Area.FR]),
.set_data_types([DataType.ACTUAL, DataType.FORECAST])
)
You can also provide strings instead of objects:
my_filter = CurveAttributeFilter(
event_types=["CURVE_UPDATE", "CURVE_DELETE"],
areas=["DE", "FR"],
data_types=["ACTUAL", "FORECAST"],
)
Filter specific curves¶
Class reference: energyquantified.events.CurveNameFilter
Use the CurveNameFilter
to
match specific curves by providing a list of Curve
objects or curve names.
Available parameters:
event_types
: Filter byEventType
.
curves
: Filter byCurve
objects or curve names.
tags
: Filter by instance tags (e.g., “ec”, “ec-ens”, “gfs”). Curves without an instance are not affected by this filter.
exclude_tags
: Exclude events for instances with one of the specified instance tags (e.g., “ec”, “ec-ens”, “gfs”). Curves without an instance are not affected by this filter.
begin
: The earliest date to look for changed values (inclusive).
end
: The last date to look for changed values (exclusive).
The code snippet below illustrates creating a filter for updates in a certain
date range for two curves. You will receive a curve event whenever a value
between begin
(inclusive) and end
(exclusive) changes for either of the
curves.
from datetime import date
from energyquantified.events import CurveNameFilter, EventType
# Providing curves by name
my_filter = CurveNameFilter(
event_types=EventType.CURVE_UPDATE,
curves=[
"DE Wind Power Production MWh/h 15min Actual",
"FR Wind Power Production MWh/h 15min Forecast",
],
begin=date(2023, 9, 1),
end=date(2023, 10, 1),
)
Filter by curve attributes¶
Class reference: energyquantified.events.CurveAttributeFilter
Use the CurveAttributeFilter
to filter curve events based on Curve
attributes.
Available parameters:
event_types
: Filter byEventType
.
tags
: Filter by instance tags (e.g., “ec”, “ec-ens”, “gfs”). Curves without an instance are not affected by this filter.
exclude_tags
: Exclude events for instances with one of the specified instance tags (e.g., “ec”, “ec-ens”, “gfs”). Curves without an instance are not affected by this filter.
areas
: Filter byArea
.
data_types
: Filter byDataType
.
commodities
: Filter by commodities.
categories
: Filter by categories.
exact_categories
: Filter by one or more exact categories. An exact category is a string of ordered categories separated by space (e.g.,"Wind Power Production"
).
begin
: The earliest date to look for changed values (inclusive).
end
: The last date to look for changed values (exclusive).
The code snippet below illustrates how to filter curve updates for January 2023
in Actual or Forecast data with the Wind Power Production
category in Germany or France.
from datetime import date
from energyquantified.events import CurveAttributeFilter, EventType
from energyquantified.metadata import Area, DataType
# Filter by curve attributes
my_filter = CurveAttributeFilter(
event_types=EventType.CURVE_UPDATE,
data_types=[DataType.ACTUAL, DataType.FORECAST],
exact_categories="Wind Power Production",
areas=[Area.DE, Area.FR],
begin=date(2023, 1, 1),
end=date(2023, 2, 1),
)
For a curve event to match the above filter, it must meet all of the following requirements:
The event type is
CURVE_UPDATE
The data type is
Actual
orForecast
The exact category is
Wind Power Production
The area is Germany or France
At least one value in January 2023 is updated
Providing last_id
(advanced)¶
Event IDs are strings of two numbers separated by a dash (“-“). The first number
is a timestamp. If you supply the optional parameter last_id
to
subscribe_curve_events()
,
you will receive events created after this ID:
# Subscribe and receive events after provided last_id only
eq.events.subscribe_curve_events(
filters=[...],
last_id="1234567890123-0"
)
This ID takes priority over the recommended last_id_file
approach (further
described in Remember last_id between processes runs).
Requesting active filters¶
Method reference: eq.events.get_curve_filters()
Request the currently active curve event filters from the server.
Note that this method temporarily block program execution while waiting for a
response and raises an exception if anything goes wrong. The returned value
is None if you are not subscribed to curve events, otherwise a list of the
filters (i.e., CurveNameFilter
,
CurveAttributeFilter
) you are subscribed
with:
active_filters = eq.events.get_curve_filters()
if active_filters is None:
print("Not subscribed to curve events!")
else:
print("List of active curve event filters:", active_filters)
You can specify how long to wait for a response by supplying the timeout
parameter with the maximum number of seconds to wait. The default is 30 seconds.
Handling events¶
Method reference: eq.events.get_next()
You loop over incoming events as if you are looping over a list. The loop will wait until new events arrive during quiet times:
for event in eq.events.get_next():
# Handle event
An event can be one of the following classes:
All events have the event_type
attribute. Use it to figure out
how what type of event you receive. See the Event types
section for details.
The different events are described further in this section.
Curve events¶
Class reference: energyquantified.events.CurveUpdateEvent
Whenever a data in a curve is updated or deleted, you will receive a
CurveUpdateEvent
:
for event in eq.events.get_next():
if event.event_type == EventType.CURVE_UPDATE:
# Data is updated
print("UPDATE event:", event)
if event.event_type == EventType.CURVE_DELETE:
# Some data is deleted
print("DELETE event:", event)
if event.event_type == EventType.CURVE_TRUNCATE:
# *All* data in a curve is deleted
print("TRUNCATE event:", event)
When event_type
is EventType.CURVE_UPDATE
,
you can use the
CurveUpdateEvent.load_data()
method to load the modified data. That will load all values between the first and last
modified value, even those that have not changed.
for event in eq.events.get_next():
if event.event_type = EventType.CURVE_UPDATE:
# Data is updated
print("UPDATE event:", event)
# Load the data
data = event.load_data()
# You now have the modified data
print("Updated data:", data)
The data loaded can either be a Timeseries
, a
Periodseries
or an OHLCList
,
depending on the
CurveUpdateEvent.curve
’s
curve type.
Note that you cannot load data for CURVE_DELETE
and CURVE_TRUNCATE
events,
as deleted data no longer exists.
Connection events¶
Class reference:
energyquantified.events.ConnectionEvent
Connection events occur if you are disconnected from the server or did not
connect in the first place. It has the DISCONNECTED
event type. The
event contains the cause for the disconnect.
You will not receive events of this type until after all received curve events are processed.
Capture these events as seen below. In this example, we simply break out of the loop and stop processing events:
for event in eq.events.get_next():
if event.event_type == EventType.DISCONNECTED:
# Not connected and all curve events
# are processed
print("Not connected:", event)
break
Optionally, you can use the disconnected event to reconnect manually. Note that the client will always try to reconnect a couple of times before it gives up and emits this event.
Once reconnected, the client will resubscribe with the previous filters and ask for events missed during downtime.
import time
for event in eq.events.get_next():
if event.event_type == EventType.DISCONNECTED:
# Not connected and event queue is empty
print("Not connected:", event)
# Wait 30 seconds before reconnecting
time.sleep(30)
# Try to reconnect
eq.events.connect()
continue
Note that you also get events of the DISCONNECTED
type if you never
connected in the first place, so it does not necessarily mean that a disconnect
took place.
Timeout events¶
Class reference:
energyquantified.events.TimeoutEvent
eq.events.get_next()
is blocking,
meaning you cannot act while waiting for a new event. Supply the optional timeout
parameter to get_next()
with
the number of seconds you want to wait for new events. You will then receive a
timeout event whenever the set number of seconds passes without any new event.
for event in eq.events.get_next(timeout=10):
if event.event_type == EventType.TIMEOUT:
print("No events in the last 10 seconds")
continue
We designed the timeout event as a filler event that enables users to act in between events during quiet times.
Timeout events can be useful if you intend to execute some code after a certain
time. Setting the timeout interval eliminates the risk of being stuck and unable
to act while waiting for new events due to the blocking nature of
get_next()
.
You can safely ignore this event if you do not find it useful.
Capturing messages and errors¶
By default, the client logs messages from the server at INFO
level. Override
the default by setting a custom callback function with
eq.events.set_message_handler()
.
The callback function takes in one parameter: the server message, which is a string.
def message_handler(message):
print("Message from server:", message)
eq.events.set_message_handler(message_handler)
Similarly, you can override the callback for handling error messages with
eq.events.set_error_handler()
:
def error_message_handler(error):
print("Error occured:", error)
eq.events.set_error_handler(error_message_handler)
You can attach the handlers before you connect:
# Set handlers
eq.events.set_message_handler(message_handler)
eq.events.set_error_handler(error_message_handler)
# Connect
eq.events.connect()
Restarts and network errors¶
Remember last_id
between process runs¶
The client can remember the last event received and continue where it left off on restarts.
Enable this feature by supplying the last_id_file
parameter to
eq.events.connect()
with a
file path. Make sure that you have read and write access to the file path.
eq.events.connect(last_id_file="last_id_file.json")
The client regularly updates the file at a defined interval (~0.5/min), when the connection drops, and when execution of the process terminates for any reason.
The next time you connect to the stream and subscribe to curve events (assuming
the same file path for last_id_file
and that you have not altered the file),
the client will request all events after the last one you received. Note that
providing the last_id
parameter to
eq.events.subscribe_curve_events()
with an event ID overrides the ID from file, and you will get events created
after the provided ID.
Automatic subscribe after reconnect¶
As mentioned, the client silently tries to reconnect on network errors automatically. After successfully reconnecting, it subscribes with the same filters and requests all events that occurred during downtime.
Note that
eq.events.get_next()
raises a
WebSocketsError
if the client fails to automatically resubscribe after a reconnect.
When you are not connected, you will receive events with the
EventType.DISCONNECTED
type
from eq.events.get_next()
.
If you were previously connected and did not close the connection, the
connection dropped and automatic reconnect failed (maximum number of reconnect
attempts exceeded). In that case, you can optionally try manually reconnecting
and subscribing. Remember that
eq.events.connect()
raises
an exception if it fails to initially connect, so it can be wise to wait a short
while before trying to reconnect.
import time
# The filters subscribed with
filters = [...]
eq.events.subscribe_curve_events(filters=filters)
for event in eq.events.get_next():
if event.event_type == EventType.DISCONNECTED:
# Not connected and event queue is empty
print("Not connected:", event)
# Wait 60 seconds before trying to reconnect
time.sleep(60)
# Try to reconnect
eq.events.connect()
# Subscribe with same filters
eq.events.subscribe_curve_events(filters=filters)
continue
Server only keeps the most recent events¶
While the API supports fetching older events, we only keep the latest ~10 000 (at the time of writing). In most cases, that should cover events for the last 10-15 minutes.