Skip to content

Commit 8523168

Browse files
authored
[None][fix] Fix KV event consumption (#6346)
Signed-off-by: jthomson04 <jwillthomson19@gmail.com>
1 parent 7cc65a6 commit 8523168

File tree

3 files changed

+18
-8
lines changed

3 files changed

+18
-8
lines changed

cpp/tensorrt_llm/batch_manager/kvCacheEventManager.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@ std::deque<tle::KVCacheEvent> KVCacheEventManager::getEvents(std::optional<std::
152152
void KVCacheEventManager::flush()
153153
{
154154
auto eventQueue = std::exchange(mEventQueue, {});
155+
156+
if (eventQueue.empty())
157+
{
158+
return;
159+
}
160+
155161
std::unique_lock<std::mutex> lck(mPendingEventsMutex);
156162
mPendingEvents.push_back(std::move(eventQueue));
157163
mPendingEmptyCV.notify_one();

tensorrt_llm/_torch/pyexecutor/resource_manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
from tensorrt_llm.runtime import ModelConfig as ModelConfigPython
1818
from tensorrt_llm.sampling_params import SamplingParams
1919

20-
from ..._utils import binding_to_str_dtype, get_size_in_bytes, nvtx_range
20+
from ..._utils import (binding_to_str_dtype, get_size_in_bytes, mpi_rank,
21+
nvtx_range)
2122
from ...logger import logger
2223
from ...mapping import CpType, Mapping
2324
from .kv_cache_connector import KvCacheConnectorManager
@@ -338,6 +339,7 @@ def append_to_kv_heads_per_layer(num_kv_heads_per_layer: List[int],
338339
'copy_on_partial_reuse': kv_cache_config.copy_on_partial_reuse,
339340
'kv_connector_manager': self.kv_connector_manager,
340341
}
342+
341343
if self.event_buffer_max_size > 0:
342344
if mapping.enable_attention_dp:
343345
kwargs['event_manager'] = KVCacheEventManagerCpp(
@@ -347,7 +349,7 @@ def append_to_kv_heads_per_layer(num_kv_heads_per_layer: List[int],
347349
attention_dp_events_gather_period_ms=self.
348350
attention_dp_events_gather_period_ms,
349351
)
350-
else:
352+
elif mpi_rank() == 0:
351353
kwargs['event_manager'] = KVCacheEventManagerCpp(
352354
max_kv_event_entries=self.event_buffer_max_size)
353355

tensorrt_llm/executor/proxy.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,12 @@ def process_res(res):
205205

206206
return True # success
207207

208-
def _iteration_result_task(self, queue: Union[FusedIpcQueue,
209-
IntraProcessQueue],
210-
result_singleton: IterationResult) -> bool:
211-
# iteration result is not urgent, so we can sleep a bit
212-
time.sleep(0.2)
208+
def _iteration_result_task(self,
209+
queue: Union[FusedIpcQueue, IntraProcessQueue],
210+
result_singleton: IterationResult,
211+
urgent: bool = False) -> bool:
212+
if not urgent:
213+
time.sleep(0.2)
213214

214215
try:
215216
data = queue.get()
@@ -267,7 +268,8 @@ def dispatch_stats_task(self) -> bool:
267268

268269
def dispatch_kv_cache_events_task(self) -> bool:
269270
return self._iteration_result_task(self.kv_cache_events_queue,
270-
self._iter_kv_events_result)
271+
self._iter_kv_events_result,
272+
urgent=True)
271273

272274
def _start_dispatch_threads(self):
273275
if self.dispatch_result_thread is None:

0 commit comments

Comments
 (0)