Skip to content

Commit 74b88cf

Browse files
Tomer Shmilovichtshmilnvidia
authored andcommitted
copyBlock with NixlLoopbackAgent
Implement copyBlock function with nixlLoopbackAgent class. Signed-off-by: Tomer Shmilovich <tshmilovich@nvidia.com>
1 parent bbe8c7e commit 74b88cf

File tree

2 files changed

+30
-96
lines changed

2 files changed

+30
-96
lines changed

cpp/include/tensorrt_llm/batch_manager/kvCacheTransferManager.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ namespace tensorrt_llm::batch_manager::kv_cache_manager
3232
class KVCacheTransferManager
3333
{
3434
public:
35-
explicit KVCacheTransferManager(tr::BufferManager const& bufferManager);
35+
explicit KVCacheTransferManager(tr::BufferManager const& bufferManager, BaseLoopbackAgent* loopbackAgent = nullptr);
3636

3737
//! \brief Onboard a block to gpu memory.
3838
void onboard(BlockPtr const& offloadBlock, BlockPtr const& block, std::vector<KVCacheBlockPool> const& pools,
@@ -75,6 +75,9 @@ class KVCacheTransferManager
7575

7676
// Track the block ids offloaded in this iteration.
7777
std::unordered_map<int32_t, tr::CudaEvent> mPendingOffloads;
78+
// Reference to parent loopback agent
79+
BaseLoopbackAgent* mLoopbackAgent;
80+
int mDeviceId;
7881
};
7982

8083
} // namespace tensorrt_llm::batch_manager::kv_cache_manager

cpp/tensorrt_llm/batch_manager/kvCacheTransferManager.cpp

Lines changed: 26 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
namespace tr = tensorrt_llm::runtime;
4444
namespace tk = tensorrt_llm::kernels;
4545

