The plethora of new Rust-based, Python-interfaced data tools flooding the proverbial rolling hills of Data Land is hard to miss. This new set of tech, like Polars, Datafusion, ruff, and all the rest, has solidified Rust’s spot in the Data Engineering space. But did you ever stop and wonder why?
It really boils down to memory and speed. After all, what are we doing besides pushing and pulling bytes around? Aggregating, transforming, reading, writing. Is Rust really worth it? Is it that much faster and more memory-efficient than Python? Why are people building data tools with Rust?
That’s what we are going to look into today: writing some code in Python and Rust to find out the real reason folks are building Data Tools for Python with Rust.
Thanks to Delta for sponsoring this newsletter! I personally use Delta Lake on a daily basis, and I believe this technology represents the future of Data Engineering. Check out their website below.
When you're working with large data, it can be difficult to hold it all in memory, and potentially expensive to ingest if you rely on the ability to do so … say with Pandas. Such an approach requires larger compute than otherwise, not to mention slower speeds and longer run times.
We want to find the best way to take in data in a single pass with tight resource constraints, as this is likely to be both the fastest and least expensive route to completion.
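To make that concrete, here is a rough, hedged sketch of what single-pass, chunked ingestion can look like with Pandas. It is only an illustration of the idea, not something we benchmark below; the file and column names mirror the dataset introduced in the next section.

import pandas as pd

# A sketch of single-pass, chunked ingestion: only `chunksize` rows live in
# memory at any one time, and each chunk is folded into a running aggregate.
reimbursement_by_beneficiary = {}
for chunk in pd.read_csv(
    "MedicalClaimsSynthetic1M.csv",
    usecols=["DESYNPUF_ID", "MEDREIMB_IP"],
    chunksize=100_000,
):
    partial = chunk.groupby("DESYNPUF_ID")["MEDREIMB_IP"].sum()
    for beneficiary_id, amount in partial.items():
        reimbursement_by_beneficiary[beneficiary_id] = (
            reimbursement_by_beneficiary.get(beneficiary_id, 0.0) + amount
        )

print(len(reimbursement_by_beneficiary))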
Don’t get me wrong, I get it. We all have to use Python because it’s the language of choice for data processing in the world we all live in. Machine Learning and AI in this age are still all about Python. But that doesn’t mean we can’t start chipping away at the edges.
When bottom-line costs like compute bills come into play, it’s hard to argue against the bottom-line improvements Rust-based tooling can give us.
Example problem
To aid in this exploration, we'll be looking at a synthetic medical claims dataset from Kaggle, for which the schema can be found here. Below is a sample.
According to the author's comments, it is the result of merging multiple related datasets from the CMS, and so has a fair amount of opportunity for exploration. Though synthetic, it does offer "representative" data (e.g. identifiers appear more than once, allowing for groupings). A fairly typical dataset.
The test rig used in this article is a Proxmox-based VM with 2GiB RAM, 32GiB of SSD storage, and 4x Celeron J4125 cores, running Debian 11.4.0-amd64.
We won't be particularly rigorous or scientific in our testing and are instead just looking to get some rough idea of relative performance implications for our assorted efforts.
We are trying to show, in a simple manner, why Rust-based tools like Polars and Datafusion are probably the future of Data Engineering.
For this illustration, the goals of analysis will be to produce the following:
Claim line counts by county
Claim line counts and total Medicare reimbursement by beneficiary
Timing and Speed (tracking Python and Rust performance)
We will use the \time -v <...> command to evaluate run duration; due to the low memory allocation for the VM, we will almost certainly be punished for being inefficient with memory, but we won't spend much time troubleshooting this, as we ultimately only care about how fast we can make it to the finish line.
The naive approach - what not to do (Python)
To start, let's establish the need for efficient processing by showing what not to do. Just to keep things simple, our naive approach will use Python to …
read the dataset into memory
and loop over the records to generate dictionaries of aggregate results
import csv


def main():
    claims_file = open("MedicalClaimsSynthetic1M.csv", "r")
    # Materialize every row into memory up front; this is the part that hurts.
    claims_data = list(csv.DictReader(claims_file, delimiter=","))
    claims_file.close()

    claim_line_counts_by_county = {}
    claim_line_counts_by_beneficiary = {}
    medicare_reimbursement_by_beneficiary = {}

    for rec in claims_data:
        beneficiary_id = rec["DESYNPUF_ID"]
        county_id = rec["BENE_COUNTY_CD"]
        medicare_reimbursement = float(rec["MEDREIMB_IP"])

        claim_line_counts_by_county[county_id] = claim_line_counts_by_county.get(county_id, 0) + 1
        claim_line_counts_by_beneficiary[beneficiary_id] = claim_line_counts_by_beneficiary.get(beneficiary_id, 0) + 1
        medicare_reimbursement_by_beneficiary[beneficiary_id] = medicare_reimbursement_by_beneficiary.get(beneficiary_id, 0.0) + medicare_reimbursement

    print("count claim_line_counts_by_county: {}".format(len(claim_line_counts_by_county)))
    print("count claim_line_counts_by_beneficiary: {}".format(len(claim_line_counts_by_beneficiary)))
    print("count medicare_reimbursement_by_beneficiary: {}".format(len(medicare_reimbursement_by_beneficiary)))


main()
And the test results.
# \time -v python3 naive-approach.py
Command terminated by signal 9
Command being timed: "python3 naive-approach.py"
User time (seconds): 15.25
System time (seconds): 26.10
Percent of CPU this job got: 23%
Elapsed (wall clock) time (h:mm:ss or m:ss): 2:55.24
...
Maximum resident set size (kbytes): 1896948
Average resident set size (kbytes): 0
Major (requiring I/O) page faults: 163144
Minor (reclaiming a frame) page faults: 734259
Voluntary context switches: 169238
Involuntary context switches: 41481
...
...Well, uh, that isn't very helpful - we didn't even finish execution, and a quick look at dmesg tells us that our script ran out of memory. Granted, this was intended to prove a point about materializing large datasets, so let's not pretend that this isn't expected, but take note of how much time was spent getting to this answer: 2:55.24.
Python is the tool that can do it all … to a certain point of course.
Python revisited: streaming read
With that said, we can make a tweak to our program so that we don't materialize the entire list prior to evaluating the results:
import csv


def main():
    claims_file = open("MedicalClaimsSynthetic1M.csv", "r")
    # DictReader is an iterator, so rows are only pulled into memory one at a time.
    claims_data_reader = csv.DictReader(claims_file, delimiter=",")

    claim_line_counts_by_county = {}
    claim_line_counts_by_beneficiary = {}
    medicare_reimbursement_by_beneficiary = {}

    for rec in claims_data_reader:
        beneficiary_id = rec["DESYNPUF_ID"]
        county_id = rec["BENE_COUNTY_CD"]
        medicare_reimbursement = float(rec["MEDREIMB_IP"])

        claim_line_counts_by_county[county_id] = claim_line_counts_by_county.get(county_id, 0) + 1
        claim_line_counts_by_beneficiary[beneficiary_id] = claim_line_counts_by_beneficiary.get(beneficiary_id, 0) + 1
        medicare_reimbursement_by_beneficiary[beneficiary_id] = medicare_reimbursement_by_beneficiary.get(beneficiary_id, 0.0) + medicare_reimbursement

    claims_file.close()

    print("count claim_line_counts_by_county: {}".format(len(claim_line_counts_by_county)))
    print("count claim_line_counts_by_beneficiary: {}".format(len(claim_line_counts_by_beneficiary)))
    print("count medicare_reimbursement_by_beneficiary: {}".format(len(medicare_reimbursement_by_beneficiary)))


main()
And the test results.
# \time -v python3 python-approach-revisited.py
count claim_line_counts_by_county: 318
count claim_line_counts_by_beneficiary: 289413
count medicare_reimbursement_by_beneficiary: 289413
Command being timed: "python3 python-approach-revisited.py"
User time (seconds): 15.36
System time (seconds): 0.23
Percent of CPU this job got: 99%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:15.60
...
Maximum resident set size (kbytes): 61988
Average resident set size (kbytes): 0
Major (requiring I/O) page faults: 0
Minor (reclaiming a frame) page faults: 13125
...
Clearly, hanging onto each row of input data for only as long as absolutely necessary is a big deal. Let's remember that going into our Rust approach since the same should hold true there as well.
Naive approach with Rust
That said, we shouldn’t give up on the materialized-list idea yet; Rust's memory overhead should be less than Python's, and I am convinced we can still pull that full dataset into memory and work with it. Let's try it, just to get a feel for what we're getting into.
First, let's scaffold a new app and pull in the csv and serde crates.
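If you're following along at home, the scaffolding might look something like the commands below. This is a sketch: it assumes a reasonably recent Cargo with the cargo add subcommand, and the rusty-csv-app name simply matches the binary you'll see in the timing output later on.

cargo new rusty-csv-app
cd rusty-csv-app
cargo add csv
cargo add serde --features derive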
And the code.
use std::collections::HashMap;

#[derive(serde::Deserialize)]
struct Claim {
    #[serde(rename = "DESYNPUF_ID")]
    pub beneficiary_id: String,
    #[serde(rename = "BENE_COUNTY_CD")]
    pub county_id: String,
    #[serde(rename = "MEDREIMB_IP")]
    pub medicare_reimbursement: f32,
}

fn main() {
    let reader = csv::Reader::from_path("../MedicalClaimsSynthetic1M.csv")
        .expect("should open file");

    // `.flatten()` will filter against `Result::Ok(val)` and give us just the `val`:
    let claims: Vec<Claim> = reader.into_deserialize().flatten().collect();

    let mut claim_line_counts_by_county: HashMap<String, i32> = HashMap::new();
    let mut claim_line_counts_by_beneficiary: HashMap<String, i32> = HashMap::new();
    let mut medicare_reimbursement_by_beneficiary: HashMap<String, f32> = HashMap::new();

    for item in claims {
        *claim_line_counts_by_county.entry(item.county_id.clone()).or_default() += 1;
        *claim_line_counts_by_beneficiary.entry(item.beneficiary_id.clone()).or_default() += 1;
        *medicare_reimbursement_by_beneficiary.entry(item.beneficiary_id).or_default() += item.medicare_reimbursement;
    }

    println!("count claim_line_counts_by_county: {}", claim_line_counts_by_county.len());
    println!("count claim_line_counts_by_beneficiary: {}", claim_line_counts_by_beneficiary.len());
    println!("count medicare_reimbursement_by_beneficiary: {}", medicare_reimbursement_by_beneficiary.len());
}
This one is a bit more effort to write than our Python equivalent, but functions similarly. We did need to define the struct up front, which differs from the string dictionary approach in Python; this comes down to the implementation details of the csv crate, which didn't offer an obvious way to do the same.
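(As an aside, and assuming I'm reading the csv crate's docs correctly, you can get closer to Python's DictReader by deserializing each row into a HashMap<String, String>; a rough, untested sketch of that shape is below. You pay for a String allocation per field per row, so the typed struct still feels like the better trade here.)

use std::collections::HashMap;

fn main() {
    let mut reader = csv::Reader::from_path("../MedicalClaimsSynthetic1M.csv")
        .expect("should open file");

    let mut claim_line_counts_by_county: HashMap<String, i32> = HashMap::new();

    // Each row comes back keyed by its header name, much like Python's DictReader.
    for result in reader.deserialize::<HashMap<String, String>>() {
        let rec = result.expect("row should parse");
        *claim_line_counts_by_county.entry(rec["BENE_COUNTY_CD"].clone()).or_default() += 1;
    }

    println!("count claim_line_counts_by_county: {}", claim_line_counts_by_county.len());
}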
This time, when we run (built using cargo build --release):
# \time -v ./target/release/rusty-csv-app
count claim_line_counts_by_county: 318
count claim_line_counts_by_beneficiary: 289413
count medicare_reimbursement_by_beneficiary: 289413
Command being timed: "./target/release/rusty-csv-app"
User time (seconds): 2.89
System time (seconds): 0.39
Percent of CPU this job got: 99%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:03.29
...
Maximum resident set size (kbytes): 161952
...
... you can see that it did complete successfully without running out of memory. Technically, because our struct doesn't hold all of the input fields from the .CSV file, we're not doing an apples-to-apples comparison in this effort, so let's be fair and flesh that struct out all the way:
#[derive(serde::Deserialize)]
struct Claim {
    #[serde(rename = "DESYNPUF_ID")]
    pub beneficiary_id: String,
    #[serde(rename = "BENE_COUNTY_CD")]
    pub county_id: String,
    #[serde(rename = "MEDREIMB_IP")]
    pub medicare_reimbursement: f32,
    BENE_BIRTH_DT: String,
    BENE_DEATH_DT: String,
    BENE_SEX_IDENT_CD: String,
    BENE_RACE_CD: String,
    BENE_ESRD_IND: String,
    SP_STATE_CODE: String,
    BENE_HI_CVRAGE_TOT_MONS: String,
    BENE_SMI_CVRAGE_TOT_MONS: String,
    BENE_HMO_CVRAGE_TOT_MONS: String,
    PLAN_CVRG_MOS_NUM: String,
    SP_ALZHDMTA: String,
    SP_CHF: String,
    SP_CHRNKIDN: String,
    SP_CNCR: String,
    SP_COPD: String,
    SP_DEPRESSN: String,
    SP_DIABETES: String,
    SP_ISCHMCHT: String,
    SP_OSTEOPRS: String,
    SP_RA_OA: String,
    SP_STRKETIA: String,
    BENRES_IP: String,
    PPPYMT_IP: String,
    MEDREIMB_OP: String,
    BENRES_OP: String,
    PPPYMT_OP: String,
    MEDREIMB_CAR: String,
    BENRES_CAR: String,
    PPPYMT_CAR: String,
    CLM_ID: String,
    CLM_FROM_DT: String,
    CLM_THRU_DT: String,
    ICD9_DGNS_CD_1: String,
    PRF_PHYSN_NPI_1: String,
    HCPCS_CD_1: String,
    LINE_NCH_PMT_AMT_1: String,
    LINE_BENE_PTB_DDCTBL_AMT_1: String,
    LINE_COINSRNC_AMT_1: String,
    LINE_PRCSG_IND_CD_1: String,
    LINE_ICD9_DGNS_CD_1: String,
}
Yikes! No wonder the Python program crashed on us; look at how many fields we're holding onto. Let's see if we can elicit the same out-of-memory response from our Rust friend:
# \time -v ./target/release/rusty-csv-app
count claim_line_counts_by_county: 318
count claim_line_counts_by_beneficiary: 289413
count medicare_reimbursement_by_beneficiary: 289413
Command being timed: "./target/release/rusty-csv-app"
User time (seconds): 13.75
System time (seconds): 12.85
Percent of CPU this job got: 40%
Elapsed (wall clock) time (h:mm:ss or m:ss): 1:05.27
...
Maximum resident set size (kbytes): 1884004
Average resident set size (kbytes): 0
Major (requiring I/O) page faults: 85838
Minor (reclaiming a frame) page faults: 1366523
...
Well, so much for that theory. As I originally predicted, we were able to fit that payload into memory and do everything we needed to with our fully materialized dataset, though it took 1:05.27 and nearly all of the VM's 2GiB of RAM to get there. So, just to recap everything we know so far:

Naive Python, fully materialized: killed by the OOM killer after 2:55.24 (max RSS ~1.9 GB)
Streaming Python: 0:15.60 (max RSS ~62 MB)
Rust, materialized, three-field struct: 0:03.29 (max RSS ~162 MB)
Rust, materialized, full-width struct: 1:05.27 (max RSS ~1.9 GB)

So far, Rust has Python beat on speed when we condense the width of information stored in memory, but the fully materialized approach still loses out on working memory. What happens if we give Rust the streaming treatment?
Streaming reads with Rust
If you look at the examples shown on the csv crate's readme, you'll note that they don't materialize data like we did, but instead stream the read by iterating over the reader. Let's do likewise and see what happens - to start, we'll keep our wide struct definition from the prior run, just to see what kind of difference our read methodology makes to the worst-case scenario in Rust:
fn main() {
    let mut reader = csv::Reader::from_path("../MedicalClaimsSynthetic1M.csv")
        .expect("should open file");

    let mut claim_line_counts_by_county: HashMap<String, i32> = HashMap::new();
    let mut claim_line_counts_by_beneficiary: HashMap<String, i32> = HashMap::new();
    let mut medicare_reimbursement_by_beneficiary: HashMap<String, f32> = HashMap::new();

    // Rows are deserialized lazily, one at a time, instead of being collected into a Vec first.
    for item in reader.deserialize::<Claim>() {
        if let Ok(claim) = item {
            *claim_line_counts_by_county.entry(claim.county_id.clone()).or_default() += 1;
            *claim_line_counts_by_beneficiary.entry(claim.beneficiary_id.clone()).or_default() += 1;
            *medicare_reimbursement_by_beneficiary.entry(claim.beneficiary_id).or_default() += claim.medicare_reimbursement;
        }
    }

    println!("count claim_line_counts_by_county: {}", claim_line_counts_by_county.len());
    println!("count claim_line_counts_by_beneficiary: {}", claim_line_counts_by_beneficiary.len());
    println!("count medicare_reimbursement_by_beneficiary: {}", medicare_reimbursement_by_beneficiary.len());
}
And the results.
# \time -v ./target/release/rusty-csv-app
count claim_line_counts_by_county: 318
count claim_line_counts_by_beneficiary: 289413
count medicare_reimbursement_by_beneficiary: 289413
Command being timed: "./target/release/rusty-csv-app"
User time (seconds): 5.59
System time (seconds): 0.39
Percent of CPU this job got: 99%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:05.99
Average shared text size (kbytes): 0
Average unshared data size (kbytes): 0
Average stack size (kbytes): 0
Average total size (kbytes): 0
Maximum resident set size (kbytes): 58372
Average resident set size (kbytes): 0
Major (requiring I/O) page faults: 0
Minor (reclaiming a frame) page faults: 10239
Voluntary context switches: 5
Involuntary context switches: 9
Swaps: 0
File system inputs: 388088
File system outputs: 0
Socket messages sent: 0
Socket messages received: 0
Signals delivered: 0
Page size (bytes): 4096
Exit status: 0
By changing from materialized data to streaming reads, we bring the time down from 1:05.27 to a much nicer 0:05.99.
Okay, you know where this is headed, right? Let's boost our performance by stripping out the fields we're not using in our struct (going back to the original definition) and see what the impact is:
# \time -v ./target/release/rusty-csv-app
count claim_line_counts_by_county: 318
count claim_line_counts_by_beneficiary: 289413
count medicare_reimbursement_by_beneficiary: 289413
Command being timed: "./target/release/rusty-csv-app"
User time (seconds): 3.03
System time (seconds): 0.22
Percent of CPU this job got: 99%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:03.26
...
Maximum resident set size (kbytes): 58440
Average resident set size (kbytes): 0
Major (requiring I/O) page faults: 0
Minor (reclaiming a frame) page faults: 9725
...
Let's revisit our comparison of our scenarios with this latest batch of results:

Streaming Rust, full-width struct: 0:05.99 (max RSS ~58 MB)
Streaming Rust, three-field struct: 0:03.26 (max RSS ~58 MB)

Well, that's... not as exciting as I'd hoped. Yes, we've achieved the best results by combining our methods, but the only real gain in switching from the slimmed-down materialized read to a streaming read in Rust is that we use less memory; the difference in duration (0:03.26 versus 0:03.29) is far too close to call.
Wrapping up
I think this basic overview of simple data processing shows why Rust-based tools like Polars and Datafusion are probably the future of Data Engineering, and why they are taking root.
Performance gains in memory and speed can save real money on the bottom line when processing data over the long term.
On a lark, just to see if there is in fact a way to do this faster, I crafted a similar workload in DataFusion CLI to run the same data through its paces, and came up with the following result:
# datafusion-cli
DataFusion CLI v32.0.0
❯ WITH
    TerseClaims AS (
        SELECT
            "DESYNPUF_ID" AS beneficiary_id,
            "BENE_COUNTY_CD" AS county_id,
            "MEDREIMB_IP" AS medicare_reimbursement
        FROM '../MedicalClaimsSynthetic1M.csv'
    ),
    ByBeneficiary AS (
        SELECT
            beneficiary_id,
            SUM(medicare_reimbursement) AS total_medicare_reimbursement
        FROM TerseClaims
        GROUP BY beneficiary_id
    ),
    ByCounty AS (
        SELECT
            county_id,
            COUNT(*) AS claim_line_count
        FROM TerseClaims
        GROUP BY county_id
    )
SELECT COUNT(*) AS "count ByCounty" FROM ByCounty
UNION
SELECT COUNT(*) AS "count ByBeneficiary" FROM ByBeneficiary;
+----------------+
| count ByCounty |
+----------------+
| 289413 |
| 318 |
+----------------+
2 rows in set. Query took 1.453 seconds.
Clearly, we're leaving something on the table from a performance standpoint with our Rust implementation. DataFusion is definitely doing a lot more than we are to optimize performance, but with less than a two-second lead on us I'd say we're at a point of diminishing returns for our trivial problem.
What do you think? Is there an obvious next step to try on either the Rust or Python side to eke out more gains?
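One idea on the Rust side, offered purely as an untested sketch rather than a benchmarked result: skip serde entirely, reuse a single ByteRecord as a scratch buffer, and only touch the three columns we actually need. The column names come from the dataset above; everything else here is my own assumption about where the remaining time might be going.

use std::collections::HashMap;

fn main() {
    let mut reader = csv::Reader::from_path("../MedicalClaimsSynthetic1M.csv")
        .expect("should open file");

    // Look up the positions of the three columns we care about, once, up front.
    let headers = reader.byte_headers().expect("should read headers").clone();
    let bene_idx = headers.iter().position(|h| h == b"DESYNPUF_ID").expect("missing column");
    let county_idx = headers.iter().position(|h| h == b"BENE_COUNTY_CD").expect("missing column");
    let reimb_idx = headers.iter().position(|h| h == b"MEDREIMB_IP").expect("missing column");

    let mut claim_line_counts_by_county: HashMap<String, i32> = HashMap::new();
    let mut claim_line_counts_by_beneficiary: HashMap<String, i32> = HashMap::new();
    let mut medicare_reimbursement_by_beneficiary: HashMap<String, f32> = HashMap::new();

    // Reuse one record buffer for every row instead of allocating a String per field.
    let mut record = csv::ByteRecord::new();
    while reader.read_byte_record(&mut record).expect("should read row") {
        let beneficiary_id = String::from_utf8_lossy(&record[bene_idx]).into_owned();
        let county_id = String::from_utf8_lossy(&record[county_idx]).into_owned();
        let reimbursement: f32 = std::str::from_utf8(&record[reimb_idx])
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(0.0);

        *claim_line_counts_by_county.entry(county_id).or_default() += 1;
        *claim_line_counts_by_beneficiary.entry(beneficiary_id.clone()).or_default() += 1;
        *medicare_reimbursement_by_beneficiary.entry(beneficiary_id).or_default() += reimbursement;
    }

    println!("count claim_line_counts_by_county: {}", claim_line_counts_by_county.len());
    println!("count claim_line_counts_by_beneficiary: {}", claim_line_counts_by_beneficiary.len());
    println!("count medicare_reimbursement_by_beneficiary: {}", medicare_reimbursement_by_beneficiary.len());
}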