Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.iceberg.data
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import com.nvidia.spark.rapids.{GpuBoundReference, GpuColumnVector, GpuExpression, GpuMetric, LazySpillableColumnarBatch}
import com.nvidia.spark.rapids.{GpuBoundReference, GpuColumnVector, GpuExpression, GpuMetric, SpillableColumnarBatch}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuMetric.{JOIN_TIME, OP_TIME_LEGACY}
import com.nvidia.spark.rapids.fileio.iceberg.{IcebergFileIO, IcebergInputFile}
Expand Down Expand Up @@ -379,17 +379,15 @@ object GpuDeleteFilter2 {
}

private case class DeleteFilterContext(
buildBatch: LazySpillableColumnarBatch,
buildBatch: SpillableColumnarBatch,
buildKeys: Seq[GpuExpression],
probeKeys: Seq[GpuExpression],
numFirstConditionColumns: Int,
opTime: GpuMetric,
joinTime: GpuMetric) {
def filter(input: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
val probeSide = input.map { cb =>
withResource(cb) {
LazySpillableColumnarBatch(_, "Deletes probe")
}
SpillableColumnarBatch(cb, "Deletes probe")
}

new HashedExistenceJoinIterator(buildBatch,
Expand All @@ -402,4 +400,4 @@ private case class DeleteFilterContext(
opTime,
joinTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.nvidia.spark.rapids.iceberg.data

import ai.rapids.cudf.{Table => CudfTable}
import com.nvidia.spark.rapids.{GpuColumnVector, LazySpillableColumnarBatch}
import com.nvidia.spark.rapids.{GpuColumnVector, SpillableColumnarBatch}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.fileio.iceberg.{IcebergFileIO, IcebergInputFile}
import com.nvidia.spark.rapids.iceberg.parquet.{GpuCoalescingIcebergParquetReader, GpuIcebergParquetReader, GpuIcebergParquetReaderConf, GpuMultiThreadIcebergParquetReader, GpuSingleThreadIcebergParquetReader, IcebergPartitionedFile, MultiFile, MultiThread, SingleFile}
Expand All @@ -31,7 +31,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
trait GpuDeleteLoader {
def loadDeletes(deletes: Seq[DeleteFile],
schema: Schema,
sparkTypes: Array[DataType]): LazySpillableColumnarBatch
sparkTypes: Array[DataType]): SpillableColumnarBatch
}

class DefaultDeleteLoader(
Expand All @@ -41,7 +41,7 @@ class DefaultDeleteLoader(

def loadDeletes(deletes: Seq[DeleteFile],
schema: Schema,
sparkTypes: Array[DataType]): LazySpillableColumnarBatch = {
sparkTypes: Array[DataType]): SpillableColumnarBatch = {
val files = deletes.map(f => IcebergPartitionedFile(inputFiles(f.path().toString)))
withResource(createReader(schema, files)) { reader =>
withResource(new ArrayBuffer[ColumnarBatch]()) { batches =>
Expand All @@ -56,14 +56,12 @@ class DefaultDeleteLoader(

if (tables.size > 1) {
withResource(CudfTable.concatenate(tables.toArray: _*)) { combined =>
withResource(GpuColumnVector.from(combined, sparkTypes)) { combinedBatch =>
LazySpillableColumnarBatch(combinedBatch, "Eq deletes")
}
val combinedBatch = GpuColumnVector.from(combined, sparkTypes)
SpillableColumnarBatch(combinedBatch, "Eq deletes")
}
} else {
withResource(GpuColumnVector.from(tables.head, sparkTypes)) { singleBatch =>
LazySpillableColumnarBatch(singleBatch, "Eq deletes")
}
val singleBatch = GpuColumnVector.from(tables.head, sparkTypes)
SpillableColumnarBatch(singleBatch, "Eq deletes")
}
}
}
Expand Down Expand Up @@ -94,4 +92,4 @@ class DefaultDeleteLoader(
newConf)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -143,6 +143,7 @@ public ColumnarBatch next() {
long dataLength = calcDataLengthEstimate(numRowsEstimate);
long offsetLength = calcOffsetLengthEstimate(numRowsEstimate);
int used[];

try (SpillableHostBuffer spillableBuffer = sBufAndNumRows._1;
) {
HostMemoryBuffer[] hBufs =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,7 @@
package com.nvidia.spark.rapids

import ai.rapids.cudf.{GatherMap, NvtxColor, OutOfBoundsPolicy}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitTargetSizeInHalfGpu, withRestoreOnRetry, withRetry}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
Expand Down Expand Up @@ -126,6 +126,7 @@ abstract class AbstractGpuJoinIterator(

override def close(): Unit = {
if (!closed) {
super.close()
nextCb.foreach(_.close())
nextCb = None
gathererStore.foreach(_.close())
Expand Down Expand Up @@ -157,12 +158,6 @@ abstract class AbstractGpuJoinIterator(
gathererStore.foreach(_.close())
gathererStore = None
}

if (ret.isDefined) {
// We are about to return something. We got everything we need from it so now let it spill
// if there is more to be gathered later on.
gathererStore.foreach(_.allowSpilling())
}
ret
}
}
Expand All @@ -180,9 +175,9 @@ abstract class AbstractGpuJoinIterator(
*/
abstract class SplittableJoinIterator(
gatherNvtxName: String,
stream: Iterator[LazySpillableColumnarBatch],
stream: Iterator[SpillableColumnarBatch],
streamAttributes: Seq[Attribute],
builtBatch: LazySpillableColumnarBatch,
builtBatch: SpillableColumnarBatch,
targetSize: Long,
opTime: GpuMetric,
joinTime: GpuMetric)
Expand All @@ -194,9 +189,9 @@ abstract class SplittableJoinIterator(
// For some join types even if there is no stream data we might output something
private var isInitialJoin = true
// If the join explodes this holds batches from the stream side split into smaller pieces.
private val pendingSplits = scala.collection.mutable.Queue[LazySpillableColumnarBatch]()
private val pendingSplits = scala.collection.mutable.Queue[SpillableColumnarBatch]()

protected def computeNumJoinRows(cb: LazySpillableColumnarBatch): Long
protected def computeNumJoinRows(cb: SpillableColumnarBatch): Long

/**
* Create a join gatherer.
Expand All @@ -205,7 +200,7 @@ abstract class SplittableJoinIterator(
* @return some gatherer to use next or None if there is no next gatherer or the loop should try
* to build the gatherer again (e.g.: to skip a degenerate join result batch)
*/
protected def createGatherer(cb: LazySpillableColumnarBatch,
protected def createGatherer(cb: SpillableColumnarBatch,
numJoinRows: Option[Long]): Option[JoinGatherer]

override def hasNextStreamBatch: Boolean = {
Expand All @@ -222,36 +217,34 @@ abstract class SplittableJoinIterator(
stream.next()
}
opTime.ns {
withResource(scb) { scb =>
val numJoinRows = computeNumJoinRows(scb)
val numJoinRows = closeOnExcept(scb) { _ => computeNumJoinRows(scb) }

// We want the gather maps size to be around the target size. There are two gather maps
// that are made up of ints, so compute how many rows on the stream side will produce the
// desired gather maps size.
val maxJoinRows = Math.max(1, targetSize / (2 * Integer.BYTES))
if (numJoinRows > maxJoinRows && scb.numRows > 1) {
// Need to split the batch to reduce the gather maps size. This takes a simplistic
// approach of assuming the data is uniformly distributed in the stream table.
val numSplits = Math.min(scb.numRows,
Math.ceil(numJoinRows.toDouble / maxJoinRows).toInt)
splitAndSave(scb.getBatch, numSplits)

// Return no gatherer so the outer loop will try again
return None
// We want the gather maps size to be around the target size. There are two gather maps
// that are made up of ints, so compute how many rows on the stream side will produce the
// desired gather maps size.
val maxJoinRows = Math.max(1, targetSize / (2 * Integer.BYTES))
if (numJoinRows > maxJoinRows && scb.numRows() > 1) {
// Need to split the batch to reduce the gather maps size. This takes a simplistic
// approach of assuming the data is uniformly distributed in the stream table.
val numSplits = Math.min(scb.numRows(),
Math.ceil(numJoinRows.toDouble / maxJoinRows).toInt)
withResource(scb) { _ =>
splitAndSave(scb, numSplits)
}

createGatherer(scb, Some(numJoinRows))
// Return no gatherer so the outer loop will try again
return None
}

createGatherer(scb, Some(numJoinRows))
}
} else {
opTime.ns {
assert(wasInitialJoin)
import scala.collection.JavaConverters._
withResource(GpuColumnVector.emptyBatch(streamAttributes.asJava)) { cb =>
withResource(LazySpillableColumnarBatch(cb, "empty_stream")) { scb =>
createGatherer(scb, None)
}
}
val emptyStreamCb = GpuColumnVector.emptyBatch(streamAttributes.asJava)
val scb = SpillableColumnarBatch(emptyStreamCb, "empty_stream")
createGatherer(scb, None)
}
}
}
Expand All @@ -266,25 +259,21 @@ abstract class SplittableJoinIterator(
}

private def splitStreamBatch(
cb: ColumnarBatch,
numBatches: Int): Seq[LazySpillableColumnarBatch] = {
val batchSize = cb.numRows() / numBatches
val splits = withResource(GpuColumnVector.from(cb)) { tab =>
val splitIndexes = (1 until numBatches).map(num => num * batchSize)
tab.contiguousSplit(splitIndexes: _*)
}
withResource(splits) { splits =>
val schema = GpuColumnVector.extractTypes(cb)
withResource(splits.safeMap(_.getTable)) { tables =>
withResource(tables.safeMap(GpuColumnVector.from(_, schema))) { batches =>
batches.safeMap { splitBatch =>
val lazyCb = LazySpillableColumnarBatch(splitBatch, "stream_data")
lazyCb.allowSpilling()
lazyCb
}
scb: SpillableColumnarBatch,
numBatches: Int): Seq[SpillableColumnarBatch] = {
val schema = scb.dataTypes
val batchSize = scb.numRows() / numBatches
val splits = {
withResource(scb.getColumnarBatch()) { cb =>
withResource(GpuColumnVector.from(cb)) { tab =>
val splitIndexes = (1 until numBatches).map(num => num * batchSize)
tab.contiguousSplit(splitIndexes: _*)
}
}
}
splits.safeMap { t =>
SpillableColumnarBatch(t, schema, "stream_data")
}
}

/**
Expand All @@ -295,10 +284,10 @@ abstract class SplittableJoinIterator(
* @param oom a prior OOM exception that this will try to recover from by splitting
*/
protected def splitAndSave(
cb: ColumnarBatch,
scb: SpillableColumnarBatch,
numBatches: Int,
oom: Option[Throwable] = None): Unit = {
val batchSize = cb.numRows() / numBatches
val batchSize = scb.numRows() / numBatches
if (oom.isDefined && batchSize < 100) {
// We just need some kind of cutoff to not get stuck in a loop if the batches get to be too
// small but we want to at least give it a chance to work (mostly for tests where the
Expand All @@ -311,7 +300,7 @@ abstract class SplittableJoinIterator(
} else {
logInfo(msg)
}
pendingSplits ++= splitStreamBatch(cb, numBatches)
pendingSplits ++= splitStreamBatch(scb, numBatches)
}

/**
Expand All @@ -323,8 +312,8 @@ abstract class SplittableJoinIterator(
*/
protected def makeGatherer(
maps: Array[GatherMap],
leftData: LazySpillableColumnarBatch,
rightData: LazySpillableColumnarBatch,
leftData: SpillableColumnarBatch,
rightData: SpillableColumnarBatch,
joinType: JoinType): Option[JoinGatherer] = {
assert(maps.length > 0 && maps.length <= 2)
try {
Expand All @@ -334,7 +323,7 @@ abstract class SplittableJoinIterator(
// are not rearranged by the join.
new JoinGathererSameTable(leftData)
case _ =>
val lazyLeftMap = LazySpillableGatherMap(maps.head, "left_map")
val lazyLeftMap = SpillableGatherMap(maps.head, "left_map")
// Inner joins -- manifest the intersection of both left and right sides. The gather maps
// contain the number of rows that must be manifested, and every index
// must be within bounds, so we can skip the bounds checking.
Expand All @@ -348,7 +337,7 @@ abstract class SplittableJoinIterator(
JoinGatherer(lazyLeftMap, leftData, leftOutOfBoundsPolicy)
}
val rightMap = joinType match {
case _ if rightData.numCols == 0 => None
case _ if rightData.numCols() == 0 => None
case LeftOuter if maps.length == 1 =>
// Distinct left outer joins only produce a single gather map since left table rows
// are not rearranged by the join.
Expand All @@ -357,7 +346,7 @@ abstract class SplittableJoinIterator(
case _ => Some(maps(1))
}
val gatherer = rightMap match {
case None if joinType == RightOuter && rightData.numCols > 0 =>
case None if joinType == RightOuter && rightData.numCols() > 0 =>
// Distinct right outer joins only produce a single gather map since right table rows
// are not rearranged by the join.
MultiJoinGather(leftGatherer, new JoinGathererSameTable(rightData))
Expand All @@ -377,7 +366,7 @@ abstract class SplittableJoinIterator(
case _: InnerLike | RightOuter => OutOfBoundsPolicy.DONT_CHECK
case _ => OutOfBoundsPolicy.NULLIFY
}
val lazyRightMap = LazySpillableGatherMap(right, "right_map")
val lazyRightMap = SpillableGatherMap(right, "right_map")
val rightGatherer = JoinGatherer(lazyRightMap, rightData, rightOutOfBoundsPolicy)
MultiJoinGather(leftGatherer, rightGatherer)
}
Expand Down
Loading