Hmmm. This is one of those topics that drives folks crazy. I love it. When it comes to coding and programming, there are few things in this world that will make people light torches and raise pitchforks faster than parallel data processing. The only thing that makes people madder is probably async code.
Many moons ago I used to have to worry a lot about ProcessPools and ThreadPools in Python all day long. For various long-winded and interesting reasons, I was stuck in the grips of a Python-only world, doomed to put ThreadPools and multiprocessing to work on a regular basis to make things run in reasonable amounts of time.
Today I want to dip my toe in the cool, clean, and clear waters of parallel data processing in Rust. Yikes. Enter the Rayon crate, which is apparently easy enough for even me to work with. Out of pure curiosity, we will compare it, performance-wise, to ThreadPools and ProcessPools in Python.
I know they are not one-to-one comparisons. I’m more interested, at a high level, in what the options are for parallel data processing in Rust with Rayon vs Python with ProcessPools and ThreadPools.
Let’s get to the good stuff.
Introduce Rayon in Rust.
Introduce ThreadPools and ProcessPools in Python.
Write some CSV processing code.
Compare Results.
Wait for the insults to come.
Thanks to Delta for sponsoring this newsletter! I personally use Delta Lake on a daily basis, and I believe this technology represents the future of Data Engineering. Check out their website below.
Rayon - parallel data processing with Rust.
So, perchance one day I was scrolling through my interesting Twitter feed, and a little tasty morsel popped up. Rayon. It caught my eye.
“Rayon is a data-parallelism library for Rust. It is extremely lightweight and makes it easy to convert a sequential computation into a parallel one. It also guarantees data-race freedom.”
The part that got my attention was “easy.” Sold. Why? Because the README mentions that all you have to do to take advantage of Rayon is turn any .iter() into a .par_iter(), and the world will be at your feet.
I had recently written a much-maligned post about Rust vs Python in an AWS lambda. It was a great way to learn some more Rust, and I remembered using `iter()` to process the lines of the flat-file/CSV file.
Me thinks to myself, what a perfect little test for Rayon. Mix in a little Python and compare the performance. Now there’s a recipe for making people mad again. Can’t resist, I’m a creature of habit.
Python - ThreadPools and ProcessPools.
The devil’s work in Python started with ThreadPools
and ProcessPools
and ended with a strange demon child called async
. Anyways, I digress. Because of the infamous GIL in Python, our most beloved language has always languished far behind pretty much every other language since the beginning of time.
But, there are many instances where ThreadPools and ProcessPools in Python can come in very handy when you’re hobbled by the inability to use another language for speed.
Now before you get all up in arms and cry foul, I’m not saying that Rust’s Rayon is the same thing as ThreadPools or ProcessPools in Python. What I’m saying is that all of these little wonders are made with the idea of increasing data processing speeds and are fairly easy to use.
That puts them in the same fighting ring in my book. Put the gloves on, it’s time to fight. What I’m most curious about is this: we have all these tools that are “easy” to use and implement, but how easy are they really? And how does the performance compare?
Our simple data processing example.
So we need a simple data processing “problem” to use for our example and work. What I did was combine about 10 months’ worth of Divvy Bike trip data into a single large CSV file.
How did I combine the files? Spark
of course. It’s good for lots of things. Data munging is one of them.
Spark context available as 'sc' (master = local[*], app id = local-1678228180729).
SparkSession available as 'spark'.
>>> df = spark.read.csv('*divvy-tripdata.csv')
>>> df.repartition(1).write.csv('out')
This turns out to be enough records for our needs, about 5 million.
>>> df.count()
5354607
For our pretend problem, let’s just turn this CSV file into a tab-delimited file. Aka, we will have to iterate over each row and exchange the commas for tabs. This should suffice.
Rust with Rayon - flat-file processing.
So it’s going to be a pretty simple Rust program. We will write it normally, see the performance, then swap out any iter() we have for Rayon’s par_iter() and see what happens. Sounds simple, doesn’t it?
Here is the first, non-parallel code written in Rust.
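The original listing isn’t reproduced here, but a minimal sketch of such a sequential program might look like the following (file names borrowed from later in the post; the exact structure of the original code is an assumption).

```rust
use std::fs::{read_to_string, write};
use std::time::Instant;

fn main() {
    let before = Instant::now();

    // Read the whole CSV into memory.
    let data = read_to_string("divvy-tripdata.csv").expect("can't read file");
    let lines: Vec<&str> = data.lines().collect();

    // Plain sequential iteration: swap commas for tabs on each line.
    let converted: Vec<String> = lines
        .iter()
        .map(|line| line.replace(',', "\t"))
        .collect();

    write("divvy-biketrip.tsv", converted.join("\n")).expect("can't write file");
    println!("Elapsed time: {:.2?}", before.elapsed());
}
```

Note the single .iter() over the collected lines; that is the spot we will parallelize in a moment.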
The time, 8.14 seconds. (Strange, slower than Python out of the box.) Probably my terrible Rust catching up with me.
I bet you’re as curious as I am about what happens if we swap out every instance of an iter() with par_iter(), of which the astute reader will have counted exactly one. Converting each line from comma- to tab-delimited is an easy spot to parallelize: the code must iter() the records, so we can easily par_iter() them instead.
Here is the same code with the new par_iter() for the comma-to-tab replacement.
And the runtime: 4.86 seconds. Nice! About a 40% performance improvement over the non-Rayon Rust code.
So that’s what Rust looks like with and without Rayon on 5 million+ records. I guess you shouldn’t shake your finger at a 40% increase, especially when doing real-life production workloads.
What do I love most about Rayon + Rust? It’s super easy. I mean, you just have to think slightly differently about your code and add some par_ swaps, and that’s it. No worries about memory corruption, thread races, nothing.
Python ThreadPools and ProcessPools.
You knew it was coming. I can’t wait for all the Python fanatics to come at me with anger and vitriol. But, that’s the spice of life after all. With the Python GIL it’s not that easy to do data parallelization for real. I mean there is always the cursed async nightmare.
But, let’s start by writing the plain old Python script.
Very quick: 6.04 seconds. What’s this devilry, Python is way faster than Rust?! My Rust ignorance must be catching up with me.
ThreadPools in Python.
Ok, so now what can we do to try ThreadPools out with our Python code? They are actually a little more painful to use, but not out of hand. We have to break up our lines into separate chunks, spin up a ThreadPool, write a function that can do some of our tab conversion work, collect the futures, and write them back out. Ok, maybe it’s a big pain.
Funny, it actually takes longer than vanilla Python. Why? Because our work is not IO bound, it’s CPU bound, and therefore doesn’t benefit from our ThreadPool. Now, if we were waiting on some network call, it would be a different story.
There’s the performance, 6.17 seconds.
ProcessPools in Python.
Maybe since we are CPU bound with our string work, a ProcessPool will be kinder. The code is almost the same.
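A sketch of the ProcessPool variant: the structure is identical to the ThreadPool sketch, just swapping in ProcessPoolExecutor (the __main__ guard is required so worker processes can import the module cleanly; worker count and chunk size are again assumptions).

```python
import time
from concurrent.futures import ProcessPoolExecutor


def convert_chunk(chunk):
    # Do the comma-to-tab work for one slice of the file.
    return [line.replace(",", "\t") for line in chunk]


def chunked(lines, size):
    # Break the lines into roughly equal chunks for the pool.
    for i in range(0, len(lines), size):
        yield lines[i:i + size]


if __name__ == "__main__":
    start = time.time()

    with open("divvy-tripdata.csv") as source:
        lines = source.readlines()

    # Each chunk is pickled, shipped to a worker process, converted,
    # and the result pickled back -- that serialization is the overhead.
    with ProcessPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(convert_chunk, chunk) for chunk in chunked(lines, 100_000)]
        results = [future.result() for future in futures]

    with open("divvy-biketrip.tsv", "w") as sink:
        for chunk in results:
            sink.writelines(chunk)

    print(f"Elapsed time: {time.time() - start:.2f} seconds")
```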
Ouch. 22 seconds.
Dang, those Python ProcessPools are slow!
Now, before you send me angry comments and emails, the whole point of this exercise isn’t really what’s faster than what, or what’s better performance-wise. It’s really about stepping back and doing a few things.
Starting to understand parallelization in Rust at a high level.
Seeing how easy it is to use Rayon in Rust.
Reminding ourselves about Process and Thread Pools in Python.
Seeing how easy, or not, all of the above are to implement.
I found that surprisingly, for whatever reason, using Rayon in Rust is extremely easy, and using ThreadPools and ProcessPools in Python is somewhat of a pain.
This isn’t really a surprise after all though, when we think about why and how Rust was written, and what Python is good at and used for on a daily basis.
I hope you found something interesting, that I caused you to think, yell at my code, or otherwise exercised your curiosity and passion for Data Engineering and programming in general. Till next time.
Cool how you outlined your thinking and your experiment steps!
Would you mind trying this Rust code instead? Although a bit less naive than your first approach, it avoids a lot of the unnecessary allocations in your existing code:
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::time::Instant;

fn main() {
    let before = Instant::now();
    let source_file = File::open("divvy-tripdata.csv").expect("can't open file");
    let mut reader = BufReader::new(source_file);
    let destination_file = File::create("divvy-biketrip.tsv").expect("problem with file");
    let mut writer = BufWriter::new(destination_file);
    // Reuse one buffer for every line instead of allocating per read.
    let mut buffer = String::new();
    loop {
        buffer.clear();
        // read_line returns 0 bytes at end of file.
        let read_count = reader
            .read_line(&mut buffer)
            .expect("something went wrong reading the file");
        if read_count == 0 {
            break;
        }
        let replaced = buffer.replace(',', "\t");
        writer
            .write_all(replaced.as_bytes())
            .expect("problem writing lines");
    }
    println!("Elapsed time: {:.2?}", before.elapsed());
}