Introducing the DuckDB and MotherDuck sink operator

By Laura Funderburk

Efficient data processing requires tools that can seamlessly balance performance, usability, and scalability. The introduction of Bytewax’s integration with DuckDB and MotherDuck marks a significant leap, making it even more powerful for batch and stream processing enthusiasts. This blog post introduces the new Bytewax DuckDB and MotherDuck operator integration, walking you through its capabilities, and showcasing how it seamlessly bridges stream processing with high-performance analytical database systems.

Where Streams Meet Quacks

DuckDB is an in-process SQL OLAP database known for its blazing-fast analytical processing and ease of integration. It is lightweight, yet powerful enough to handle complex analytical queries efficiently.

MotherDuck builds on DuckDB’s foundation, providing cloud-based data warehouse for collaborative and scalable processing. It’s ideal for scenarios where DuckDB's capabilities need to expand into a shared or cloud-native environment. By acting as a bridge between lightweight local analytics and scalable cloud solutions, MotherDuck offers the best of both worlds:

  • Local-First Efficiency: Developers can start with DuckDB for in-memory processing and analytics during the initial stages of a project or for localized tasks.
  • Scalable Collaboration: As data grows or the need for team collaboration arises, MotherDuck allows seamless migration to a cloud environment. This ensures continuity without needing to re-architect workflows.

With the introduction of a dedicated Bytewax sink for DuckDB and MotherDuck, you can now:

  • Process streams with Bytewax operators.
  • Store results in DuckDB/MotherDuck for further analysis.
  • Leverage the full potential of DuckDB’s analytical prowess alongside Bytewax's real-time processing.

This integration introduces a practical and flexible way to handle the challenges of real-time data processing and analysis. It is particularly valuable for use cases that demand efficient stream processing combined with analytical capabilities, enabling seamless transitions between data ingestion, transformation, and querying.

Practical Applications

Example use cases include:

  • Operational Dashboards: Stream live metrics into DuckDB for immediate insights, with MotherDuck scaling the solution for organizational access.
  • Real-Time ETL Pipelines: Ingest, transform, and store streaming data from Bytewax into DuckDB or MotherDuck for querying or further processing.
  • Hybrid Workflows: Combine on-premises processing with cloud-based scalability, ensuring cost efficiency while maintaining flexibility.

This integration is especially useful in scenarios where data needs to be immediately available for analytical queries or where data overload is significant, requiring streaming or micro batching.

Installation

We've made installation as easy as Py by making it pip-installable:

pip install bytewax-duckdb

This will also install the latest DuckDB and Bytewax modules.

You can try out this Colab Gist we created

Storing data to DuckDB in batches through a Bytewax dataflow

When working with this integration in Bytewax, you can use it to process data in batch and write data to a target database or file in a structured way. One essential requirement to be aware of is that the sink expects data in the following tuple format:

("key", List[Dict])

Where

"key": The first element is a string identifier for the batch. Think of this as a “batch ID” that helps to organize and keep track of which group of entries belong together. Every batch you send must have a unique key or identifier.

List[Dict]: The second element is a list of dictionaries. Each dictionary represents an individual data record, with each key-value pair in the dictionary representing fields and their corresponding values.

Together, the tuple tells the sink: “Here is a batch of data, labeled with a specific key, and this batch contains multiple data entries.”

This format is designed to let the sink write data efficiently in batches, rather than handling each entry one-by-one. By grouping data entries together with an identifier, the sink can:

  • Optimize Writing: Batching data reduces the frequency of writes to the database or file, which can dramatically improve performance, especially when processing high volumes of data.

  • Ensure Atomicity: By writing a batch as a single unit, we minimize the risk of partial writes, ensuring either the whole batch is written or none at all. This is especially important for maintaining data integrity.

Here is an example for a local DuckDB file.

import bytewax.duckdb.operators as duck_op
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, run_main

flow = Dataflow("duckdb")


def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
    return ("1", {"id": value, "name": "Alice"})


inp = op.input("inp", flow, TestingSource(range(50)))
dict_stream = op.map("dict", inp, create_dict)

duck_op.output(
    "out",
    dict_stream,
    "sample.duckdb",
    "example_table",
    "CREATE TABLE IF NOT EXISTS example_table (id INTEGER, name TEXT)",
)

run_main(flow)

Executing this will create a file named sample.duckdb with a table named example_table containing two columns "id" and "name". The table will be populated by 50 entries containing an id ranging from 0 to 49 and the word Alice as the name.

We can verify it was correctly populated as follows:

import duckdb

# Inspect user's data
con = duckdb.connect("sample.duckdb")
result = con.execute("SELECT * FROM example_table").fetchall()
con.sql("from example_table")
|  id   │  name   │
│ int32 │ varchar │
├───────┼─────────┤
│     0 │ Alice   │
│     1 │ Alice   │
│     2 │ Alice   │
│     3 │ Alice   │
│     4 │ Alice   │
│     5 │ Alice   │
│     6 │ Alice   │
│     7 │ Alice   │
│     8 │ Alice   │
│     9 │ Alice   │
│     · │   ·     │
│     · │   ·     │
│     · │   ·     │
│    40 │ Alice   │
│    41 │ Alice   │
│    42 │ Alice   │
│    43 │ Alice   │
│    44 │ Alice   │
│    45 │ Alice   │
│    46 │ Alice   │
│    47 │ Alice   │
│    48 │ Alice   │
│    49 │ Alice   │
├───────┴─────────┤
│     50 rows     │
│   (20 shown)    │
└─────────────────

Let's now populate a MotherDuck instance.

Writing a batch of data to MotherDuck

Important To connect to a MotherDuck instance, ensure to create an account and generate a token. You can store this token into your environment variables.

import os
import random
from typing import Dict, Tuple, Union

# Save the token in an environment variable
md_token = os.getenv("MOTHERDUCK_TOKEN")

# Initialize the dataflow
flow = Dataflow("duckdb-names-cities")

# Define sample data for names and locations
names = ["Alice", "Bob", "Charlie", "Diana", "Eve"]
locations = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]

# Function to create a dictionary with more varied data
def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
    name = random.choice(names)
    age = random.randint(20, 60)  # Random age between 20 and 60
    location = random.choice(locations)
    return ("batch_1", {"id": value, "name": name, "age": age, "location": location})

# Generate input data
inp = op.input("inp", flow, TestingSource(range(50)))
dict_stream = op.map("dict", inp, create_dict)
db_path =f"md:my_db?motherduck_token={md_token}"
# Output the data to MotherDuck, creating a table with multiple columns
duck_op.output(
    "out",
    dict_stream,
    db_path,
    "names_cities",
    "CREATE TABLE IF NOT EXISTS names_cities (id INTEGER, name TEXT, age INTEGER, location TEXT)"
)

# Run the dataflow
run_main(flow)

Let's verify database population.

con_md = duckdb.connect(db_path)

result = con_md.execute("SELECT * FROM names_cities").fetchall()
con_md.sql("from names_cities")
|  id   │  name   │  age  │  location   │
│ int32 │ varchar │ int32 │   varchar   │
├───────┼─────────┼───────┼─────────────┤
│     0 │ Eve     │    43 │ Houston     │
│     1 │ Charlie │    32 │ New York    │
│     2 │ Alice   │    55 │ Houston     │
│     3 │ Charlie │    30 │ Chicago     │
│     4 │ Eve     │    29 │ Houston     │
│     5 │ Alice   │    49 │ Phoenix     │
│     6 │ Diana   │    42 │ Chicago     │
│     7 │ Alice   │    26 │ Los Angeles │
│     8 │ Alice   │    54 │ Chicago     │
│     9 │ Bob     │    34 │ Phoenix     │
│     · │  ·      │     · │    ·        │
│     · │  ·      │     · │    ·        │
│     · │  ·      │     · │    ·        │
│    40 │ Charlie │    48 │ Phoenix     │
│    41 │ Bob     │    28 │ Phoenix     │
│    42 │ Diana   │    45 │ Los Angeles │
│    43 │ Bob     │    48 │ Chicago     │
│    44 │ Bob     │    50 │ Chicago     │
│    45 │ Bob     │    52 │ Los Angeles │
│    46 │ Diana   │    37 │ Los Angeles │
│    47 │ Diana   │    31 │ Chicago     │
│    48 │ Charlie │    35 │ Houston     │
│    49 │ Eve     │    31 │ Houston     │
├───────┴─────────┴───────┴─────────────┤
│ 150 rows (20 shown)         4 columns │
└───────────────────────────────────────

Summary

The Bytewax integration with DuckDB and MotherDuck aligns stream processing with advanced analytics, offering a unified platform to handle data from ingestion to insight. By bridging local and cloud environments, it addresses both small-scale efficiency and large-scale collaboration needs. This makes it an indispensable tool for organizations seeking a streamlined and scalable approach to modern data workflows.

Try it out today, and redefine your stream processing pipelines with the power of Bytewax, DuckDB, and MotherDuck!

Happy Streaming!

Stay updated with our newsletter

Subscribe and never miss another blog post, announcement, or community event.

Previous post

Laura Funderburk

Senior Developer Advocate
Laura Funderburk holds a B.Sc. in Mathematics from Simon Fraser University and has extensive work experience as a data scientist. She is passionate about leveraging open source for MLOps and DataOps and is dedicated to outreach and education.
Next post