Push feed

This page shows how to load curve events from Energy Quantified’s WebSocket API streamed in real-time. The examples below expect you to have an initialized instance of the EnergyQuantified API client called eq.

Operations described here are available under eq.events.*.

Terminology

Curve events

Class reference: eq.events.CurveUpdateEvent

Energy Quantified creates curve events whenever data in a Curve changes. Events provide information about what kind of operation was done (e.g., delete, update) on the curve, timestamps of the first and last affected values, and the total number of affected values. It also includes an Instance where relevant.

Example: If values were updated at a 15-minute frequency for Germany’s consumption normal at 2023-01-01 01:15 and 2023-01-01 01:45, the following event would be produced:

<CurveUpdateEvent:
    event_id="1234567890123-0",
    curve="DE Consumption MWh/h 15min Normal",
    event_type=EventType.CURVE_UPDATE,
    begin="2023-01-01 01:15",
    end="2023-01-01 02:00",
    num_values=2>

The event_id attribute uniquely identifies curve events. Event IDs are strings of two numbers separated by a dash (“-“). The first number is exactly 13 digits long and represents the creation timestamp for the event. The second number is a serial, which increments when multiple events are within the same millisecond.

Event types

All events have an EventType at the event_type attribute. Event types describe what an event means.

Filters

To receive curve events, one must subscribe by providing a list of filters. You will receive events matching any of your filters.

A filter matches if all set variables match the event. For instance, a filter with areas=[Area.DE, Area.FR] and data_types=[DataType.ACTUAL, DataType.FORECAST] matches curves for France or Germany, with data type Actual or Forecast.

Due to WebSockets’ bidirectional communication protocol, you can re-subscribe with new filters on the fly while already listening to the stream.

There are two different filter types for curve events:

Quickstart

First, we must connect to the WebSockets endpoint:

eq.events.connect()

Once connected, we can specify our filters and subscribe to them. In this example, we create filters for Actual and Forecast data in Germany, France or Great Britain:

my_filter = CurveAttributeFilter(
    areas=[Area.DE, Area.FR, Area.GB],
    data_types=[DataType.ACTUAL, DataType.FORECAST],
)

Subscribe to curve events with the filters:

# Single filter
eq.events.subscribe_curve_events(filters=my_filter)

# Multiple filters
eq.events.subscribe_curve_events(filters=[
    my_filter,
    another_filter,
    third_filter,
])

Then you can loop over incoming events forever:

# Loop over incoming events (blocking)
for event in eq.events.get_next():

    if event.event_type == EventType.CURVE_UPDATE:
        # A curve is updated, so we can load its data
        data = event.load_data()
        # Store it in your database?
        continue

    if event.event_type == EventType.DISCONNECTED:
        # Not connected and no more events to process
        break

Putting it all together, this is how you connect, subscribe, and start listening for curve events:

import time
from energyquantified import EnergyQuantified
from energyquantified.events import EventType, CurveAttributeFilter
from energyquantified.metadata import Area, DataType

# Initialize the client
eq = EnergyQuantified(api_key="aaaa-bbbb-cccc-dddd")

# Connect to the WebSocket endpoint
eq.events.connect()

# Create filters for ACTUAL and FORECAST events in DE, FR and GB
filters = CurveAttributeFilter(
    areas=[Area.DE, Area.FR, Area.GB],
    data_types=[DataType.ACTUAL, DataType.FORECAST],
)

# Subscribe to curve events
eq.events.subscribe_curve_events(filters=filters)

# Loop over incoming events (blocking)
for event in eq.events.get_next():

    if event.event_type == EventType.CURVE_UPDATE:
        # A curve is updated, so we can load its data
        print("Curve updated: ", event)
        # Load data
        data = event.load_data()
        # Store it in your database?
        continue

    if event.event_type == EventType.DISCONNECTED:
        # Not connected and no more events
        break

Connecting

Method reference: eq.events.connect()

Connect to the stream by calling connect(). Note that this temporarily blocks program execution while trying to connect.

eq.events.connect()

The client tries to reconnect on network errors automatically. You can override the number of reconnect attempts by providing the reconnect_attempts parameter. The number of attempts reset once a connection is re-established.

eq.events.connect(reconnect_attempts=5)

Disconnecting

Method reference: eq.events.disconnect()

