
Conversation

@razajafri (Collaborator) commented Nov 11, 2025

Fixes #13617

Description

  • Currently, we don't have support for the small-file optimization in MultiFileParquetPartitionReader when Deletion Vectors are enabled on a Delta table. If a query is run on such a table, the plugin falls back to the MultiFileCloudParquetPartitionReader.
  • With this optimization, small files on a local file system are processed much faster by coalescing them.
  • This PR adds Deletion Vector processing for every batch by locating the original file used to create the buffer and reading its deletion vector (see the sketch after this list).
  • A new integration test has been added; in addition, the feature was tested against the baseline and the results matched.
  • Because creating many deletion vectors made the tests slow, pre-generated test data files have been added to the test resources to speed them up.
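
The per-batch processing described above can be illustrated with a short sketch. This is a minimal, hypothetical Scala example (FileBlock, FileSlice, and slicesForBatch are illustrative names, not the plugin's actual API): cumulative row counts over the coalesced blocks act as boundaries, so a batch's global row range can be mapped back to the files that produced it and to the file-local offsets at which each file's deletion vector should be applied.

// Hypothetical sketch of the boundary lookup; names and types are illustrative only.
case class FileBlock(filePath: String, rowCount: Long)
case class FileSlice(filePath: String, startRowInFile: Long, numRows: Long)

object CoalescedBatchLookup {
  /** Map a batch covering global rows [batchStart, batchStart + batchRows) back to
   *  per-file slices so each file's deletion vector can be applied to its slice. */
  def slicesForBatch(blocks: Seq[FileBlock],
                     batchStart: Long,
                     batchRows: Long): Seq[FileSlice] = {
    val batchEnd = batchStart + batchRows
    // boundaries(i) is the global row index at which blocks(i) starts.
    val boundaries = blocks.scanLeft(0L)(_ + _.rowCount)
    blocks.zip(boundaries).flatMap { case (block, fileStart) =>
      val fileEnd = fileStart + block.rowCount
      val overlapStart = math.max(batchStart, fileStart)
      val overlapEnd = math.min(batchEnd, fileEnd)
      if (overlapStart < overlapEnd) {
        Some(FileSlice(block.filePath, overlapStart - fileStart, overlapEnd - overlapStart))
      } else {
        None
      }
    }
  }
}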

Performance

| Percentage Deleted | Baseline: GPU with FileSourceScan fallback to CPU (single run) | GPU COALESCING with DV (single run) | Speedup |
|---|---|---|---|
| 5% (500 small files) | 8720.0 | 8111.0 | 1.08 |
| 10% (500 small files) | 8556.0 | 8001.0 | 1.07 |
| 20% (500 small files) | 8290.0 | 8501.0 | 0.98 |
| 40% (500 small files) | 8278.0 | 8580.0 | 0.96 |

Baseline: commit id - 21afb61
Target: This PR
Dataset: TPC-DS (sf100_parquet)
Environment: Local
Spark Configs

export SPARK_CONF=("--master" "local[16]"
                   "--conf" "spark.driver.maxResultSize=2GB"
                   "--conf" "spark.driver.memory=50G"
                   "--conf" "spark.executor.cores=16"
                   "--conf" "spark.executor.instances=1"
                   "--conf" "spark.executor.memory=16G"
                   "--conf" "spark.driver.maxResultSize=4gb"
                   "--conf" "spark.sql.files.maxPartitionBytes=2gb"
                   "--conf" "spark.sql.adaptive.enabled=true"
                   "--conf" "spark.plugins=com.nvidia.spark.SQLPlugin"
                   "--conf" "spark.rapids.memory.host.spillStorageSize=16G"
                   "--conf" "spark.rapids.memory.pinnedPool.size=8g"
                   "--conf" "spark.rapids.sql.concurrentGpuTasks=3"
                   "--conf" "spark.rapids.sql.explain=all"
                   "--conf" "spark.sql.warehouse.dir=/home/rjafri/spark-warehouse"
                   "--conf" "spark.sql.legacy.createHiveTableByDefault=false"
                   "--conf" "spark.databricks.delta.deletionVectors.useMetadataRowIndex=false"
                   "--conf" "spark.rapids.sql.format.parquet.reader.type=COALESCING"
                   "--packages" "io.delta:delta-spark_2.12:3.3.0"
                   "--conf" "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
                   "--conf" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
                   "--conf" "spark.driver.extraClassPath=$SPARK_RAPIDS_PLUGIN_JAR:$NDS_LISTENER_JAR"
                   "--conf" "spark.executor.extraClassPath=$SPARK_RAPIDS_PLUGIN_JAR:$NDS_LISTENER_JAR")

Query: select sum(ss_list_price) from store_sales

Checklists

  • This PR has added documentation for new or modified features or behaviors.
  • This PR has added new tests or modified existing tests to cover new code paths.
    (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
  • Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

@razajafri force-pushed the SP-13617-coalescing-reader branch from d24af74 to a9ef584 on November 12, 2025 18:14
@gerashegalov (Collaborator) commented

The PR branch contains 4.7K files; the unrelated files need to be dropped.

@razajafri (Collaborator, Author) commented

> The PR branch contains 4.7K files; the unrelated files need to be dropped.

I added the deletion vector files as part of the PR because the tests were taking too long to run. The files are being added via Git LFS.

@gerashegalov (Collaborator) commented

IMO we need to find a way to have fast tests without adding 4.7K files.

@razajafri (Collaborator, Author) commented

> IMO we need to find a way to have fast tests without adding 4.7K files.

I have reduced the number of test files.

@gerashegalov (Collaborator) commented

> IMO we need to find a way to have fast tests without adding 4.7K files.
>
> I have reduced the number of test files.

You can reduce the count further by getting rid of the .crc checksum files.

@sameerz added the feature request label on Nov 13, 2025
@greptile-apps (Contributor) bot commented Nov 13, 2025

Greptile Overview

Greptile Summary

This PR successfully adds deletion vector support to the MultiFileParquetPartitionReader (coalescing reader), enabling performance optimization for small files on local systems when deletion vectors are present on Delta tables.

Key Changes:

  • Extended MultiFileCoalescingPartitionReaderBase with a finalizeOutputBatch callback that allows subclasses to process batches with extra context (like file boundaries and row indices); a rough sketch of this hook follows this list
  • Implemented DeltaCoalescingFileParquetPartitionReader that tracks which parquet files contribute to each batch using boundaries calculated from row counts
  • Added CoalescedRapidsDropMarkedRowsFilter that handles deletion vectors across multiple coalesced files by adjusting offsets based on file boundaries
  • Modified getCoalescingIterator to track row indices across batches for proper deletion vector application
  • Added comprehensive integration test with pre-generated test data to validate correctness
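
As a rough illustration of the callback mentioned in the first bullet above, the sketch below shows one possible shape for such a hook. The trait and member names are assumptions made for illustration and may not match the plugin's actual finalizeOutputBatch signature or context type.

// Illustrative-only shape of a "finalize output batch" hook; the real signature may differ.
import org.apache.spark.sql.vectorized.ColumnarBatch

trait CoalescedBatchContext {
  /** Global row index of the first row in this batch across all coalesced files. */
  def batchStartRowIndex: Long
  /** Ordered (filePath, rowCount) blocks that were coalesced, in batch row order. */
  def fileBlocks: Seq[(String, Long)]
}

trait FinalizesOutputBatch {
  /** Called for each coalesced batch; subclasses can filter rows using the context.
   *  The default is a pass-through. */
  def finalizeOutputBatch(batch: ColumnarBatch,
                          context: CoalescedBatchContext): ColumnarBatch = batch
}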

Architecture:
The implementation cleverly uses boundaries (cumulative row counts) to determine which files contribute to a given batch, then creates a coalesced filter that applies the appropriate deletion vector with offset adjustments for each contributing file. This ensures deleted rows are correctly identified even when multiple small files are coalesced into a single batch.
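
A minimal sketch of that offset adjustment, assuming each file's deletion vector can be exposed as a predicate over file-local row indices. This is not the plugin's actual CoalescedRapidsDropMarkedRowsFilter; it only illustrates translating a global (coalesced) row index into a file-local index before consulting that file's deletion vector.

// Illustrative filter over coalesced files; fileFilters must be sorted by start row,
// with the first entry starting at global row 0.
class CoalescedDeletionFilter(
    fileFilters: Array[(Long, Long => Boolean)]) { // (globalStartRow, isDeletedAtLocalIndex)

  /** True if the row at this global index within the coalesced input was deleted. */
  def isDeleted(globalRowIndex: Long): Boolean = {
    // Binary search for the last file whose start row is <= globalRowIndex.
    var lo = 0
    var hi = fileFilters.length - 1
    var found = 0
    while (lo <= hi) {
      val mid = (lo + hi) / 2
      if (fileFilters(mid)._1 <= globalRowIndex) { found = mid; lo = mid + 1 }
      else { hi = mid - 1 }
    }
    val (fileStart, isDeletedInFile) = fileFilters(found)
    isDeletedInFile(globalRowIndex - fileStart)
  }
}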

Performance:
The PR description shows performance roughly comparable to baseline (0.96-1.08x), which is expected since the optimization primarily benefits from coalescing small files rather than improving DV processing itself.

Confidence Score: 5/5

  • This PR is safe to merge with high confidence
  • The implementation is well-designed with proper boundary tracking, comprehensive testing, and follows established patterns in the codebase. The logic for mapping row indices to source files using boundaries is sound, and the offset adjustments in CoalescedRapidsDropMarkedRowsFilter correctly handle deletion vectors across coalesced files. Performance testing shows expected results, and the new integration test validates correctness with both sequential and random deletion patterns.
  • No files require special attention

Important Files Changed

File Analysis

| Filename | Score | Overview |
|---|---|---|
| delta-lake/common/src/main/delta-33x-40x/scala/com/nvidia/spark/rapids/delta/common/GpuDeltaParquetFileFormatBase.scala | 5/5 | Added deletion vector support for coalescing reader in DeltaCoalescingFileParquetPartitionReader with proper boundary tracking and offset calculation |
| delta-lake/common/src/main/delta-33x-40x/scala/com/nvidia/spark/rapids/delta/common/RapidsRowIndexFilters.scala | 5/5 | Implemented CoalescedRapidsDropMarkedRowsFilter to handle deletion vectors across multiple coalesced files with proper offset adjustments |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala | 5/5 | Added finalizeOutputBatch callback mechanism to MultiFileCoalescingPartitionReaderBase for per-batch processing with extra info |
| integration_tests/src/main/python/delta_lake_delete_test.py | 5/5 | Added comprehensive test test_deletion_vectors_coalescing_multiple_files for coalescing reader with deletion vectors using pre-generated test data |

Sequence Diagram

sequenceDiagram
    participant User
    participant DeltaMultiFileReaderFactory
    participant DeltaCoalescingFileParquetPartitionReader
    participant MultiFileParquetPartitionReader
    participant RapidsDeletionVectorUtils
    participant CoalescedRapidsDropMarkedRowsFilter
    
    User->>DeltaMultiFileReaderFactory: createColumnarReader(partition)
    DeltaMultiFileReaderFactory->>DeltaMultiFileReaderFactory: Check if coalescing or multi-threaded
    
    alt Coalescing (Local Files)
        DeltaMultiFileReaderFactory->>DeltaCoalescingFileParquetPartitionReader: Create reader
        DeltaCoalescingFileParquetPartitionReader->>MultiFileParquetPartitionReader: readBatch()
        MultiFileParquetPartitionReader->>MultiFileParquetPartitionReader: Coalesce multiple small files
        MultiFileParquetPartitionReader->>DeltaCoalescingFileParquetPartitionReader: Return batch
        DeltaCoalescingFileParquetPartitionReader->>RapidsDeletionVectorUtils: getCoalescedRowIndexFilter()
        RapidsDeletionVectorUtils->>RapidsDeletionVectorUtils: Find relevant files based on boundaries
        RapidsDeletionVectorUtils->>CoalescedRapidsDropMarkedRowsFilter: Create filter with offsets
        CoalescedRapidsDropMarkedRowsFilter->>DeltaCoalescingFileParquetPartitionReader: Return filter
        DeltaCoalescingFileParquetPartitionReader->>RapidsDeletionVectorUtils: processBatchWithDeletionVector()
        RapidsDeletionVectorUtils->>RapidsDeletionVectorUtils: Apply deletion vector to batch
        RapidsDeletionVectorUtils->>DeltaCoalescingFileParquetPartitionReader: Return filtered batch
        DeltaCoalescingFileParquetPartitionReader->>User: Return batch with deleted rows marked
    else Multi-threaded (Cloud Files)
        DeltaMultiFileReaderFactory->>DeltaMultiFileParquetPartitionReader: Create reader
        DeltaMultiFileParquetPartitionReader->>DeltaMultiFileParquetPartitionReader: get()
        DeltaMultiFileParquetPartitionReader->>RapidsDeletionVectorUtils: getRowIndexFilter()
        RapidsDeletionVectorUtils->>DeltaMultiFileParquetPartitionReader: Return filter
        DeltaMultiFileParquetPartitionReader->>RapidsDeletionVectorUtils: processBatchWithDeletionVector()
        RapidsDeletionVectorUtils->>DeltaMultiFileParquetPartitionReader: Return filtered batch
        DeltaMultiFileParquetPartitionReader->>User: Return batch with deleted rows marked
    end

@greptile-apps (Contributor) bot left a comment

87 files reviewed, no comments

@razajafri (Collaborator, Author) commented

The performance numbers aren't good for this PR. I am working on improving performance.

@nvauto (Collaborator) commented Nov 17, 2025

NOTE: release/25.12 has been created from main. Please retarget your PR to release/25.12 if it should be included in the release.

@razajafri changed the base branch from main to release/25.12 on November 20, 2025 18:35
@greptile-apps (Contributor) bot left a comment

88 files reviewed, no comments

@razajafri (Collaborator, Author) commented

Here is the breakdown of the performance numbers when benchmarking the time to materialize:

| % deleted | MULTITHREADED | COALESCING |
|---|---|---|
| 5 | 158 ms | 932 ms |
| 10 | 214 ms | 1.8 s |
| 20 | 238 ms | 2.4 s |
| 40 | 242 ms | 4.1 s |

Breaking it down further reveals that, as the delete percentage increases, the time taken to add offsets to the bitmap becomes the dominant factor in the time to materialize (a sketch after the tables below illustrates this step).

| % deleted | MULTITHREADED: Time to create array | COALESCING: Time to create array | COALESCING: Time to add offsets |
|---|---|---|---|
| 5 | 15 ms | 72 ms | 254 ms |
| 10 | 50 ms | 125 ms | 612 ms |
| 20 | 80 ms | 155 ms | 971 ms |
| 40 | 93 ms | 271 ms | 2200 ms |

A table representing the performance numbers above as a percentage of the total materialization time shows that adding offsets is a major contributor to the slowness:

Array creation as percentage of total materialization

| % deleted | MULTITHREADED: Time to create array | COALESCING: Time to create array | COALESCING: Time to add offsets |
|---|---|---|---|
| 5 | 9.5 | 7.7 | 27.3 |
| 10 | 23.4 | 7 | 34 |
| 20 | 33.6 | 6.5 | 40.5 |
| 40 | 38.4 | 6.6 | 53.7 |
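
To make the "time to add offsets" trend concrete, here is a rough, hypothetical sketch of the kind of work that step implies: merging per-file deletion vectors into one batch-level structure means shifting every deleted row index by its file's starting offset, so the cost grows with the total number of deleted rows. Plain Scala collections stand in for the actual bitmap type purely for illustration.

import scala.collection.mutable

object OffsetMerge {
  /** deletedPerFile: for each coalesced file, (globalStartRow, file-local deleted indices). */
  def mergeDeleted(deletedPerFile: Seq[(Long, Iterator[Long])]): mutable.TreeSet[Long] = {
    val merged = mutable.TreeSet.empty[Long]
    deletedPerFile.foreach { case (fileStart, localDeleted) =>
      // Every deleted position is visited and shifted, so the cost scales with the
      // number of deleted rows, matching the trend in the tables above.
      localDeleted.foreach(local => merged += fileStart + local)
    }
    merged
  }
}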


Labels

feature request


Development

Successfully merging this pull request may close these issues.

[TASK] Add support for coalescing files in MultiFileParquetParquetPartitionReader (onPrem reader) when reading Tables with Deletion Vectors enabled

4 participants