Goal
I want to write a function in Rust using the Polars crate that does the following:
- Copy a Parquet file from one location to another
- Handle files larger than RAM
- Not load the entire file into memory
- Be as fast as possible
Observations
My benchmark results show:
| Operation | Memory Usage | Time |
|---|---|---|
| Parquet → Parquet | 56 GB | 118 seconds |
| Parquet → CSV | 28 GB | 75 seconds |
| Parquet → DataFrame | 25 GB | 10 seconds |
Note: Compression settings (compressed vs. uncompressed) make no difference in memory usage or time.
Questions
What I'm wondering:
- Why does the streaming API use so much memory? I expected it to use a constant, small amount of memory regardless of input file size.
- Why is RAM consumption for writing to Parquet (56GB) so much higher than writing to CSV (28GB)?
I expected the streaming API of Polars to be very memory-efficient, but that does not seem to be the case here (or I am doing something fundamentally wrong).
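To check that the query is not silently falling back to the in-memory engine, this is the kind of diagnostic I would run (a minimal sketch; print_plan is just a throwaway helper, assuming LazyFrame::explain is available in this version):

use polars::prelude::*;

/// Throwaway helper: print the optimized plan to see whether the scan
/// actually runs on the new streaming engine.
fn print_plan(input_path: &str) -> PolarsResult<()> {
    let args = ScanArgsParquet {
        low_memory: true,
        ..Default::default()
    };
    let lf = LazyFrame::scan_parquet(PlPath::new(input_path), args)?
        .with_new_streaming(true);
    // explain(true) returns the optimized logical plan as a string
    println!("{}", lf.explain(true)?);
    Ok(())
}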
Test Environment
- Input file: Parquet file with 700,000,000 rows and 5 columns
- File sizes: Parquet ~2GB, CSV ~9GB
- Schema: 4 integer columns + 1 float column
- Hardware: MacBook Pro with 16 GB RAM, macOS Tahoe
- Rust/Cargo: Version 1.90.0
- Polars: Version 0.51.0
Sample Data
The CSV data looks like this:
a,b,c,d,e
0,0,2,3,20.0
1,1,1,4,22.4
1,0,5,1,15.1
1,0,4,0,31.1
Dependencies
Cargo.toml:
polars = { version = "0.51.0", features = ["lazy", "temporal", "parquet"] }
rand = "0.9"  # used by the test-data generator below
Code
Test Functions
1. Streaming Copy: Parquet → Parquet
use polars::prelude::*; // all snippets below assume the polars prelude is in scope

fn streaming_from_parquet_to_parquet_copy(input_path: &str, output_path: &str) -> PolarsResult<()> {
    let args = ScanArgsParquet {
        low_memory: true,
        ..Default::default()
    };
    LazyFrame::scan_parquet(PlPath::new(input_path), args)?
        .with_new_streaming(true)
        // sink_parquet returns a new LazyFrame; collect() executes the streaming pipeline
        .sink_parquet(
            SinkTarget::Path(PlPath::new(output_path)),
            get_parquet_write_options(),
            None,
            Default::default(),
        )?
        .collect()?;
    Ok(())
}
2. Streaming Copy: Parquet → CSV
fn streaming_from_parquet_to_csv_copy(input_path: &str, output_path: &str) -> PolarsResult<()> {
    let args = ScanArgsParquet {
        low_memory: true,
        ..Default::default()
    };
    LazyFrame::scan_parquet(PlPath::new(input_path), args)?
        .with_new_streaming(true)
        .sink_csv(
            SinkTarget::Path(PlPath::new(output_path)),
            CsvWriterOptions::default(),
            None,
            Default::default(),
        )?
        .collect()?;
    Ok(())
}
3. Streaming Copy: Parquet → DataFrame
fn streaming_from_parquet_to_df_copy(input_path: &str) -> PolarsResult<DataFrame> {
    let args = ScanArgsParquet {
        low_memory: true,
        ..Default::default()
    };
    LazyFrame::scan_parquet(PlPath::new(input_path), args)?
        .with_new_streaming(true)
        // collect() materializes the full result as an in-memory DataFrame
        .collect()
}
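For completeness, this is roughly how the three functions are invoked; a sketch with placeholder paths, while the real benchmark measures wall time and peak memory around each call:

fn main() -> PolarsResult<()> {
    // Placeholder paths, not the ones used for the numbers above
    let input = "data/test.parquet";

    streaming_from_parquet_to_parquet_copy(input, "data/copy.parquet")?;
    streaming_from_parquet_to_csv_copy(input, "data/copy.csv")?;

    let df = streaming_from_parquet_to_df_copy(input)?;
    println!("collected {} rows", df.height());
    Ok(())
}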
Helper Function: Parquet Write Options
const PARQUET_COMPRESSION: ParquetCompression = ParquetCompression::Uncompressed; // or Snappy

/// Helper function to get ParquetWriteOptions with our configured compression
fn get_parquet_write_options() -> ParquetWriteOptions {
    ParquetWriteOptions {
        compression: PARQUET_COMPRESSION,
        row_group_size: Some(500_000), // write a row group every 500k rows
        ..Default::default()
    }
}
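For the "compressed" runs mentioned in the note under the benchmark table, the only change is this constant (nothing else in the code differs):

const PARQUET_COMPRESSION: ParquetCompression = ParquetCompression::Snappy;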
Test Data Generation
This is the code used to create the test CSV and Parquet files:
fn test_generate_large_files() -> PolarsResult<()> {
    use std::fmt::Write as FmtWrite;
    use std::fs;
    use std::io::{BufWriter, Write};

    let mut rng = rand::rng();
    let file = fs::File::create(TEST_CSV_PATH).expect("Failed to create test CSV file");
    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file);

    // Write header
    writeln!(writer, "a,b,c,d,e").expect("Failed to write header");

    let num_rows = 700_000_000;
    const CHUNK_SIZE: usize = 100_000;
    let mut buffer = String::with_capacity(CHUNK_SIZE * 20);

    for chunk_start in (0..num_rows).step_by(CHUNK_SIZE) {
        buffer.clear();
        let chunk_end = (chunk_start + CHUNK_SIZE).min(num_rows);
        for _i in chunk_start..chunk_end {
            let a = rng.random_range(0..=1);
            let b = rng.random_range(0..=1);
            let c = rng.random_range(0..=5);
            let d = rng.random_range(0..=4);
            let e = rng.random_range(10.0..35.0);
            writeln!(&mut buffer, "{},{},{},{},{:.1}", a, b, c, d, e)
                .expect("Failed to write to buffer");
        }
        // Write the entire chunk at once
        writer.write_all(buffer.as_bytes()).expect("Failed to write chunk");
    }
    writer.flush().expect("Failed to flush writer");
    drop(writer); // Ensure the file is closed

    let metadata = fs::metadata(TEST_CSV_PATH).expect("Failed to get file metadata");
    let size_mb = metadata.len() as f64 / (1024.0 * 1024.0);
    println!("Generated CSV: {size_mb:.1} MB");

    // Now convert the CSV to Parquet for the comparison tests
    let mut df = LazyCsvReader::new(PlPath::new(TEST_CSV_PATH)).finish()?.collect()?;
    let mut parquet_file = fs::File::create(TEST_PARQUET_PATH)?;
    ParquetWriter::new(&mut parquet_file)
        .with_compression(PARQUET_COMPRESSION)
        .finish(&mut df)?;
    Ok(())
}
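(Aside: I know this conversion step collects the whole CSV into a DataFrame first. It could presumably be rewritten with the same sink API used above, along these lines; just a sketch I have not benchmarked, reusing the constants and helper defined earlier:)

// Sketch: convert the test CSV to Parquet via the streaming sink instead of collect()
fn convert_csv_to_parquet_streaming() -> PolarsResult<()> {
    LazyCsvReader::new(PlPath::new(TEST_CSV_PATH))
        .finish()?
        .with_new_streaming(true)
        .sink_parquet(
            SinkTarget::Path(PlPath::new(TEST_PARQUET_PATH)),
            get_parquet_write_options(),
            None,
            Default::default(),
        )?
        .collect()?;
    Ok(())
}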
What am I doing wrong?
Any insights into why the streaming API is using so much memory would be greatly appreciated!