In this blog post, we'll use Bytewax to build a word count program, the canonical "hello world" of stream processing.
We'll take the following steps:
- Set up Bytewax locally
- Read data into a dataflow and process it
- Run the dataflow
Want to skip ahead and go straight to the code? Check out the repo.
What is Bytewax?
Bytewax is a dataflow stream processing framework that allows you to process data in real-time with Python. Bytewax is open source and is built on top of the Timely Dataflow execution framework.
Let’s break that down.
A dataflow is a set of operators that act on data and can be described as a directed graph. A dataflow is highly parallelizable in nature because the operators act independently on the data flowing through the system. This allows for excellent throughput.
Operators are the processing primitives of Bytewax. Each of them gives you a "shape" of data transformation, and you give them functions to customize them to the specific task you need. We call the combination of an operator and its custom logic function a dataflow step.
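For example (a preview of a step we'll write later in this post), combining the map operator with Python's built-in str.lower gives you a step that lowercases every item flowing through the dataflow:
flow.map(str.lower)  # map operator + str.lower logic = one dataflow step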
Set Up Bytewax Locally
Bytewax is a Python library that can be installed with the common package manager pip and currently requires Python 3.7 or greater.
Before getting started, create a fresh environment with the environment manager of your preference. Once complete, install bytewax.
> pip install bytewax
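To double-check that the install worked, a quick sanity check is importing the package:
> python -c "import bytewax"
If that exits without an error, you're good to go.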
That was easy, wasn’t it? You now have a powerful stream processing tool at your fingertips. No messing around with the JVM, new package managers, or configuration files. You can prototype and ship in the same language and with the same tool!
Let’s get coding!
For a more in-depth getting started guide and hello world explanation, see the Bytewax docs.
Read Data into a Dataflow and Process It
Now that we've installed Bytewax, let's begin with an end-to-end example. We'll start by building out a simple dataflow that performs a count of words in a file.
To begin, save a copy of this text in a file called wordcount.txt:
To be, or not to be, that is the question:
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles
And by opposing end them.
A dataflow is a series of processing steps. For our wordcount dataflow, we'll want the following steps:
- Iterate over a file object as input
- Lowercase all characters in the line
- Split the line into words
- Count the occurrence of each word in the file
- Print out the result after all the lines have been processed
- Build and run
Visually, our dataflow can be represented as a directed graph like the one shown below, with each step represented by a shape. In the next section, we will talk about the operators shown in the shapes and what they do.
We'll start with how to get input into our dataflow. This should be an iterable object, like a Kafka consumer or, in this case, a file object.
Iterating Over a File Object as Input
(Take a Line From the File)
In this instance, we are not receiving input from a stream, so we will use the single_batch input helper, which will result in the whole file being processed as a single epoch. Epochs are what allow us to influence ordering and batching within a dataflow.
import bytewax
from bytewax import inp
ec = bytewax.Executor()
flow = ec.Dataflow(inp.single_batch(open("wordcount.txt")))
On line 4, we define our Dataflow, which will receive data from the input we designated.
Great! We have input. Let's define the steps that we want for each line of input that we receive.
Lowercase All Characters in the Line
If you look closely at our input, we have instances of both To and to. Let's add a step to our dataflow that transforms each line into lowercase letters so that we are counting unique words and not unique casings. At the same time, we'll introduce our first operator, map.
flow.map(str.lower)
For each item that our generator produces, we'll use the built-in method lower() to return a copy of the string with all characters converted to lowercase.
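For example, applied to the first line of our file:
> "To be, or not to be, that is the question:".lower()
'to be, or not to be, that is the question:'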
Split the Line into Words
Our dataflow will receive an entire line from our file. In order to count the words in the file, we'll need to break that line up into individual words and start an initial count. This will allow us to use Python's built-in operator.add in our reduce step.
Enter our tokenize function:
def tokenize(x):
    return [(word, 1) for word in re.findall(r'[^\s!,.?":;0-9]+', x)]
Here, we use a Python regular expression to split the line of input into a list of words:
> line = "to be, or not to be, that is the question:"
> tokenize(line)
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1), ('that', 1), ('is', 1), ('the', 1), ('question', 1)]
To make use of the tokenize function, we'll use the flat_map operator:
flow.flat_map(tokenize)
The flat_map operator defines a step that calls a function on each input item, but instead of returning a single item for each input like map, it can return zero to many items. Each word in the list we return from our function will then be processed downstream.
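If it helps to see the idea outside of Bytewax, flat_map is roughly "map, then flatten". Here's a sketch of that behavior in plain Python using the tokenize function from above (this is just an analogy, not how Bytewax implements it):
import itertools

lines = ["to be, or not to be"]
# map tokenize over each line, then flatten the per-line lists into one stream
flat = list(itertools.chain.from_iterable(tokenize(line) for line in lines))
# [('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]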
Count the Occurrence of Each Word in the File
Next, we will use the reduce_epoch operator to count the number of times each word appears in the file:
flow.reduce_epoch(operator.add)
The reduce_epoch operator takes a function or method as its argument. We are using the Python built-in operator.add, but this could be a function of our own. The function in the reduce_epoch step receives tuples with the format (key, value), and it will aggregate all of the values with the same key over an epoch. Remember, in our prior flat_map step we returned tuples of the shape ("word", 1). Now, in this reduce step, we are going to add up all of the values for each key. The output to the next step will be of the shape (key, value), or in this specific example, ("word", count). For more information on the reduce set of operators, check out the documentation.
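To build some intuition, here's roughly what a keyed reduction over an epoch does, sketched in plain Python (Bytewax performs the aggregation incrementally and in parallel rather than collecting everything first; the function name here is made up for illustration):
import operator

def reduce_epoch_sketch(reducer, pairs):
    # aggregate all values sharing a key across the whole epoch
    totals = {}
    for key, value in pairs:
        totals[key] = reducer(totals[key], value) if key in totals else value
    return list(totals.items())

reduce_epoch_sketch(operator.add, [("to", 1), ("be", 1), ("to", 1)])
# [('to', 2), ('be', 1)]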
Print Out the Result After All the Lines Have Been Processed
The last part of our dataflow program will use the inspect operator to see the results of our reduction. For this example, we're supplying the built-in Python function print. The inspect operator provides us with the ability to access the items in the stream, but not to modify them for the next operator in the dataflow.
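In code, this step is a single line:
flow.inspect(print)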
Here is the complete output when running the example:
('to', 4)
('be', 2)
('or', 2)
('not', 1)
...
('them', 1)
To run the example, we'll need to introduce one more method:
Build and Run
At the end of our example, we have the following code:
if __name__ == "__main__":
    ec.build_and_run()
When we call build_and_run, our dataflow program will begin running and will exit when there are no more lines in the input file.
Putting it all together!
import operator
import re
import bytewax
from bytewax import inp

def tokenize(x):
    return [(word, 1) for word in re.findall(r'[^\s!,.?":;0-9]+', x)]

ec = bytewax.Executor()
flow = ec.Dataflow(inp.single_batch(open("wordcount.txt")))
# full sentences
flow.map(str.lower)
# lowercase sentences
flow.flat_map(tokenize)
# ("word", 1)
flow.reduce_epoch(operator.add)
# ("word", count)
flow.inspect(print)

if __name__ == "__main__":
    ec.build_and_run()
Run the Dataflow
Now, to run our wordcount.py file:
> python ./wordcount.py
('to', 4)
('be', 2)
('or', 2)
('not', 1)
...
('them', 1)
✨ Congratulations!
You just ran your first dataflow using Bytewax!
For more examples like this one, check out the bytewax repo examples!