Disconnect from the stream by calling disconnect() or close(). The get_next() method still returns all events received before disconnecting.

eq.events.disconnect()

Subscribing

Method reference: eq.events.subscribe_curve_events()

To receive curve events, one must subscribe by providing a list of filters.

Note that the subscribe method temporarily blocks program execution while waiting for a response and raises an exception if anything goes wrong. CurvesSubscribeResponse is returned from a successful subscribe. The response includes the filters and event ID (if provided) subscribed with, confirmed by the server.

# Subscribe (raises exception on fail)
response = eq.events.subscribe_curve_events(filters=[...])

# Check the response
print("Subscribed with filters:", response.filters)
if response.last_id is None:
    print("Subscribed to new events")
else:
    print("Subscribed to events since id:", response.last_id)

You can specify how long to wait for a response by supplying the timeout parameter with the maximum number of seconds to wait. The default is 30 seconds.

You can update your filters while already subscribed by calling subscribe_curve_events() with the new filters.

Providing filters

There are two different filter types for curve events:

However, both filter types support filtering on event_types, begin and end.

You can subscribe with a combination of both CurveNameFilter and CurveAttributeFilter. The maximum number of filters allowed is limited to ten (10). You will receive events for curves that match any of your filters, (a filter matches if all set variables match the event).

Subscribe to curve events with one or more filters:

# Single filter
eq.events.subscribe_curve_events(filters=filter_1)
# Multiple filters
eq.events.subscribe_curve_events(filters=[
    filter_1,
    filter_2,
    filter_3,
])

Creating a filter

Set filter variables in the constructor or via the set_<variable>() methods:

from datetime import datetime
from energyquantified.events import CurveAttributeFilter, EventType
from energyquantified.metadata import Area

# Provide values to the Filter constructor
my_filter = CurveAttributeFilter(
    event_types=EventType.CURVE_UPDATE,
    begin=datetime(2023, 9, 1),
    areas=Area.DE,
)

# Provide values via set_*-methods (fluently)
my_filter = (
    CurveAttributeFilter()
    .set_event_types(EventType.CURVE_UPDATE)
    .set_begin(datetime(2023, 9, 1))
    .set_areas(Area.DE),
)

Set multiple values by providing a list, either to the constructor or to each set_<variable>() method:

from energyquantified.events import CurveAttributeFilter, EventType
from energyquantified.metadata import Area, DataType

# Provide a list of values in the constructor
my_filter = CurveAttributeFilter(
    event_types=[EventType.CURVE_UPDATE, EventType.CURVE_DELETE],
    areas=[Area.DE, Area.FR],
    data_types=[DataType.ACTUAL, DataType.FORECAST],
)

# Provide a list of values via set_*-methods (fluently)
my_filter = (
    CurveAttributeFilter()
    .set_event_types([EventType.CURVE_UPDATE, EventType.CURVE_DELETE])
    .set_areas([Area.DE, Area.FR]),
    .set_data_types([DataType.ACTUAL, DataType.FORECAST])
)

You can also provide strings instead of objects:

my_filter = CurveAttributeFilter(
    event_types=["CURVE_UPDATE", "CURVE_DELETE"],
    areas=["DE", "FR"],
    data_types=["ACTUAL", "FORECAST"],
)

Filter specific curves

Class reference: energyquantified.events.CurveNameFilter

Use the CurveNameFilter to match specific curves by providing a list of Curve objects or curve names.

Available parameters:

  • event_types: Filter by EventType.

  • curves: Filter by Curve objects or curve names.

  • begin: The earliest date to look for changed values (inclusive).

  • end: The last date to look for changed values (exclusive).

The code snippet below illustrates creating a filter for updates in a certain date range for two curves. You will receive a curve event whenever a value between begin (inclusive) and end (exclusive) changes for either of the curves.

from datetime import date
from energyquantified.events import CurveNameFilter, EventType

# Providing curves by name
my_filter = CurveNameFilter(
    event_types=EventType.CURVE_UPDATE,
    curves=[
        "DE Wind Power Production MWh/h 15min Actual",
        "FR Wind Power Production MWh/h 15min Forecast",
    ],
    begin=date(2023, 9, 1),
    end=date(2023, 10, 1),
)

Filter by curve attributes

Class reference: energyquantified.events.CurveAttributeFilter

