diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py index 44cf528bed3..30bf0ef0c18 100644 --- a/fastdeploy/model_executor/forward_meta.py +++ b/fastdeploy/model_executor/forward_meta.py @@ -158,6 +158,8 @@ class ForwardMeta: # for prefill exist_prefill: bool = False + audio_token_num: int = 0 + # for mla & dsa position_ids: Optional[paddle.Tensor] = None mask_encoder_batch: Optional[paddle.Tensor] = None diff --git a/fastdeploy/model_executor/graph_optimization/cudagraph_piecewise_backend.py b/fastdeploy/model_executor/graph_optimization/cudagraph_piecewise_backend.py index c04f137d10d..5a3c69038dd 100644 --- a/fastdeploy/model_executor/graph_optimization/cudagraph_piecewise_backend.py +++ b/fastdeploy/model_executor/graph_optimization/cudagraph_piecewise_backend.py @@ -29,12 +29,207 @@ capture_custom_allreduce, custom_ar_clear_ipc_handles, ) -from fastdeploy.platforms import current_platform from fastdeploy.utils import get_logger logger = get_logger("cudagrpah_piecewise_backend", "cudagraph_piecewise_backend.log") +# --------------------------------------------------------------------------- +# c10::cuda stream synchronization +# --------------------------------------------------------------------------- +# DeepEP's C++ extension (deep_ep_cpp) calls c10::cuda::getCurrentCUDAStream() +# to determine which CUDA stream to use. Paddle provides a compatibility +# implementation of this function in libphi_core.so, backed by a thread-local +# variable `tls_current_streams`. However, paddle.device.stream_guard() only +# updates Paddle's GPUContext stream -- it does NOT call +# c10::cuda::setCurrentCUDAStream(). As a result, DeepEP's C++ code sees the +# default stream instead of the capture stream, and its operations are NOT +# captured in the CUDA graph. +# +# The helper below synchronises the c10 stream state so that +# getCurrentCUDAStream() returns the same stream that Paddle's stream_guard +# set. 
It uses ctypes to call the C++ functions directly in libphi_core.so. +# +# ABI notes (x86_64, GCC, libstdc++): +# c10::cuda::CUDAStream contains c10::Stream which contains c10::Device. +# c10::Device has a std::string member, making CUDAStream non-POD. +# Non-POD return/argument types are passed via a hidden pointer (rdi). +# The raw cudaStream_t (StreamId) is stored at offset 40 in the buffer. + +_c10_lib = None +_c10_set_fn = None +_c10_ext_fn = None +_c10_get_fn = None +_C10_STREAM_BUF_SIZE = 64 # sizeof(CUDAStream) <= 48, use 64 for safety +_C10_STREAM_ID_OFFSET = 40 # offset of StreamId within CUDAStream buffer + + +def _init_c10_stream_funcs(): + """Lazily resolve c10::cuda symbols from libphi_core.so.""" + global _c10_lib, _c10_set_fn, _c10_ext_fn, _c10_get_fn + if _c10_lib is not None: + return _c10_lib is not False + try: + import ctypes + import glob + + # Locate libphi_core.so + candidates = glob.glob(os.path.join(os.path.dirname(paddle.__file__), "libs", "libphi_core.so")) + if not candidates: + _c10_lib = False + return False + _c10_lib = ctypes.CDLL(candidates[0], mode=ctypes.RTLD_GLOBAL) + + # c10::cuda::getStreamFromExternal(cudaStream_t, c10::DeviceIndex) + # Returns CUDAStream via hidden pointer (rdi), args: rsi=stream, edx=device_index + _c10_ext_fn = _c10_lib._ZN3c104cuda21getStreamFromExternalEP11CUstream_sta + _c10_ext_fn.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int] + _c10_ext_fn.restype = None + + # c10::cuda::setCurrentCUDAStream(CUDAStream) + # Takes CUDAStream via hidden pointer (rdi) + _c10_set_fn = _c10_lib._ZN3c104cuda20setCurrentCUDAStreamENS0_10CUDAStreamE + _c10_set_fn.argtypes = [ctypes.c_void_p] + _c10_set_fn.restype = None + + # c10::cuda::getCurrentCUDAStream(c10::DeviceIndex) + # Returns CUDAStream via hidden pointer (rdi), arg: esi=device_index + _c10_get_fn = _c10_lib._ZN3c104cuda20getCurrentCUDAStreamEa + _c10_get_fn.argtypes = [ctypes.c_void_p, ctypes.c_int] + _c10_get_fn.restype = None + + return True + 
except Exception as e: + logger.debug(f"Failed to init c10 stream funcs: {e}") + _c10_lib = False + return False + + +def _set_c10_current_stream(raw_stream, device_index): + """Set c10::cuda's current stream so that DeepEP uses the correct stream. + + Args: + raw_stream: cudaStream_t as an integer. + device_index: CUDA device index (int). + """ + if not _init_c10_stream_funcs(): + return + import ctypes + + buf = ctypes.create_string_buffer(_C10_STREAM_BUF_SIZE) + _c10_ext_fn(buf, ctypes.c_void_p(int(raw_stream)), device_index) + _c10_set_fn(buf) + + +def _get_c10_current_stream(device_index): + """Get c10::cuda's current raw stream (cudaStream_t as integer). + + Returns the raw cudaStream_t or None on failure. + """ + if not _init_c10_stream_funcs(): + return None + import ctypes + + buf = ctypes.create_string_buffer(_C10_STREAM_BUF_SIZE) + _c10_get_fn(buf, device_index) + return int.from_bytes(buf.raw[_C10_STREAM_ID_OFFSET : _C10_STREAM_ID_OFFSET + 8], "little") + + +def _get_cuda_device_index(): + """Get the current CUDA device index as an integer.""" + try: + return paddle.framework.core.get_cuda_current_device_id() + except Exception: + # Fallback: parse from paddle.device.get_device() which returns e.g. 'gpu:0' + dev_str = str(paddle.device.get_device()) + if ":" in dev_str: + return int(dev_str.split(":")[1]) + return 0 + + +def _get_paddle_raw_stream(device_index): + """Get Paddle's current raw CUDA stream for the given device.""" + try: + from paddle.base import core + + return core._get_current_stream(int(device_index)).raw_stream + except Exception: + return None + + +class _DeepEPStreamGuard: + """Context manager that sets both Paddle and c10::cuda current streams. + + Paddle's ``paddle.device.stream_guard`` only updates the GPUContext + stream. DeepEP's C++ extension reads the current stream via + ``c10::cuda::getCurrentCUDAStream()`` which is backed by a separate + thread-local variable (``tls_current_streams``). 
Without also setting + this variable, DeepEP runs on the wrong stream and its operations are + not captured in the CUDA graph. + + This guard synchronises both so that DeepEP sees the same stream as + Paddle. + """ + + def __init__(self, stream): + self.stream = stream + self._paddle_guard = None + self._prev_c10_stream = None + self._device_index = None + + def __enter__(self): + if self.stream is None: + return + # 1. Set Paddle's current stream + self._paddle_guard = paddle.device.stream_guard(self.stream) + self._paddle_guard.__enter__() + + # 2. Set c10::cuda's current stream to match Paddle's + try: + self._device_index = _get_cuda_device_index() + self._prev_c10_stream = _get_c10_current_stream(self._device_index) + raw_stream = _get_paddle_raw_stream(self._device_index) + if raw_stream is not None: + _set_c10_current_stream(raw_stream, self._device_index) + except Exception as e: + logger.warning(f"Failed to set c10 current stream: {e}") + + def __exit__(self, *args): + if self._paddle_guard is not None: + self._paddle_guard.__exit__(*args) + # Restore c10 stream to match Paddle's restored stream + if self._device_index is not None: + try: + raw_stream = _get_paddle_raw_stream(self._device_index) + if raw_stream is not None: + _set_c10_current_stream(raw_stream, self._device_index) + except Exception: + pass + + +def _clean_deepep_low_latency_buffer(): + """Clean DeepEP low-latency buffer before warmup/capture/replay. + + DeepEP's low-latency kernels require parts of the buffer to be + zero-initialized. After a normal-mode dispatch/combine run (or a + previous low-latency run) the buffer is "dirty". If the buffer is + not cleaned before the next low-latency dispatch, the kernel reads + stale metadata and can encounter illegal-instruction errors (CUDA + error 715). + + This is the same approach used by SGLang's + DeepEPCudaGraphRunnerAdapter, which calls + clean_low_latency_buffer() before every forward pass that uses + low-latency mode. 
+ """ + try: + from fastdeploy.model_executor.layers.moe.ep import DeepEPBufferManager + + DeepEPBufferManager.clean_low_latency_buffer() + except Exception: + pass + + @dataclass class ConcreteSizeEntry: """Record the concrete information corresponding to the current shape(num_tokens)""" @@ -124,72 +319,59 @@ def __init__( self.max_num_seqs = fd_config.scheduler_config.max_num_seqs self.real_bsz_to_captured_size = fd_config.graph_opt_config.real_bsz_to_captured_size - # Expected decode capture sequence (descending), consistent with capture_model() iteration order. - # Used to validate that captures happen in the correct order. - self._decode_expected_sequence: list[int] = sorted(self.cudagraph_capture_sizes, reverse=True) - # Points to the next expected position in _decode_expected_sequence. - self._decode_capture_index: int = 0 + # Create a dedicated capture stream (same approach as SGLang). + # DeepEP's low_latency_dispatch internally creates cross-stream dependencies + # (communication stream <-> default/legacy stream). If CUDA graph capture + # happens on the default stream, these dependencies cause: + # "operation would make the legacy stream depend on a capturing blocking stream" + # By capturing on a separate non-default stream, the default stream is free + # and DeepEP can create the required dependencies without conflict. + self._capture_stream = paddle.device.Stream() if paddle.is_compiled_with_cuda() else None - def _validate_decode_capture_order(self, shape: int) -> None: - """Validate that decode CUDA graph captures happen in expected descending order. - - Raises RuntimeError immediately if the actual capture order deviates from - the order defined by cudagraph_capture_sizes (sorted descending). - """ - if current_platform.is_xpu(): - return - - if self._decode_capture_index >= len(self._decode_expected_sequence): - raise RuntimeError( - f"[CUDA GRAPH][ID:{id(self)}] Unexpected CUDA graph capture: shape={shape}. 
" - f"All {len(self._decode_expected_sequence)} expected captures have already completed. " - f"Expected sequence: {self._decode_expected_sequence}" - ) - expected = self._decode_expected_sequence[self._decode_capture_index] - if shape != expected: - raise RuntimeError( - f"[CUDA GRAPH][ID:{id(self)}] CUDA graph capture order mismatch at index " - f"{self._decode_capture_index}: expected shape={expected}, got shape={shape}. " - f"Full expected sequence: {self._decode_expected_sequence}" - ) - logger.debug( - f"[CUDA GRAPH][ID:{id(self)}] Capture order validated: shape={shape} matches " - f"expected sequence at index {self._decode_capture_index} " - f"(sequence: {self._decode_expected_sequence})" - ) - self._decode_capture_index += 1 - - def run_static_model(self, entry: ConcreteSizeEntry, is_decode: bool = False, **kwargs): + def run_static_model(self, entry: ConcreteSizeEntry, **kwargs): if not entry.captured: - if is_decode: - self._validate_decode_capture_order(entry.real_shape) - # Warmup the model - for n in range(entry.num_finished_warmup, self.warm_up_size): - entry.num_finished_warmup += 1 - entry.runnable(**kwargs) - logger.debug( - f"[CUDA GRAPH][ID:{id(self)}] Warm up for batch size {entry.real_shape}, " - f"finished ({n + 1}/{entry.num_finished_warmup}) times" - ) - - # Store input addresses for debug - input_addresses = [x.data_ptr() for (_, x) in kwargs.items() if isinstance(x, paddle.Tensor)] - entry.input_addresses = input_addresses - - # Capture - self.cuda_graph_manager.state = jit_utils.CUDAGraphState.CAPTURE - self.cuda_graph_manager.batch_size = entry.real_shape - entry.captured = True - with capture_custom_allreduce(): - with self.cuda_graph_manager.run_impl_guard(): + # Run warmup and capture on a dedicated non-default stream to avoid + # "legacy stream depends on capturing blocking stream" errors when + # DeepEP low_latency_dispatch creates cross-stream dependencies. 
+ # Clean the DeepEP buffer before warmup to ensure low-latency + # kernels see a zero-initialized buffer. + _clean_deepep_low_latency_buffer() + with _DeepEPStreamGuard(self._capture_stream): + # Warmup the model + for n in range(entry.num_finished_warmup, self.warm_up_size): + entry.num_finished_warmup += 1 entry.runnable(**kwargs) - - # Replay + logger.debug( + f"[CUDA GRAPH][ID:{id(self)}] Warm up for batch size {entry.real_shape}, " + f"finished ({n + 1}/{entry.num_finished_warmup}) times" + ) + + # Store input addresses for debug + input_addresses = [x.data_ptr() for (_, x) in kwargs.items() if isinstance(x, paddle.Tensor)] + entry.input_addresses = input_addresses + + # Capture + self.cuda_graph_manager.state = jit_utils.CUDAGraphState.CAPTURE + self.cuda_graph_manager.batch_size = entry.real_shape + entry.captured = True + with capture_custom_allreduce(): + with self.cuda_graph_manager.run_impl_guard(): + entry.runnable(**kwargs) + + # Replay on the same capture stream self.cuda_graph_manager.state = jit_utils.CUDAGraphState.REPLAY self.cuda_graph_manager.batch_size = entry.real_shape - with self.cuda_graph_manager.run_impl_guard(): - return entry.runnable(**kwargs) + # NOTE: do NOT call _clean_deepep_low_latency_buffer() here. + # The captured graph already contains the clean_low_latency_buffer kernel + # from apply()'s is_moe_start_layer path. Adding an extra external clean + # would cause an nvshmemx_barrier_all_block() mismatch: worker 0 (replay) + # would hit 2 barriers while empty-input workers hit only 1. 
+ with _DeepEPStreamGuard(self._capture_stream): + with self.cuda_graph_manager.run_impl_guard(): + result = entry.runnable(**kwargs) + paddle.device.synchronize() + return result def __call__(self, **kwargs) -> List[paddle.Tensor] | paddle.Tensor: # Get real shape (total num tokens) @@ -239,51 +421,72 @@ def __call__(self, **kwargs) -> List[paddle.Tensor] | paddle.Tensor: assert ( real_shape == padding_real_shape ), f"real_shape:{real_shape} is not equal to padding_real_shape:{padding_real_shape} when capture new graph." - self._validate_decode_capture_order(padding_real_shape) - # Warmup the model - for n in range(entry.num_finished_warmup, self.warm_up_size): - entry.num_finished_warmup += 1 - entry.runnable(**kwargs) - logger.info( - f"[CUDA GRAPH][ID:{id(self)}] Warm up for real shape {padding_real_shape}, " - f"finished ({n + 1}/{entry.num_finished_warmup}) times" - ) - - # Store input addresses for debug - input_addresses = [x.data_ptr() for (_, x) in kwargs.items() if isinstance(x, paddle.Tensor)] - entry.input_addresses = input_addresses - - new_grpah = graphs.CUDAGraph(pool_id=self.unique_memory_pool_id) - paddle.device.synchronize() - - # Capture - with capture_custom_allreduce(): - new_grpah.capture_begin() - outputs = entry.runnable(**kwargs) - if isinstance(outputs, paddle.Tensor): - assert outputs is not None - outputs = [outputs] - new_grpah.capture_end() - - # Store output buffer - entry.cuda_graph = new_grpah - for output in outputs: - if output is not None: - output_buffer = paddle.zeros_like(output) - output._share_buffer_to(output_buffer) - output._clear() - entry.output_buffers.append(output_buffer) - else: - entry.output_buffers.append(None) - paddle.device.synchronize() + # Run warmup and capture on a dedicated non-default stream to avoid + # "legacy stream depends on capturing blocking stream" errors when + # DeepEP low_latency_dispatch creates cross-stream dependencies. 
+ # Clean the DeepEP buffer before warmup to ensure low-latency + # kernels see a zero-initialized buffer. + _clean_deepep_low_latency_buffer() + with _DeepEPStreamGuard(self._capture_stream): + # Warmup the model + for n in range(entry.num_finished_warmup, self.warm_up_size): + entry.num_finished_warmup += 1 + entry.runnable(**kwargs) + logger.info( + f"[CUDA GRAPH][ID:{id(self)}] Warm up for real shape {padding_real_shape}, " + f"finished ({n + 1}/{entry.num_finished_warmup}) times" + ) + + # Store input addresses for debug + input_addresses = [x.data_ptr() for (_, x) in kwargs.items() if isinstance(x, paddle.Tensor)] + entry.input_addresses = input_addresses + + new_grpah = graphs.CUDAGraph(pool_id=self.unique_memory_pool_id) + paddle.device.synchronize() + + # Capture + with capture_custom_allreduce(): + new_grpah.capture_begin() + outputs = entry.runnable(**kwargs) + if isinstance(outputs, paddle.Tensor): + assert outputs is not None + outputs = [outputs] + new_grpah.capture_end() + + # Store output buffer + entry.cuda_graph = new_grpah + for output in outputs: + if output is not None: + output_buffer = paddle.zeros_like(output) + output._share_buffer_to(output_buffer) + output._clear() + entry.output_buffers.append(output_buffer) + else: + entry.output_buffers.append(None) + + paddle.device.synchronize() # For CUDAGraph debug # self._save_cudagrpah_dot_files(entry) logger.info(f"[CUDA GRAPH][ID:{id(self)}] CUDAGraph captured for real shape {padding_real_shape}") - # Replay - entry.cuda_graph.replay() + # Replay on the same capture stream + # NOTE: CUDAGraph::Replay() uses the stream saved during capture (stream_) + # internally via cudaGraphLaunch(), so the graph is always launched on the + # capture stream regardless of the current stream. We use stream_guard here + # so that Paddle's internal stream state is consistent. 
# After replay, we need to synchronize the capture stream with the default + # stream because the capture stream is non-blocking (kStreamNonBlocking) + # and does not implicitly synchronize with the default stream. + # NOTE: do NOT call _clean_deepep_low_latency_buffer() here. + # The captured graph already contains the clean_low_latency_buffer kernel + # from apply()'s is_moe_start_layer path. Adding an extra external clean + # would cause an nvshmemx_barrier_all_block() mismatch: worker 0 (replay) + # would hit 2 barriers while empty-input workers hit only 1. + with _DeepEPStreamGuard(self._capture_stream): + entry.cuda_graph.replay() + paddle.device.synchronize() logger.debug(f"[CUDA GRAPH][ID:{id(self)}] CUDAGraph replayed for real shape {padding_real_shape}") if len(entry.output_buffers) == 1: return entry.output_buffers[0] diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index 33993872cb6..63b2968a882 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -79,6 +79,11 @@ def clear_buffer(cls): if cls._engine: cls._engine.clear_deep_ep_buffer() + @classmethod + def clean_low_latency_buffer(cls): + if cls._engine: + cls._engine.clean_low_latency_buffer() + @classmethod def recreate_buffer(cls): if cls._engine: diff --git a/fastdeploy/model_executor/utils.py b/fastdeploy/model_executor/utils.py index e63603047be..abe09a082ec 100644 --- a/fastdeploy/model_executor/utils.py +++ b/fastdeploy/model_executor/utils.py @@ -131,6 +131,9 @@ def slice_fn(weight_or_paramter, output_dim, start, end, step=1): def process_weight_transpose(layer, weight_name): weight = getattr(layer, weight_name) + if not weight._is_initialized(): + logger.warning(f"Weight {weight_name} is not initialized, skip transpose.") + return if len(weight.shape) == 2: weight_shape = weight.shape[::-1] elif len(weight.shape) == 3: