Stream Data
Use the Python client to stream data from a Synnax cluster.
Streaming data is useful for real-time processing, visualization, and monitoring. This page guides you through using the Python client to stream data from a Synnax cluster. If you’d like a conceptual overview of how streaming works in Synnax, check out the streams page.
Opening a Streamer
To start streaming data, call the open_streamer method on the client and provide a list of channels to stream:
streamer = client.open_streamer(["channel1", "channel2"])
Reading Frames
To read the next incoming frame, call the read method on the streamer:
frame = streamer.read()
This call will block until a new frame is available.
Specifying a Timeout
You can also pass a timeout parameter to the read method. If the timeout is reached before a new frame is available, the method returns None instead of a frame:
frame = streamer.read(timeout=5)  # Wait up to 5 seconds
if frame is None:
    print("Timed out waiting for a frame")
You can also provide a TimeSpan object as the timeout parameter:
import synnax as sy

frame = streamer.read(timeout=5 * sy.TimeSpan.SECOND)
if frame is None:
    print("Timed out waiting for a frame")
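A common use of the timeout is a polling loop that gives up after several consecutive timeouts. The following is a minimal sketch of that idea, not part of the Synnax client: read_until_idle and ScriptedStreamer are hypothetical names introduced here, and ScriptedStreamer is only a stand-in that mimics the read(timeout=...) behavior described above so the example runs without a cluster. With a real streamer, you would pass the object returned by client.open_streamer instead.

```python
from collections import deque


class ScriptedStreamer:
    """Hypothetical stand-in for a streamer: replays a fixed script of reads.

    A None entry in the script simulates a read that timed out.
    """

    def __init__(self, script):
        self._script = deque(script)

    def read(self, timeout=None):
        # Once the script is exhausted, every read "times out".
        return self._script.popleft() if self._script else None


def read_until_idle(streamer, timeout=5, max_timeouts=2):
    """Collect frames until `max_timeouts` consecutive reads return None."""
    frames = []
    consecutive_timeouts = 0
    while consecutive_timeouts < max_timeouts:
        frame = streamer.read(timeout=timeout)
        if frame is None:
            consecutive_timeouts += 1
            continue
        consecutive_timeouts = 0  # A frame arrived, so reset the counter
        frames.append(frame)
    return frames


# Plain strings stand in for frames here.
demo = ScriptedStreamer(["frame-1", None, "frame-2", None, None, "frame-3"])
collected = read_until_idle(demo, timeout=0.1)
print(collected)
```

Resetting the counter on every successful read means a single slow frame does not end the loop; only a sustained gap does.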
Downsampling Frames
If you want to stream data at a lower rate than it is being written to the server, you can use the optional downsample parameter to specify the number of frames to skip before returning the next frame:
# Setting the downsample factor to n means every nth sample is retained.
# For example, with a downsample factor of 2, if [0.13, 0.12, 0.14, 0.13] is
# written, streamer.read() will return [0.13, 0.14].
streamer = client.open_streamer(["temperature", "pressure"], 2)
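To make the "every nth sample" rule in the comments above concrete, here is a small pure-Python sketch. It only mirrors the example values from the comments; keep_every_nth is a hypothetical helper, and nothing here is a claim about how the cluster performs downsampling internally.

```python
def keep_every_nth(samples, factor):
    """Keep every `factor`-th sample, starting with the first one."""
    if factor < 1:
        raise ValueError("factor must be a positive integer")
    return samples[::factor]


written = [0.13, 0.12, 0.14, 0.13]
retained = keep_every_nth(written, 2)
print(retained)  # [0.13, 0.14]
```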
Handling Partial Frames
When reading frames from a streamer, it’s important to note that a frame may not contain data for every channel specified when opening the streamer. For example, if we’re reading from two sensors, temperature and pressure, that are being sampled by different devices at different rates, we may receive a frame containing data only for the first channel, followed by a frame containing data only for the second channel.
streamer = client.open_streamer(["temperature", "pressure"])
frame = streamer.read()
print(frame[-1]) # Print only the last sample(s) in the frame
# Output: {"temperature": 25.0}
frame = streamer.read()
print(frame[-1])
# Output: {"pressure": 1013.25}
frame = streamer.read()
print(frame[-1])
# Output: {"temperature": 25.1, "pressure": 1013.25}
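One way to cope with partial frames is to keep a running record of the most recent value seen for each channel. The sketch below assumes the per-frame values have already been extracted into plain dicts shaped like the outputs shown above; merge_latest and has_all_channels are hypothetical helpers, not part of the Synnax client.

```python
def merge_latest(latest, partial):
    """Return a new dict where values in `partial` override those in `latest`."""
    return {**latest, **partial}


def has_all_channels(latest, channels):
    """True once at least one value has been seen for every channel."""
    return all(channel in latest for channel in channels)


channels = ["temperature", "pressure"]

# Plain dicts standing in for the last-sample outputs shown above.
partial_frames = [
    {"temperature": 25.0},
    {"pressure": 1013.25},
    {"temperature": 25.1, "pressure": 1013.25},
]

latest = {}
history = []    # Snapshot of `latest` after each frame
complete = []   # Whether every channel had a value at that point
for partial in partial_frames:
    latest = merge_latest(latest, partial)
    history.append(latest)
    complete.append(has_all_channels(latest, channels))

print(history[-1])
```

Downstream logic can then wait until has_all_channels returns True before acting on the combined values.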
Using a For Loop
The streamer object is an iterator, so you can use it in a for loop to iterate over incoming frames, blocking on each iteration until a new frame is received:
for frame in streamer:
    print(frame)
Updating the Channel List
If you want to update the list of channels being streamed, you can call the update_channels method on the streamer:
streamer.update_channels(["channel3", "channel4"])
This method will replace the current list of channels with the new list, not add to it.
Closing the Streamer
After you’re done streaming, it’s essential that you call the close method on the streamer to release the network connection and other related resources:
streamer.close()
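If an exception is raised between opening and closing the streamer, the close call may never run. A try/finally block guards against that. The sketch below uses a hypothetical RecordingStreamer stand-in (it is not a Synnax class) so the pattern can be run and checked without a cluster; read_then_close is likewise an illustrative helper.

```python
class RecordingStreamer:
    """Hypothetical stand-in that records whether close() was called."""

    def __init__(self, frames):
        self._frames = list(frames)
        self.closed = False

    def read(self, timeout=None):
        if not self._frames:
            raise RuntimeError("simulated failure while reading")
        return self._frames.pop(0)

    def close(self):
        self.closed = True


def read_then_close(streamer, n):
    """Read n frames, closing the streamer even if a read raises."""
    frames = []
    try:
        for _ in range(n):
            frames.append(streamer.read())
    finally:
        streamer.close()  # Runs on success and on failure
    return frames


ok = RecordingStreamer(["a", "b"])
ok_frames = read_then_close(ok, 2)

failing = RecordingStreamer(["a"])
try:
    read_then_close(failing, 2)  # The second read raises
except RuntimeError:
    pass

print(ok.closed, failing.closed)
```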
Using a Context Manager
We recommend using the streamer as a context manager where possible, as this makes it easy to ensure that the streamer is closed correctly:
with client.open_streamer(["channel1", "channel2"]) as streamer:
    for frame in streamer:
        print(frame)
Using an Async Streamer
If you’re interested in using asyncio to stream data, you can use the open_async_streamer method on the client. This streamer implements an identical interface to a synchronous streamer, but all methods are asynchronous:
async with await client.open_async_streamer(["channel1", "channel2"]) as streamer:
    async for frame in streamer:
        print(frame)
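The async with and async for statements are only valid inside a coroutine, so in a script the snippet above needs to live in an async function that is started with asyncio.run. The following sketch shows that shape. FakeAsyncStreamer and open_fake_async_streamer are hypothetical stand-ins for the real streamer and client.open_async_streamer, and unlike a real stream, the fake one ends after two frames so the example terminates.

```python
import asyncio


class FakeAsyncStreamer:
    """Hypothetical stand-in supporting `async with` and `async for`."""

    def __init__(self, frames):
        self._frames = list(frames)
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True  # Mimics releasing the connection on exit

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._frames:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # Yield to the event loop before returning
        return self._frames.pop(0)


async def open_fake_async_streamer(frames):
    """Hypothetical opener, awaited in the same way as open_async_streamer."""
    return FakeAsyncStreamer(frames)


async def main():
    received = []
    async with await open_fake_async_streamer(["f1", "f2"]) as streamer:
        async for frame in streamer:
            received.append(frame)
    return received, streamer.closed


received, closed = asyncio.run(main())
print(received, closed)
```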