Use the CurveAttributeFilter to filter curve events based on Curve attributes.

Available parameters:

  • event_types: Filter by EventType.

  • areas: Filter by Area.

  • data_types: Filter by DataType.

  • commodities: Filter by commodities.

  • categories: Filter by categories.

  • exact_categories: Filter by one or more exact categories. An exact category is a string of ordered categories separated by space (e.g., "Wind Power Production").

  • begin: The earliest date to look for changed values (inclusive).

  • end: The last date to look for changed values (exclusive).

The code snippet below illustrates how to filter curve updates for January 2023 in Actual or Forecast data with the Wind Power Production category in Germany or France.

from datetime import date
from energyquantified.events import CurveAttributeFilter, EventType
from energyquantified.metadata import Area, DataType

# Filter by curve attributes
my_filter = CurveAttributeFilter(
    event_types=EventType.CURVE_UPDATE,
    data_types=[DataType.ACTUAL, DataType.FORECAST],
    exact_categories="Wind Power Production",
    areas=[Area.DE, Area.FR],
    begin=date(2023, 1, 1),
    end=date(2023, 2, 1),
)

For a curve event to match the above filter, it must meet all of the following requirements:

  • The event type is CURVE_UPDATE

  • The data type is Actual or Forecast

  • The exact category is Wind Power Production

  • The area is Germany or France

  • At least one value in January 2023 is updated

Providing last_id (advanced)

Event IDs are strings of two numbers separated by a dash (“-“). The first number is a timestamp. If you supply the optional parameter last_id to subscribe_curve_events(), you will receive events created after this ID:

# Subscribe and receive events after provided last_id only
eq.events.subscribe_curve_events(
    filters=[...],
    last_id="1234567890123-0"
)

This ID takes priority over the recommended last_id_file approach (further described in Remember last_id between processes runs).

Requesting active filters

Method reference: eq.events.get_curve_filters()

Request the currently active curve event filters from the server.

Note that this method temporarily block program execution while waiting for a response and raises an exception if anything goes wrong. The returned value is None if you are not subscribed to curve events, otherwise a list of the filters (i.e., CurveNameFilter, CurveAttributeFilter) you are subscribed with:

active_filters = eq.events.get_curve_filters()

if active_filters is None:
    print("Not subscribed to curve events!")
else:
    print("List of active curve event filters:", active_filters)

You can specify how long to wait for a response by supplying the timeout parameter with the maximum number of seconds to wait. The default is 30 seconds.

Handling events

Method reference: eq.events.get_next()

You loop over incoming events as if you are looping over a list. The loop will wait until new events arrive during quiet times:

for event in eq.events.get_next():
    # Handle event

An event can be one of the following classes:

All events have the event_type attribute. Use it to figure out how what type of event you receive. See the Event types section for details.

The different events are described further in this section.

Curve events

Class reference: energyquantified.events.CurveUpdateEvent

Whenever a data in a curve is updated or deleted, you will receive a CurveUpdateEvent:

for event in eq.events.get_next():
    if event.event_type == EventType.CURVE_UPDATE:
        # Data is updated
        print("UPDATE event:", event)
    if event.event_type == EventType.CURVE_DELETE:
        # Some data is deleted
        print("DELETE event:", event)
    if event.event_type == EventType.CURVE_TRUNCATE:
        # *All* data in a curve is deleted
        print("TRUNCATE event:", event)

When event_type is EventType.CURVE_UPDATE, you can use the CurveUpdateEvent.load_data() method to load the modified data. That will load all values between the first and last modified value, even those that have not changed.

for event in eq.events.get_next():
    if event.event_type = EventType.CURVE_UPDATE:
        # Data is updated
        print("UPDATE event:", event)
        # Load the data
        data = event.load_data()
        # You now have the modified data
        print("Updated data:", data)

The data loaded can either be a Timeseries, a Periodseries or an OHLCList, depending on the CurveUpdateEvent.curve’s curve type.

Note that you cannot load data for CURVE_DELETE and CURVE_TRUNCATE events, as deleted data no longer exists.

Connection events

Class reference: energyquantified.events.ConnectionEvent

Connection events occur if you are disconnected from the server or did not connect in the first place. It has the DISCONNECTED event type. The event contains the cause for the disconnect.

You will not receive events of this type until after all received curve events are processed.

