Adds in support for a "CpuBridge" that lets us fall back to the CPU on a per-expression level instead of per-SparkPlan node level. [databricks] #13368
base: main
Conversation
a per-expression level instead of per-SparkPlan node level. A lot of this code was written with AI (specifically claude-4-sonnet through cursor). Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
```scala
/**
 * Converts a CPU expression to a GPU expression.
 */
def convertToGpuBase(): Expression
```
I was a little confused about convertToGpuBase vs convertToGpu (the naming), and kind of wish we didn't use Base in the name. Something like: keeping convertToGpu for, well, converting to the GPU, and using something different for the function defined in line 1453 that indicates it's going to be either or. I do not know if this is possible or adds a lot more work.
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
Pull Request Overview
This PR introduces support for a "CpuBridge" feature that enables GPU-CPU hybrid execution at an expression level rather than falling back entire SparkPlan nodes to the CPU. The primary goal is to minimize data movement costs, utilize GPU resources more efficiently, and allow CPU expressions to run in parallel using a thread pool.
Key changes:
- Adds CPU bridge infrastructure with thread pool for parallel CPU expression evaluation
- Modifies expression metadata system to support bridge decision-making and optimization
- Updates method signatures from `convertToGpu()` to `convertToGpuImpl()` and introduces new wrapper logic
Reviewed Changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| RapidsMeta.scala | Core infrastructure for CPU bridge support with expression analysis and optimization logic |
| RapidsConf.scala | Configuration options for enabling CPU bridge and controlling which expressions can use it |
| GpuOverrides.scala | Updates expression metadata classes to use new convertToGpuImpl() method signature |
| GpuCpuBridgeExpression.scala | Main bridge expression implementation handling GPU-to-CPU data transfer and parallel evaluation |
| GpuCpuBridgeOptimizer.scala | Optimizer for making bridge vs GPU decisions and merging adjacent bridge expressions |
| GpuCpuBridgeThreadPool.scala | Thread pool implementation with priority queuing and task context propagation |
| various shim files | Method signature updates from convertToGpu() to convertToGpuBase() or convertToGpuImpl() |
| cpu_bridge_test.py | Integration tests for CPU bridge functionality |
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
I did run into one unexpected issue when I tried to run it, and also some NPEs. I am going to spend some time to try and understand this a bit more and see if I can fix it.
I fixed the ScalarSubquery issue so I think this is ready to go.
abellina left a comment:
Need to spend more time in GpuCpuBridgeExpression.scala and optimizer, but I think this is looking good.
```diff
   TEST_PARALLEL_OPTS=()
 elif [[ ${TEST_PARALLEL} -gt ${MAX_PARALLEL} ]]; then
-  TEST_PARALLEL_OPTS=("-n" "$MAX_PARALLEL")
+  TEST_PARALLEL_OPTS=("-n" "$MAX_PARALLEL" "--dist=load" "--maxschedchunk=0")
```
a comment would be nice describing what --dist=load and --maxschedchunk=0 do
Sorry this is something that leaked in here by accident. This is for #13235 and I was running a lot of tests so I added this to speed things up.
```diff
-@allow_non_gpu("ProjectExec", "Pmod")
+@allow_non_gpu("ProjectExec", "Pmod", "BoundReference", "Literal", "PromotePrecision")
```
why do we need to allow these other operators non_gpu?
I could make it so that Literal and BoundReference are ignored automatically, because they are all over the place. PromotePrecision is needed because on older versions of Spark it will insert this to track metadata when it will cast a Decimal expression to a higher precision.
I should also explain that if we fall back to the CPU using the bridge we still want to throw an exception. So we then need to choose between adding an allow list of expressions, which this has done, or allowing the fallback if the Exec it is a part of is in the allow list. I picked the former because I felt it would let us be more accurate in our testing.
```diff
-Some(GpuBindReferences.bindGpuReferences(finalProjections, aggBufferAttributes))
+val bound = Some(GpuBindReferences.bindGpuReferences(finalProjections, aggBufferAttributes))
+// Inject CPU bridge metrics if provided
+metrics.foreach(m => bound.foreach(GpuMetric.injectMetrics(_, m)))
```
should we pass metrics to bindGpuReferences and inject them there? Might make the code more compact.
I debated this a lot. I was worried that it would make the code for bind references more complicated, especially the part where an expression can take over how it binds references, but I agree it is kind of ugly as it is today and prone to errors if we forget a place. I will try it out and see how complex it gets.
```scala
case class PrioritizedCpuBridgeTask[T](
    task: Callable[T],
    taskContext: TaskContext,
    batchSize: Int,
```
what is batchSize used for?
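One plausible reading of the question above (an assumption on my part, not confirmed in the PR) is that `batchSize` feeds the pool's priority queuing so pending CPU work can be ordered. A minimal Python sketch of that kind of prioritized task, with a guessed policy of smaller batches first:

```python
import heapq
from dataclasses import dataclass, field
from typing import Any, Callable

# Sketch only: the ordering policy (smaller batches first) is a guess,
# not necessarily what GpuCpuBridgeThreadPool actually does.
@dataclass(order=True)
class PrioritizedCpuBridgeTask:
    batch_size: int                                  # priority key
    task: Callable[[], Any] = field(compare=False)   # the actual CPU work

heap: list = []
heapq.heappush(heap, PrioritizedCpuBridgeTask(1024, lambda: "big batch"))
heapq.heappush(heap, PrioritizedCpuBridgeTask(32, lambda: "small batch"))
print(heapq.heappop(heap).task())  # small batch
```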
```scala
    nullSafe = false,
    releaseSemaphore = false
  )
  val r = new NvtxRange("evaluateOnCPU", NvtxColor.BLUE)
```
nit, withResource
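For readers unfamiliar with the `withResource` idiom the nit refers to: it wraps a resource so `close()` runs even if the body throws. A rough Python analogue (the `NvtxRange` class here is a stand-in for illustration, not the real NVTX binding):

```python
# Toy model of the withResource pattern: close() is guaranteed to run,
# success or failure, which is why the reviewer prefers it over a bare val.
class NvtxRange:
    def __init__(self, name, color):
        self.name, self.color, self.closed = name, color, False
    def close(self):
        self.closed = True

def with_resource(resource, body):
    try:
        return body(resource)
    finally:
        resource.close()

r = NvtxRange("evaluateOnCPU", "BLUE")
result = with_resource(r, lambda rng: rng.name.upper())
print(result, r.closed)  # EVALUATEONCPU True
```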
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
zpuller left a comment:
Still working through it but I had a couple comments for now
```diff
  * from SparkPlan nodes. Use the public API that requires metrics instead.
  */
-def bindGpuReferences[A <: Expression](
+def bindGpuReferencesInternal[A <: Expression](
```
nit: can we/should we make this project private?
We don't want it to be private because there are a few cases, even though they are rare, where we want to call it.
I meant project private as in it would be callable from (most) RAPIDS code but not from spark itself. But anyway I don't have a strong opinion
```scala
override def prettyName: String = "gpu_cpu_bridge"

override def toString: String = {
  val gpuInputsStr = if (gpuInputs.nonEmpty) {
```
nit: do we need to check if it's nonEmpty? if we took an empty Seq and did .mkString(", ") wouldn't it just be empty string?
similar below
Good point. Cursor does not always deal with these very well.
```scala
 * Takes an iterator of rows and the expected row count, produces a complete
 * GpuColumnVector result.
 */
private def createEvaluationFunction(
```
Why does this and the below take a cpuExpression argument, given that cpuExpressions is a field of this class? I could see reasons to do this, but where I get confused is that the threadLocalProjection seems to be based off the field cpuExpression, not the arg
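On the thread-local projection question: the usual reason for a per-thread projection is that a compiled projection is not safe to share across worker threads, so each thread lazily builds and caches its own copy. A toy model (names are mine; `make_projection` is a hypothetical stand-in for compiling the CPU expression):

```python
import threading

_local = threading.local()
_builds = []          # record which threads compiled a projection

def make_projection(expr):
    _builds.append(threading.get_ident())
    return lambda row: expr(row)

def thread_local_projection(expr):
    # build at most one projection per thread, then reuse it
    if not hasattr(_local, "proj"):
        _local.proj = make_projection(expr)
    return _local.proj

def worker():
    for _ in range(3):  # repeated calls reuse the cached projection
        thread_local_projection(lambda row: row * 2)(21)

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print(len(_builds))  # 2: one compile per thread, not one per call
```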
zpuller left a comment:
Posted one other minor question, but looking good
```scala
override def call(): T = {
  // Register this thread with RmmSpark for memory tracking if we have a task context
  if (taskContext != null) {
    RmmSpark.currentThreadIsDedicatedToTask(taskContext.taskAttemptId())
```
Does it make sense to use TaskRegistryTracker here, or do we not want the same behavior eg. retries?
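To illustrate what the registration in the snippet is doing conceptually: pool threads announce themselves under the Spark task attempt id so per-task accounting (memory tracking, and potentially the retry machinery the comment asks about) can find them. A toy model, not the real RmmSpark API:

```python
import threading

_registry = {}
_lock = threading.Lock()
_barrier = threading.Barrier(3)  # keep workers alive together so thread ids stay distinct

def current_thread_is_dedicated_to_task(task_attempt_id):
    # toy stand-in for RmmSpark.currentThreadIsDedicatedToTask: remember which
    # threads are doing CPU work on behalf of this Spark task attempt
    with _lock:
        _registry.setdefault(task_attempt_id, set()).add(threading.get_ident())

def worker(task_attempt_id):
    current_thread_is_dedicated_to_task(task_attempt_id)
    _barrier.wait()

threads = [threading.Thread(target=worker, args=(42,)) for _ in range(3)]
for t in threads: t.start()
for t in threads: t.join()
print(len(_registry[42]))  # 3 worker threads registered to task attempt 42
```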
what is the state of this? Is it something we should re-review?
NOTE: release/25.12 has been created from main. Please retarget your PR to release/25.12 if it should be included in the release.
Description
This attempts to make it possible to fall back to the CPU in a more efficient way. In the current code we fall back to the CPU at the level of a `SparkPlan` node. So an entire `ProjectExec` or `HashAggregateExec` would fall back to the CPU if a single expression in it could not run on the GPU. That means moving all of the data for the `SparkPlan` node to the CPU and then the entire result back to the GPU again, and within that `SparkPlan` node only a single CPU thread would be used to process the data. This does not scale well in all cases.

The CPU bridge has a thread pool that is used to execute the CPU expressions in parallel, moving only the minimal data needed. This allows more of the processing to stay on the GPU and minimizes data movement. Even though it does not release the semaphore when running on the CPU, it offsets this by throwing as many cores at the processing as there are configured tasks.
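The approach described here can be sketched in miniature (pure illustration, not the plugin's code): only the inputs of the unsupported expression are handed to CPU threads in chunks, while everything else would stay on the GPU.

```python
from concurrent.futures import ThreadPoolExecutor

def cpu_only_expr(x):
    # stand-in for an expression that has no GPU implementation
    return x % 3

def bridge_project(column, chunk_size=4, threads=4):
    # split the one unsupported column into chunks and evaluate them on a
    # CPU thread pool in parallel, then reassemble in order
    chunks = [column[i:i + chunk_size] for i in range(0, len(column), chunk_size)]
    with ThreadPoolExecutor(max_workers=threads) as pool:
        results = pool.map(lambda c: [cpu_only_expr(v) for v in c], chunks)
    return [v for chunk in results for v in chunk]

print(bridge_project(list(range(10))))  # [0, 1, 2, 0, 1, 2, 0, 1, 2, 0]
```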
This does not work for non-deterministic expressions or aggregations.
It is currently off by default for two reasons
From a performance standpoint I have tested it in a few situations.
I also ran some much simpler tests where almost the entire query is a single expression that is not on the GPU.
The bridge version is more memory efficient than falling back for everything, but not quite as good as a pure GPU implementation. There is code to allow us to run the expression as an interpreted expression instead of as code gen, but the performance can vary wildly, and is generally slower than the code gen version. I am happy to rip out the interpreted version as the code gen version can fall back to an interpreted version behind the scenes in Spark. It just requires setting a separate config entirely.
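To make the interpreted-vs-code-gen distinction above concrete (toy expression trees, names mine, not Spark's internals): the interpreted path walks the tree for every row, while the code-gen path compiles the whole tree into a single function once and then just calls it.

```python
import operator

# Evaluate ((a + 1) * 2) two ways over the same tiny expression tree.
tree = ("mul", ("add", ("col", "a"), ("lit", 1)), ("lit", 2))

OPS = {"add": operator.add, "mul": operator.mul}

def interpret(node, row):
    # interpreted: recursive tree walk per row
    kind = node[0]
    if kind == "col":
        return row[node[1]]
    if kind == "lit":
        return node[1]
    return OPS[kind](interpret(node[1], row), interpret(node[2], row))

def codegen(node):
    # "code gen": compile the tree once into one Python function
    def emit(n):
        kind = n[0]
        if kind == "col":
            return f"row[{n[1]!r}]"
        if kind == "lit":
            return repr(n[1])
        op = {"add": "+", "mul": "*"}[kind]
        return f"({emit(n[1])} {op} {emit(n[2])})"
    return eval(f"lambda row: {emit(node)}")

row = {"a": 4}
print(interpret(tree, row), codegen(tree)(row))  # 10 10
```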
Note: A lot of this code was written with AI (specifically claude-4-sonnet through cursor)
Checklists