
Goal

I want to write a function in Rust using the Polars crate that does the following:

  • Copy a Parquet file from one location to another
  • Handle files larger than RAM
  • Not load the entire file into memory
  • Be as fast as possible

Observations

My benchmark results show:

Operation              Memory Usage   Time
Parquet → Parquet      56 GB          118 seconds
Parquet → CSV          28 GB          75 seconds
Parquet → DataFrame    25 GB          10 seconds

Note: Compression settings (compressed vs. uncompressed) make no difference in memory usage or time.

Questions

Two things puzzle me:

  1. Why does the streaming API use so much memory? I expected it to use a constant, small amount of memory regardless of input file size.
  2. Why is RAM consumption for writing to Parquet (56GB) so much higher than writing to CSV (28GB)?

I expected the streaming API in Polars to be very memory-efficient, but that does not appear to be the case here (or I am doing something fundamentally wrong).
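
To rule out a silent fallback to the in-memory engine, one option is to print the optimized plan before collecting. This is only a sketch around the same scan setup used below; the exact plan output differs between Polars versions:

fn print_plan(input_path: &str) -> PolarsResult<()> {
    let args = ScanArgsParquet {
        low_memory: true,
        ..Default::default()
    };

    let lf = LazyFrame::scan_parquet(PlPath::new(input_path), args)?
        .with_new_streaming(true);

    // explain(true) returns the optimized plan as a string.
    println!("{}", lf.explain(true)?);
    Ok(())
}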

Test Environment

  • Input file: Parquet file with 700,000,000 rows and 5 columns
  • File sizes: Parquet ~2GB, CSV ~9GB
  • Schema: 4 integer columns + 1 float column
  • Hardware: MacBook Pro with 16 GB RAM, macOS Tahoe
  • Rust/Cargo: Version 1.90.0
  • Polars: Version 0.51.0
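
For scale, a back-of-the-envelope estimate of the fully materialized size, assuming every column ends up as an 8-byte integer or float (the dtypes Polars actually infers may be narrower):

fn expected_in_memory_size() -> u64 {
    // 700 million rows x 5 columns x 8 bytes per value (assumption: i64/f64 columns)
    let rows: u64 = 700_000_000;
    let cols: u64 = 5;
    rows * cols * 8 // = 28_000_000_000 bytes ≈ 28 GB
}

That is in the same ballpark as the 25–28 GB measured for the DataFrame and CSV cases, while the Parquet sink needs roughly twice that.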

Sample Data

The CSV data looks like this:

a,b,c,d,e
0,0,2,3,20.0
1,1,1,4,22.4
1,0,5,1,15.1
1,0,4,0,31.1
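
To double-check that the generated Parquet file matches this CSV, a lazy peek at the first rows works without materializing the whole file (a sketch that reuses the same scan API as the functions below):

fn peek_parquet(path: &str) -> PolarsResult<()> {
    // Scan lazily, take only the first rows, then collect just that slice.
    let df = LazyFrame::scan_parquet(PlPath::new(path), Default::default())?
        .limit(5)
        .collect()?;
    println!("{df}");
    Ok(())
}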

Dependencies

Cargo.toml:

polars = { version = "0.51.0", features = ["lazy", "temporal", "parquet"] }
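
The CSV scanner/sink and the test-data generator below additionally rely on the csv feature and the rand crate, so a fuller manifest would look roughly like this (a sketch; feature names can vary between Polars versions):

polars = { version = "0.51.0", features = ["lazy", "temporal", "parquet", "csv"] }
rand = "0.9"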

Code

Test Functions

1. Streaming Copy: Parquet → Parquet

fn streaming_from_parquet_to_parquet_copy(input_path: &str, output_path: &str) -> PolarsResult<()> {
    let args = ScanArgsParquet {
        low_memory: true,
        ..Default::default()
    };

    LazyFrame::scan_parquet(PlPath::new(input_path), args)?
        .with_new_streaming(true)
        .sink_parquet(
            SinkTarget::Path(PlPath::new(output_path)),
            get_parquet_write_options(),
            None,
            Default::default()
        )?
        .collect()?;
    Ok(())
}

2. Streaming Copy: Parquet → CSV

fn streaming_from_parquet_to_csv_copy(input_path: &str, output_path: &str) -> PolarsResult<()> {
    let args = ScanArgsParquet {
        low_memory: true,
        ..Default::default()
    };

    LazyFrame::scan_parquet(PlPath::new(input_path), args)?
        .with_new_streaming(true)
        .sink_csv(
            SinkTarget::Path(PlPath::new(output_path)),
            CsvWriterOptions::default(),
            None,
            Default::default()
        )?
        .collect()?;
    Ok(())
}

3. Streaming Copy: Parquet → DataFrame

fn streaming_from_parquet_to_df_copy(input_path: &str) -> PolarsResult<DataFrame> {
    let args = ScanArgsParquet {
        low_memory: true,
        ..Default::default()
    };

    LazyFrame::scan_parquet(PlPath::new(input_path), args)?
        .with_new_streaming(true)
        .collect()
}
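
For reference, a minimal driver to time one of the copies could look like this (a sketch with placeholder paths; peak memory would have to be observed externally, e.g. /usr/bin/time -l on macOS reports the maximum resident set size):

fn main() -> PolarsResult<()> {
    use std::time::Instant;

    // Placeholder paths; substitute the real test files.
    let input = "test_data.parquet";
    let output = "test_data_copy.parquet";

    let start = Instant::now();
    streaming_from_parquet_to_parquet_copy(input, output)?;
    println!("Parquet -> Parquet took {:?}", start.elapsed());

    Ok(())
}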

Helper Function: Parquet Write Options

const PARQUET_COMPRESSION: ParquetCompression = ParquetCompression::Uncompressed; // or ParquetCompression::Snappy

/// Helper to build ParquetWriteOptions with the configured compression.
fn get_parquet_write_options() -> ParquetWriteOptions {
    ParquetWriteOptions {
        compression: PARQUET_COMPRESSION,
        row_group_size: Some(500_000), // Target 500k rows per row group
        ..Default::default()
    }
}

Test Data Generation

This is the code used to create the test CSV and Parquet files:

fn test_generate_large_files() -> PolarsResult<()> {
    use std::fs;
    use std::io::{BufWriter, Write};

    let mut rng = rand::rng();
    let file = fs::File::create(TEST_CSV_PATH).expect("Failed to create test CSV file");
    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file);

    // Write header
    writeln!(writer, "a,b,c,d,e").expect("Failed to write header");

    let num_rows = 700_000_000;

    const CHUNK_SIZE: usize = 100_000;
    let mut buffer = String::with_capacity(CHUNK_SIZE * 20);

    for chunk_start in (0..num_rows).step_by(CHUNK_SIZE) {
        buffer.clear();
        let chunk_end = (chunk_start + CHUNK_SIZE).min(num_rows);

        for _i in chunk_start..chunk_end {
            let a = rng.random_range(0..=1);
            let b = rng.random_range(0..=1);
            let c = rng.random_range(0..=5);
            let d = rng.random_range(0..=4);
            let e = rng.random_range(10.0..35.0);

            use std::fmt::Write as FmtWrite;
            writeln!(&mut buffer, "{},{},{},{},{:.1}", a, b, c, d, e)
                .expect("Failed to write to buffer");
        }

        // Write entire chunk at once
        writer.write_all(buffer.as_bytes()).expect("Failed to write chunk");
    }

    writer.flush().expect("Failed to flush writer");
    drop(writer); // Ensure file is closed

    let metadata = fs::metadata(TEST_CSV_PATH).expect("Failed to get file metadata");
    let size_mb = metadata.len() as f64 / (1024.0 * 1024.0);
    println!("Wrote {:.1} MB of CSV test data", size_mb);

    // Now convert CSV to Parquet for comparison tests
    let mut df = LazyCsvReader::new(PlPath::new(TEST_CSV_PATH)).finish()?.collect()?;

    let mut parquet_file = std::fs::File::create(TEST_PARQUET_PATH)?;
    ParquetWriter::new(&mut parquet_file)
        .with_compression(PARQUET_COMPRESSION)
        .finish(&mut df)?;

    Ok(())
}
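
Aside: the CSV → Parquet conversion at the end of this function materializes the whole DataFrame first. A variant that stays streaming end to end could reuse the same sink API as above (a sketch, untested against this exact setup):

fn csv_to_parquet_streaming() -> PolarsResult<()> {
    // Scan the CSV lazily and sink it straight to Parquet instead of collecting.
    LazyCsvReader::new(PlPath::new(TEST_CSV_PATH))
        .finish()?
        .with_new_streaming(true)
        .sink_parquet(
            SinkTarget::Path(PlPath::new(TEST_PARQUET_PATH)),
            get_parquet_write_options(),
            None,
            Default::default(),
        )?
        .collect()?;
    Ok(())
}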

What am I doing wrong?

Any insights into why the streaming API is using so much memory would be greatly appreciated!