Capture these events as seen below. In this example, we simply break out of the loop and stop processing events:

for event in eq.events.get_next():
    if event.event_type == EventType.DISCONNECTED:
        # Not connected and all curve events
        # are processed
        print("Not connected:", event)
        break

Optionally, you can use the disconnected event to reconnect manually. Note that the client will always try to reconnect a couple of times before it gives up and emits this event.

Once reconnected, the client will resubscribe with the previous filters and ask for events missed during downtime.

import time

for event in eq.events.get_next():
    if event.event_type == EventType.DISCONNECTED:
        # Not connected and event queue is empty
        print("Not connected:", event)
        # Wait 30 seconds before reconnecting
        time.sleep(30)
        # Try to reconnect
        eq.events.connect()
        continue

Note that you also get events of the DISCONNECTED type if you never connected in the first place, so it does not necessarily mean that a disconnect took place.

Timeout events

Class reference: energyquantified.events.TimeoutEvent

eq.events.get_next() is blocking, meaning you cannot act while waiting for a new event. Supply the optional timeout parameter to get_next() with the number of seconds you want to wait for new events. You will then receive a timeout event whenever the set number of seconds passes without any new event.

for event in eq.events.get_next(timeout=10):
    if event.event_type == EventType.TIMEOUT:
        print("No events in the last 10 seconds")
        continue

We designed the timeout event as a filler event that enables users to act in between events during quiet times.

Timeout events can be useful if you intend to execute some code after a certain time. Setting the timeout interval eliminates the risk of being stuck and unable to act while waiting for new events due to the blocking nature of get_next().

You can safely ignore this event if you do not find it useful.

Capturing messages and errors

By default, the client logs messages from the server at INFO level. Override the default by setting a custom callback function with eq.events.set_message_handler(). The callback function takes in one parameter: the server message, which is a string.

def message_handler(message):
    print("Message from server:", message)

eq.events.set_message_handler(message_handler)

Similarly, you can override the callback for handling error messages with eq.events.set_error_handler():

def error_message_handler(error):
    print("Error occured:", error)

eq.events.set_error_handler(error_message_handler)

You can attach the handlers before you connect:

# Set handlers
eq.events.set_message_handler(message_handler)
eq.events.set_error_handler(error_message_handler)
# Connect
eq.events.connect()

Restarts and network errors

Remember last_id between process runs

The client can remember the last event received and continue where it left off on restarts.

Enable this feature by supplying the last_id_file parameter to eq.events.connect() with a file path. Make sure that you have read and write access to the file path.

eq.events.connect(last_id_file="last_id_file.json")

The client regularly updates the file at a defined interval (~0.5/min), when the connection drops, and when execution of the process terminates for any reason.

The next time you connect to the stream and subscribe to curve events (assuming the same file path for last_id_file and that you have not altered the file), the client will request all events after the last one you received. Note that providing the last_id parameter to eq.events.subscribe_curve_events() with an event ID overrides the ID from file, and you will get events created after the provided ID.

Automatic subscribe after reconnect

As mentioned, the client silently tries to reconnect on network errors automatically. After successfully reconnecting, it subscribes with the same filters and requests all events that occurred during downtime.

Note that eq.events.get_next() raises a WebSocketsError if the client fails to automatically resubscribe after a reconnect.

When you are not connected, you will receive events with the EventType.DISCONNECTED type from eq.events.get_next(). If you were previously connected and did not close the connection, the connection dropped and automatic reconnect failed (maximum number of reconnect attempts exceeded). In that case, you can optionally try manually reconnecting and subscribing. Remember that eq.events.connect() raises an exception if it fails to initially connect, so it can be wise to wait a short while before trying to reconnect.

import time

# The filters subscribed with
filters = [...]
eq.events.subscribe_curve_events(filters=filters)

for event in eq.events.get_next():
    if event.event_type == EventType.DISCONNECTED:
        # Not connected and event queue is empty
        print("Not connected:", event)
        # Wait 60 seconds before trying to reconnect
        time.sleep(60)
        # Try to reconnect
        eq.events.connect()
        # Subscribe with same filters
        eq.events.subscribe_curve_events(filters=filters)
        continue

Server only keeps the most recent events

While the API supports fetching older events, we only keep the latest ~10 000 (at the time of writing). In most cases, that should cover events for the last 10-15 minutes.