We’re excited to announce the release of the Bytewax InfluxDB Connector, which allows easy integration between Bytewax and InfluxDB, a leading time-series database.
Whether you’re working with IoT data, telemetry systems, market data, or other time-series metrics, this connector streamlines pulling real-time data from InfluxDB and processing it in Bytewax for insights like downsampling and aggregation, as well as writing data streams back to InfluxDB.
To skip to the code, check out the examples in the connector repository.
This connector enables you to:
- Write time-series data to InfluxDB using Bytewax's InfluxDBSink.
- Pull data directly from InfluxDB using Bytewax's InfluxDBSource.
- Perform real-time transformations in Bytewax, such as downsampling, aggregations, anomaly detection, and predictive analytics.
It's dead easy to use!
Installation
pip install bytewax-influxdb
Writing Data
First, you will need your InfluxDB connection details and your API token to get started.
Next, writing data to InfluxDB is as simple as passing batches of items from your dataflow into a sink. Below is an example that writes data in line protocol format, but you could also pass dictionaries, pandas DataFrames, InfluxDB Points, Polars DataFrames, and more.
import os
import logging
from datetime import timedelta
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.influxdb import InfluxDBSink
TOKEN = os.getenv(
    "INFLUXDB_TOKEN",
    "my-token",
)
DATABASE = os.getenv("INFLUXDB_DATABASE", "testing")
ORG = os.getenv("INFLUXDB_ORG", "dev")
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
lines = [
    "home,room=Living\\ Room temp=21.1,hum=35.9,co=0i 1724258000",
    "home,room=Kitchen temp=21.0,hum=35.9,co=0i 1724258000",
    "home,room=Living\\ Room temp=21.4,hum=35.9,co=0i 1724259000",
    "home,room=Kitchen temp=23.0,hum=36.2,co=0i 1724259000",
    "home,room=Living\\ Room temp=21.8,hum=36.0,co=0i 1724260000",
    "home,room=Kitchen temp=22.7,hum=36.1,co=0i 1724260000",
    "home,room=Living\\ Room temp=22.2,hum=36.0,co=0i 1724261000",
    "home,room=Kitchen temp=22.4,hum=36.0,co=0i 1724261000",
    "home,room=Living\\ Room temp=22.2,hum=35.9,co=0i 1724262000",
    "home,room=Kitchen temp=22.5,hum=36.0,co=0i 1724262000",
    "home,room=Living\\ Room temp=22.4,hum=36.0,co=0i 1724263000",
    "home,room=Kitchen temp=22.8,hum=36.5,co=1i 1724263000",
    "home,room=Living\\ Room temp=22.3,hum=36.1,co=0i 1724264000",
    "home,room=Kitchen temp=22.8,hum=36.3,co=1i 1724264000",
    "home,room=Living\\ Room temp=22.3,hum=36.1,co=1i 1724265000",
    "home,room=Kitchen temp=22.7,hum=36.2,co=3i 1724265000",
    "home,room=Living\\ Room temp=22.4,hum=36.0,co=4i 1724266000",
    "home,room=Kitchen temp=22.4,hum=36.0,co=7i 1724266000",
    "home,room=Living\\ Room temp=22.6,hum=35.9,co=5i 1724267000",
    "home,room=Kitchen temp=22.7,hum=36.0,co=9i 1724267000",
    "home,room=Living\\ Room temp=22.8,hum=36.2,co=9i 1724268000",
    "home,room=Kitchen temp=23.3,hum=36.9,co=18i 1724268000",
    "home,room=Living\\ Room temp=22.5,hum=36.3,co=14i 1724269000",
    "home,room=Kitchen temp=23.1,hum=36.6,co=22i 1724269000",
    "home,room=Living\\ Room temp=22.2,hum=36.4,co=17i 1724270000",
    "home,room=Kitchen temp=22.7,hum=36.5,co=26i 1724270000",
]
flow = Dataflow("simple_output")
stream = op.input("input", flow, TestingSource(lines))
keyed_stream = op.key_on("key_location", stream, lambda x: x.split(",")[0])
op.inspect("check_stream", stream)
batch_readings = op.collect(
    "lines", keyed_stream, max_size=10, timeout=timedelta(milliseconds=50)
)
op.output(
    "out",
    batch_readings,
    InfluxDBSink(
        host="https://us-east-1-1.aws.cloud2.influxdata.com",
        database=DATABASE,
        org=ORG,
        token=TOKEN,
        write_precision="s",
    ),
)
Reading Data
Reading from InfluxDB is simple as well!
Below is an example that will pull the most recent data from InfluxDB every 15 seconds from the home table/bucket:
import os
import logging
from datetime import timedelta, datetime, timezone
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.influxdb import InfluxDBSource
TOKEN = os.getenv(
    "INFLUXDB_TOKEN",
    "my-token",
)
DATABASE = os.getenv("INFLUXDB_DATABASE", "testing")
ORG = os.getenv("INFLUXDB_ORG", "dev")
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
flow = Dataflow("a_simple_example")
inp = op.input(
    "inp",
    flow,
    InfluxDBSource(
        timedelta(seconds=15),
        "https://us-east-1-1.aws.cloud2.influxdata.com",
        DATABASE,
        TOKEN,
        "home",
        ORG,
        tz=timezone.utc,
    ),
)
op.inspect("input", inp)
> INFO:bytewax.influxdb:last_time:2024-08-21 17:03:20+00:00
> a_simple_example.input: pyarrow.RecordBatch
co: int64
hum: double
room: string
temp: double
time: timestamp[ns] not null
----
co: [0,0,0,0]
hum: [35.9,36.2,35.9,35.9]
room: ["Kitchen","Kitchen","Living Room","Living Room"]
temp: [21,23,21.1,21.4]
time: [2024-08-21 16:33:20.000000000,2024-08-21 16:50:00.000000000,2024-08-21 16:33:20.000000000,2024-08-21 16:50:00.000000000]
Purchasing the Connector
The Bytewax InfluxDB Connector is available via subscription at modules.bytewax.io. This allows you to unlock advanced features and ongoing support.
How it Works
The Bytewax InfluxDB Connector consists of two main components: the Source and the Sink, each responsible for interfacing with InfluxDB in distinct ways. You can see the source code in the GitHub Repository. We will discuss how they work in-depth below.
The InfluxDB Source
The InfluxDBSource class is designed to read data from an InfluxDB instance by partitioning the data retrieval process into fixed intervals. When initializing the Source, you can define:
- Interval: The time between polls to fetch new data.
- Host: InfluxDB's server address.
- Database: The target InfluxDB database.
- Measurement: The specific time-series data being queried.
- Start time: Defines when the data query should start from.
The Source is currently limited to a single partition because the Python client cannot partition the input query, but if you have resource-intensive processing you can still spread the data across multiple workers downstream with the Bytewax redistribute operator.
The connector’s Source handles querying InfluxDB at predefined intervals and transforms the data into Arrow RecordBatches, which are efficient for in-memory processing in Python. Bytewax pipelines can then process these RecordBatches as they would any other data. For each partition, the source queries data between last_time and now, resuming seamlessly after any interruptions via the snapshot() method, which tracks the last query time.
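To make the resume behavior concrete, here is a minimal, illustrative sketch of the windowing logic. The class and method names here are ours for illustration, not the connector's internals:

```python
from datetime import datetime, timedelta, timezone


class WindowedPoller:
    """Illustrative sketch of snapshot-based resume, not the real source."""

    def __init__(self, start_time):
        # On recovery, Bytewax restores this value from a snapshot.
        self.last_time = start_time

    def snapshot(self):
        # Persisted by Bytewax so the partition can resume where it left off.
        return self.last_time

    def poll(self, now):
        # Each poll covers (last_time, now]; advancing last_time means
        # no row is re-read or skipped across restarts.
        window = (self.last_time, now)
        self.last_time = now
        return window
```

Because consecutive windows share a boundary, a restart after any poll simply picks up at the persisted last_time with no gaps or duplicates.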
The typical output of this source is Arrow RecordBatches, which can be converted into formats like pandas DataFrames for further processing.
The InfluxDB Sink
The InfluxDBSink is the counterpart to the Source. This class writes processed data from a Bytewax dataflow back to an InfluxDB instance. It supports multiple workers, making it highly scalable. Key parameters for initialization include:
- Host: The InfluxDB server address.
- Database: The target database for writing data.
- Write precision: Specifies the precision (e.g., nanoseconds, microseconds) for storing time-series data points.
- Token and Org: For authentication and identifying the InfluxDB organization.
The Sink writes a batch of items, which can be in various formats such as strings, dictionaries, or InfluxDB Point objects, to InfluxDB. It allows for writing real-time, processed data back into InfluxDB using the native write capabilities of the InfluxDB client.
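For instance, a dictionary-shaped point might look like the sketch below. The key layout follows the InfluxDB Python client's dictionary write convention; treat it as a sketch and check the client documentation for your version:

```python
# One dict-shaped point matching the readings in the write example above.
# Keys follow the InfluxDB client's dictionary convention: measurement,
# tags, fields, and an epoch timestamp in the sink's write precision.
point = {
    "measurement": "home",
    "tags": {"room": "Kitchen"},
    "fields": {"temp": 22.7, "hum": 36.0, "co": 9},
    "time": 1724267000,  # seconds, matching write_precision="s"
}
```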
This setup enables Bytewax users to create dataflows that not only ingest time-series data but also write it back to InfluxDB for further analysis, visualization, or storage. The connector's flexibility in terms of input and output types (Arrow RecordBatches in the Source and various formats in the Sink) makes it a powerful tool for building dynamic, real-time data pipelines.
With the Bytewax InfluxDB Connector, you can now easily manage your time-series data, creating end-to-end data pipelines that fit seamlessly into your existing Bytewax projects. The connector can be purchased via subscription on modules.bytewax.io, enabling you to unlock its full potential.
Stay updated with our newsletter
Subscribe and never miss another blog post, announcement, or community event.