Skip to content

Commit f77a1b6

Browse files
committed
fix test issues
1 parent 6e13e3d commit f77a1b6

2 files changed

Lines changed: 333 additions & 334 deletions

File tree

tests/layers/test_trtllm_allreduce_rms_fusion.py

Lines changed: 2 additions & 333 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
import os
1818
import subprocess
1919
import sys
20-
import unittest
21-
from unittest.mock import Mock, patch
2220

23-
import paddle
2421

2522

2623
def test_run_distributed():
@@ -47,337 +44,9 @@ def test_run_distributed():
4744
process.kill()
4845
stdout, stderr = process.communicate()
4946
return_code = -1
47+
print(f"=== Distributed test stdout ===\n{stdout}")
48+
print(f"=== Distributed test stderr ===\n{stderr}")
5049
assert return_code in (0, 250), f"Process exited with code {return_code}"
5150

5251

5352
test_run_distributed()
54-
55-
56-
class TestFlashInferWorkspaceManagerEdgeCases(unittest.TestCase):
57-
"""Test FlashInferWorkspaceManager edge cases and fallback paths"""
58-
59-
def setUp(self):
60-
"""Initialize test fixtures"""
61-
# Patch before importing to test fallback paths
62-
self.patcher_has_flashinfer = patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion.has_flashinfer")
63-
self.mock_has_flashinfer = self.patcher_has_flashinfer.start()
64-
65-
def tearDown(self):
66-
"""Clean up patches"""
67-
self.patcher_has_flashinfer.stop()
68-
69-
def test_initialization_early_return_when_already_initialized(self):
70-
"""Test line 47: early return when already initialized with same world_size"""
71-
# Patch _flashinfer_comm to be available
72-
with patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion._flashinfer_comm") as mock_comm:
73-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
74-
FlashInferWorkspaceManager,
75-
)
76-
77-
manager = FlashInferWorkspaceManager()
78-
79-
# First initialization
80-
manager.initialized = True
81-
manager.world_size = 2
82-
83-
# Mock the comm functions
84-
mock_comm.trtllm_create_ipc_workspace_for_all_reduce_fusion = Mock(return_value=(Mock(), Mock()))
85-
86-
# Second initialization with same world_size - should return early
87-
manager.initialize(
88-
world_size=2,
89-
rank=0,
90-
max_token_num=2048,
91-
hidden_dim=4096,
92-
)
93-
94-
def test_initialization_warning_when_comm_none(self):
95-
"""Test lines 50-51: warning when _flashinfer_comm is None"""
96-
# Patch to ensure _flashinfer_comm is None
97-
with patch(
98-
"fastdeploy.model_executor.layers.flashinfer_comm_fusion._flashinfer_comm",
99-
None,
100-
):
101-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
102-
FlashInferWorkspaceManager,
103-
)
104-
105-
manager = FlashInferWorkspaceManager()
106-
107-
# Should not raise, just log warning and return
108-
manager.initialize(
109-
world_size=2,
110-
rank=0,
111-
max_token_num=2048,
112-
hidden_dim=4096,
113-
)
114-
115-
# Verify not initialized
116-
self.assertFalse(manager.initialized)
117-
118-
def test_cleanup_with_exception(self):
119-
"""Test lines 73-80: cleanup with exception handling"""
120-
with patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion._flashinfer_comm") as mock_comm:
121-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
122-
FlashInferWorkspaceManager,
123-
)
124-
125-
manager = FlashInferWorkspaceManager()
126-
manager.initialized = True
127-
manager.ipc_handles = Mock()
128-
manager.workspace_tensor = Mock()
129-
130-
# Mock the destroy function to raise exception
131-
mock_comm.trtllm_destroy_ipc_workspace_for_all_reduce = Mock(side_effect=RuntimeError("Cleanup error"))
132-
133-
# Should not raise, just log warning
134-
manager.cleanup()
135-
136-
# Verify cleanup happened
137-
self.assertFalse(manager.initialized)
138-
self.assertIsNone(manager.workspace_tensor)
139-
self.assertIsNone(manager.ipc_handles)
140-
141-
def test_cleanup_without_initialization(self):
142-
"""Test cleanup when not initialized"""
143-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
144-
FlashInferWorkspaceManager,
145-
)
146-
147-
manager = FlashInferWorkspaceManager()
148-
manager.initialized = False
149-
150-
# Should not raise
151-
manager.cleanup()
152-
153-
# Verify state
154-
self.assertFalse(manager.initialized)
155-
156-
157-
class TestEnsureWorkspaceInitialized(unittest.TestCase):
158-
"""Test ensure_workspace_initialized fallback paths"""
159-
160-
def setUp(self):
161-
"""Initialize test fixtures"""
162-
self.patcher_has_flashinfer = patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion.has_flashinfer")
163-
self.mock_has_flashinfer = self.patcher_has_flashinfer.start()
164-
165-
def tearDown(self):
166-
"""Clean up patches"""
167-
self.patcher_has_flashinfer.stop()
168-
169-
def test_ensure_workspace_when_flashinfer_not_available(self):
170-
"""Test line 91: early return when flashinfer not available"""
171-
self.mock_has_flashinfer.return_value = False
172-
173-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
174-
ensure_workspace_initialized,
175-
)
176-
177-
fd_config = Mock()
178-
fd_config.parallel_config = Mock()
179-
fd_config.parallel_config.tensor_parallel_size = 2
180-
181-
result = ensure_workspace_initialized(fd_config)
182-
183-
# Should return False (not initialized)
184-
self.assertFalse(result)
185-
186-
def test_ensure_workspace_when_comm_none(self):
187-
"""Test ensure_workspace_initialized when _flashinfer_comm is None"""
188-
self.mock_has_flashinfer.return_value = True
189-
190-
with patch(
191-
"fastdeploy.model_executor.layers.flashinfer_comm_fusion._flashinfer_comm",
192-
None,
193-
):
194-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
195-
ensure_workspace_initialized,
196-
)
197-
198-
fd_config = Mock()
199-
fd_config.parallel_config = Mock()
200-
fd_config.parallel_config.tensor_parallel_size = 2
201-
202-
result = ensure_workspace_initialized(fd_config)
203-
204-
# Should return False
205-
self.assertFalse(result)
206-
207-
def test_ensure_workspace_single_gpu(self):
208-
"""Test line 96: early return when world_size <= 1"""
209-
self.mock_has_flashinfer.return_value = True
210-
211-
with patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion._flashinfer_comm"):
212-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
213-
ensure_workspace_initialized,
214-
)
215-
216-
fd_config = Mock()
217-
fd_config.parallel_config = Mock()
218-
fd_config.parallel_config.tensor_parallel_size = 1
219-
220-
with patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion.dist.get_rank", return_value=0):
221-
result = ensure_workspace_initialized(fd_config)
222-
223-
# Should return False for single GPU
224-
self.assertFalse(result)
225-
226-
227-
class TestFlashInferAllReduceResidualRMSNormFallbacks(unittest.TestCase):
228-
"""Test flashinfer_allreduce_residual_rmsnorm fallback paths"""
229-
230-
def setUp(self):
231-
"""Initialize test fixtures"""
232-
self.patcher_has_flashinfer = patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion.has_flashinfer")
233-
self.mock_has_flashinfer = self.patcher_has_flashinfer.start()
234-
235-
def tearDown(self):
236-
"""Clean up patches"""
237-
self.patcher_has_flashinfer.stop()
238-
239-
def test_flashinfer_not_available_fallback(self):
240-
"""Test lines 140-141: fallback when flashinfer not available"""
241-
self.mock_has_flashinfer.return_value = False
242-
243-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
244-
flashinfer_allreduce_residual_rmsnorm,
245-
)
246-
247-
fd_config = Mock()
248-
fd_config.parallel_config = Mock()
249-
fd_config.parallel_config.tensor_parallel_size = 2
250-
251-
input_tensor = paddle.randn([128, 768])
252-
residual = paddle.randn([128, 768])
253-
weight = paddle.randn([768])
254-
255-
norm_out, residual_out = flashinfer_allreduce_residual_rmsnorm(
256-
fd_config=fd_config,
257-
input_tensor=input_tensor,
258-
residual=residual,
259-
weight=weight,
260-
eps=1e-6,
261-
max_token_num=2048,
262-
)
263-
264-
# Should return None, None when flashinfer not available
265-
self.assertIsNone(norm_out)
266-
self.assertIsNone(residual_out)
267-
268-
def test_single_gpu_fallback(self):
269-
"""Test lines 146-147: fallback for single GPU"""
270-
self.mock_has_flashinfer.return_value = True
271-
272-
with patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion._flashinfer_comm"):
273-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
274-
flashinfer_allreduce_residual_rmsnorm,
275-
)
276-
277-
fd_config = Mock()
278-
fd_config.parallel_config = Mock()
279-
fd_config.parallel_config.tensor_parallel_size = 1
280-
281-
input_tensor = paddle.randn([128, 768])
282-
residual = paddle.randn([128, 768])
283-
weight = paddle.randn([768])
284-
285-
norm_out, residual_out = flashinfer_allreduce_residual_rmsnorm(
286-
fd_config=fd_config,
287-
input_tensor=input_tensor,
288-
residual=residual,
289-
weight=weight,
290-
eps=1e-6,
291-
max_token_num=2048,
292-
)
293-
294-
# Should return None, None for single GPU
295-
self.assertIsNone(norm_out)
296-
self.assertIsNone(residual_out)
297-
298-
def test_empty_tensor_handling(self):
299-
"""Test line 166: empty tensor handling"""
300-
self.mock_has_flashinfer.return_value = True
301-
302-
with (
303-
patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion._flashinfer_comm") as mock_comm,
304-
patch(
305-
"fastdeploy.model_executor.layers.flashinfer_comm_fusion.ensure_workspace_initialized",
306-
return_value=True,
307-
),
308-
):
309-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
310-
flashinfer_allreduce_residual_rmsnorm,
311-
)
312-
313-
fd_config = Mock()
314-
fd_config.parallel_config = Mock()
315-
fd_config.parallel_config.tensor_parallel_size = 2
316-
317-
# Empty tensor (0 tokens)
318-
input_tensor = paddle.zeros([0, 768])
319-
residual = paddle.zeros([0, 768])
320-
weight = paddle.randn([768])
321-
322-
# Mock the trtllm_allreduce_fusion to not be called
323-
mock_comm.trtllm_allreduce_fusion = Mock()
324-
325-
norm_out, residual_out = flashinfer_allreduce_residual_rmsnorm(
326-
fd_config=fd_config,
327-
input_tensor=input_tensor,
328-
residual=residual,
329-
weight=weight,
330-
eps=1e-6,
331-
max_token_num=2048,
332-
)
333-
334-
# Should return empty tensors, not call flashinfer
335-
self.assertEqual(norm_out.shape[0], 0)
336-
self.assertEqual(residual_out.shape[0], 0)
337-
mock_comm.trtllm_allreduce_fusion.assert_not_called()
338-
339-
340-
class TestFakeFlashInferFunction(unittest.TestCase):
341-
"""Test fake_flashinfer_allreduce_residual_rmsnorm function"""
342-
343-
def test_fake_function_basic(self):
344-
"""Test lines 204-206: fake function basic functionality"""
345-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
346-
fake_flashinfer_allreduce_residual_rmsnorm,
347-
)
348-
349-
input_tensor = paddle.randn([128, 768])
350-
residual = paddle.randn([128, 768])
351-
weight = paddle.randn([768])
352-
353-
norm_out, residual_out = fake_flashinfer_allreduce_residual_rmsnorm(
354-
input_tensor=input_tensor,
355-
residual=residual,
356-
weight=weight,
357-
eps=1e-6,
358-
max_token_num=16384,
359-
use_oneshot=None,
360-
trigger_completion_at_end=False,
361-
fp32_acc=False,
362-
)
363-
364-
# Should return empty-like tensors
365-
self.assertEqual(norm_out.shape, input_tensor.shape)
366-
self.assertEqual(residual_out.shape, residual.shape)
367-
368-
369-
class TestCleanupFlashInferWorkspace(unittest.TestCase):
370-
"""Test cleanup_flashinfer_workspace function"""
371-
372-
def test_cleanup_workspace_function(self):
373-
"""Test lines 211-212: cleanup function"""
374-
with patch("fastdeploy.model_executor.layers.flashinfer_comm_fusion._workspace_manager") as mock_manager:
375-
from fastdeploy.model_executor.layers.flashinfer_comm_fusion import (
376-
cleanup_flashinfer_workspace,
377-
)
378-
379-
mock_manager.cleanup = Mock()
380-
381-
cleanup_flashinfer_workspace()
382-
383-
mock_manager.cleanup.assert_called_once()

0 commit comments

Comments
 (0)