46+
using namespace tensorrt_llm::executor::kv_cache;
47+
namespace kvc = tensorrt_llm::executor::kv_cache;
48+
4649
namespace tensorrt_llm::batch_manager::kv_cache_manager
4750
{
4851

@@ -86,11 +89,14 @@ static bool fileToGpuPosix(tr::ITensor::SharedPtr const& dstPtr, std::string con
8689
return true;
8790
}
8891

89-
KVCacheTransferManager::KVCacheTransferManager(tr::BufferManager const& bufferManager)
92+
KVCacheTransferManager::KVCacheTransferManager(tr::BufferManager const& bufferManager, BaseLoopbackAgent* loopbackAgent)
9093
: mBufferManager{bufferManager}
9194
, mOnboardManager(std::make_shared<tr::CudaStream>())
9295
, mOffloadManager(std::make_shared<tr::CudaStream>())
96+
, mLoopbackAgent{loopbackAgent}
9397
{
98+
TLLM_CUDA_CHECK(cudaGetDevice(&mDeviceId));
99+
TLLM_CHECK(mDeviceId != -1);
94100
}
95101

96102
tr::ITensor::SharedPtr KVCacheTransferManager::computeBlockPointer(
@@ -161,115 +167,40 @@ void KVCacheTransferManager::copyBlock(BlockPtr const& src, BlockPtr const& dst,
161167

162168
TLLM_LOG_ERROR("copyBlock: Non-DRAM mode copy: mode = %d, pool_size = %d", mode, pools.size());
163169

170+
std::vector<FileDesc> fileBlobs;
171+
std::vector<MemoryDesc> memoryBlobs;
172+
164173
for (size_t poolIdx = 0; poolIdx < pools.size(); ++poolIdx)
165174
{
166-
auto srcPtr = computeBlockPointer(src, pools, poolIdx);
167-
auto dstPtr = computeBlockPointer(dst, pools, poolIdx);
175+
auto ptr = isOffload ? computeBlockPointer(src, pools, poolIdx) : computeBlockPointer(dst, pools, poolIdx);
176+
auto block_id = isOffload ? src->getBlockId() : dst->getBlockId();
168177

169178
TLLM_CHECK_WITH_INFO(
170179
directory.has_value(), "Expected a directory path for KVCache offload, but none was provided.");
171180

172-
int size = std::snprintf(
173-
nullptr, 0, "%s/block_%d_pool_%zu.bin", directory.value().c_str(), src->getBlockId(), poolIdx);
181+
int size = std::snprintf(nullptr, 0, "%s/block_%d_pool_%zu.bin", directory.value().c_str(), block_id, poolIdx);
174182

175183
std::string filename(size + 1, '\0');
176-
std::snprintf(filename.data(), filename.size(), "%s/block_%d_pool_%zu.bin", directory.value().c_str(),
177-
src->getBlockId(), poolIdx);
178-
179-
if (mode == executor::KvCacheTransferMode::POSIX_DEBUG_FALLBACK)
180-
{
181-
TLLM_LOG_INFO("Forcing POSIX fallback for file: %s", filename.c_str());
182-
if (isOffload)
183-
{
184-
gpuToFilePosix(srcPtr, filename);
185-
}
186-
else
187-
{
188-
fileToGpuPosix(dstPtr, filename);
189-
}
190-
continue;
191-
}
184+
std::snprintf(
185+
filename.data(), filename.size(), "%s/block_%d_pool_%zu.bin", directory.value().c_str(), block_id, poolIdx);
192186

193187
int openFlags = isOffload ? (O_CREAT | O_WRONLY) : O_RDONLY;
194-
int fd = ::open(filename.c_str(), openFlags, 0664);
195-
if (fd < 0)
196-
{
197-
TLLM_LOG_ERROR(
198-
"Failed to open '%s' for %s; fallback POSIX", filename.c_str(), (isOffload ? "writing" : "reading"));
199188

200-
if (isOffload)
201-
{
202-
gpuToFilePosix(srcPtr, filename);
203-
}
204-
else
205-
{
206-
fileToGpuPosix(dstPtr, filename);
207-
}
208-
continue;
209-
}
189+
fileBlobs.emplace_back(filename, openFlags, 0664, ptr->getSizeInBytes());
190+
memoryBlobs.emplace_back(ptr->data(), ptr->getSizeInBytes(), mDeviceId);
191+
}
210192

211-
#ifdef ENABLE_CUFILE
212-
CUfileDescr_t cufileDesc = {};
213-
cufileDesc.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;
214-
cufileDesc.handle.fd = fd;
193+
FileDescs fileDescs(fileBlobs);
194+
MemoryDescs memoryDescs(kvc::MemoryType::kVRAM, memoryBlobs);
215195

216-
CUfileHandle_t cufileHandle;
217-
CUfileError_t status = cuFileHandleRegister(&cufileHandle, &cufileDesc);
218-
if (status.err != CU_FILE_SUCCESS)
219-
{
220-
// Fallback to POSIX
221-
TLLM_LOG_WARN(
222-
"cuFileHandleRegister failed (err=%d). Falling back to POSIX for '%s'", status.err, filename.c_str());
223-
::close(fd);
224-
if (isOffload)
225-
gpuToFilePosix(srcPtr, filename);
226-
else
227-
fileToGpuPosix(dstPtr, filename);
228-
continue;
229-
}
196+
mLoopbackAgent->registerMemory(memoryDescs);
197+
mLoopbackAgent->registerFiles(fileDescs);
230198

231-
ssize_t numBytes = static_cast<ssize_t>(srcPtr->getSizeInBytes());
232-
if (isOffload)
233-
{
234-
ssize_t written = cuFileWrite(cufileHandle, srcPtr->data(), numBytes, 0, 0);
235-
if (written < 0)
236-
{
237-
TLLM_LOG_ERROR("cuFileWrite error=%zd. Fallback to POSIX", written);
238-
cuFileHandleDeregister(cufileHandle);
239-
::close(fd);
240-
gpuToFilePosix(srcPtr, filename);
241-
continue;
242-
}
243-
}
244-
else
245-
{
246-
ssize_t readCount = cuFileRead(cufileHandle, dstPtr->data(), numBytes, 0, 0);
247-
if (readCount < 0)
248-
{
249-
TLLM_LOG_ERROR("cuFileRead error=%zd. Fallback to POSIX", readCount);
250-
cuFileHandleDeregister(cufileHandle);
251-
::close(fd);
252-
fileToGpuPosix(dstPtr, filename);
253-
continue;
254-
}
255-
}
199+
std::unique_ptr<TransferStatus> status = mLoopbackAgent->submitLoopbackRequests(memoryDescs, fileDescs, isOffload);
200+
status->wait();
256201

257-
cuFileHandleDeregister(cufileHandle);
258-
::close(fd);
259-
#else
260-
// If GDS isn't enabled, fallback to POSIX automatically
261-
TLLM_LOG_DEBUG("ENABLE_CUFILE=OFF, so fallback to POSIX for %s", filename.c_str());
262-
::close(fd); // close the file opened for GDS
263-
if (isOffload)
264-
{
265-
gpuToFilePosix(srcPtr, filename);
266-
}
267-
else
268-
{
269-
fileToGpuPosix(dstPtr, filename);
270-
}
271-
#endif
272-
}
202+
mLoopbackAgent->deregisterMemory(memoryDescs);
203+
mLoopbackAgent->deregisterFiles(fileDescs);
273204
}
274205

275206
void KVCacheTransferManager::onboard(BlockPtr const& offloadBlock, BlockPtr const& block,

0 commit comments

Comments
 (0)