-
Notifications
You must be signed in to change notification settings - Fork 175
Expand file tree
/
Copy pathconftest.py
More file actions
200 lines (159 loc) · 6.79 KB
/
conftest.py
File metadata and controls
200 lines (159 loc) · 6.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import asyncio
import multiprocessing.context
import os
import sys
from collections.abc import AsyncGenerator, Iterator
import pytest
import pytest_asyncio
from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import SharedStateManager
from tests.helpers.worker import ExternalPythonWorker, ExternalWorker
from . import DEV_SERVER_DOWNLOAD_VERSION
# If there is an integration test environment variable set, we must remove the
# first path from the sys.path so we can import the wheel instead
if os.getenv("TEMPORAL_INTEGRATION_TEST"):
    # sys.path[0] is the launch directory; confirm it is the working dir
    # before popping so we do not remove some other entry by mistake.
    assert (
        sys.path[0] == os.getcwd()
    ), "Expected first sys.path to be the current working dir"
    sys.path.pop(0)

    # Import temporalio and confirm it is prefixed with virtual env
    import temporalio

    assert temporalio.__file__.startswith(
        sys.prefix
    ), f"Expected {temporalio.__file__} to be in {sys.prefix}"

# Unless specifically overridden, we expect tests to run under protobuf 4.x/5.x lib
import google.protobuf

protobuf_version = google.protobuf.__version__
if os.getenv("TEMPORAL_TEST_PROTO3"):
    # TEMPORAL_TEST_PROTO3 opts the run into the legacy protobuf 3.x line.
    assert protobuf_version.startswith(
        "3."
    ), f"Expected protobuf 3.x, got {protobuf_version}"
else:
    assert (
        protobuf_version.startswith("4.")
        or protobuf_version.startswith("5.")
        or protobuf_version.startswith("6.")
    ), f"Expected protobuf 4.x/5.x/6.x, got {protobuf_version}"
def pytest_runtest_setup(item):  # type: ignore[reportMissingParameterType]
    """Emit a leading newline before each test when capture is disabled.

    With ``-s``, anything the test prints would otherwise be glued onto
    pytest's own progress output; the extra newline keeps it on its own line.
    """
    capture_disabled = item.config.getoption("-s")
    if capture_disabled:
        print()
def pytest_addoption(parser):  # type: ignore[reportMissingParameterType]
    """Register the CLI option that selects the workflow environment."""
    help_text = (
        "Which workflow environment to use "
        "('local', 'time-skipping', or ip:port for existing server)"
    )
    parser.addoption("-E", "--workflow-environment", default="local", help=help_text)
@pytest.fixture(scope="session")
def event_loop():
    """Session-scoped event loop shared by all async tests.

    Creates a fresh loop from the current policy, yields it for the whole
    session, and closes it on teardown.
    """
    loop = asyncio.get_event_loop_policy().new_event_loop()  # type: ignore[reportDeprecated]
    yield loop
    # Original wrapped this in `try/except TypeError: raise`, which is a
    # dead handler (it only re-raised) — removed; close unconditionally.
    loop.close()
class NoEventLoopPolicy(asyncio.AbstractEventLoopPolicy):  # type: ignore[name-defined]
    """Event loop policy that forwards to a wrapped policy but never creates loops.

    Every call is delegated to the underlying policy except
    ``new_event_loop``, which always returns ``None``.
    """

    def __init__(self, underlying: asyncio.AbstractEventLoopPolicy):  # type: ignore[name-defined]
        super().__init__()
        # Wrapped policy that handles everything except loop creation
        self._delegate = underlying

    def get_event_loop(self):
        return self._delegate.get_event_loop()

    def set_event_loop(self, loop):  # type: ignore[reportMissingParameterType]
        return self._delegate.set_event_loop(loop)

    def new_event_loop(self):  # type: ignore[reportIncompatibleMethodOverride]
        # Deliberately refuse to create a loop
        return None

    def get_child_watcher(self):
        return self._delegate.get_child_watcher()  # type: ignore[reportDeprecated]

    def set_child_watcher(self, watcher):  # type: ignore[reportMissingParameterType]
        return self._delegate.set_child_watcher(watcher)  # type: ignore[reportDeprecated]
@pytest.fixture(scope="session")
def env_type(request: pytest.FixtureRequest) -> str:
    """Return the value of the --workflow-environment CLI option."""
    selected = request.config.getoption("--workflow-environment")
    return selected  # type: ignore[reportReturnType]
@pytest_asyncio.fixture(scope="session")  # type: ignore[reportUntypedFunctionDecorator]
async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
    """Session-wide workflow environment.

    Depending on the --workflow-environment option, starts a local dev
    server, starts the time-skipping test server, or connects to an
    existing server at the given ip:port. Shut down after the session.
    """
    if env_type == "local":
        # Each entry becomes one "--dynamic-config-value key=value" pair on
        # the dev server command line, in this order.
        dynamic_config_settings = [
            "system.forceSearchAttributesCacheRefreshOnRead=true",
            f"limit.historyCount.suggestContinueAsNew={CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT}",
            "system.enableEagerWorkflowStart=true",
            "frontend.enableExecuteMultiOperation=true",
            "frontend.workerVersioningWorkflowAPIs=true",
            "frontend.workerVersioningDataAPIs=true",
            "system.enableDeploymentVersions=true",
            "frontend.activityAPIsEnabled=true",
            "component.nexusoperations.recordCancelRequestCompletionEvents=true",
            "activity.enableStandalone=true",
            "history.enableChasm=true",
            "history.enableTransitionHistory=true",
            "frontend.enableCancelWorkerPollsOnShutdown=true",
        ]
        extra_args = []
        for setting in dynamic_config_settings:
            extra_args.extend(("--dynamic-config-value", setting))
        env = await WorkflowEnvironment.start_local(
            dev_server_extra_args=extra_args,
            dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION,
        )
    elif env_type == "time-skipping":
        env = await WorkflowEnvironment.start_time_skipping()
    else:
        # Anything else is treated as the address of an already-running server
        env = WorkflowEnvironment.from_client(await Client.connect(env_type))
    yield env
    await env.shutdown()
@pytest.fixture(scope="session")
def shared_state_manager() -> Iterator[SharedStateManager]:
    """Session-wide SharedStateManager backed by a multiprocessing manager.

    The multiprocessing manager process is shut down on teardown.
    """
    process_manager = multiprocessing.Manager()
    state_manager = SharedStateManager.create_from_multiprocessing(process_manager)
    try:
        yield state_manager
    finally:
        process_manager.shutdown()
@pytest.fixture(scope="session")
def mp_fork_ctx() -> Iterator[multiprocessing.context.BaseContext | None]:
    """Yield a fork-based multiprocessing context, or None if unavailable.

    On teardown, terminates and joins any child processes still alive in
    the context.
    """
    try:
        fork_ctx = multiprocessing.get_context("fork")
    except ValueError:
        # Platform does not support the "fork" start method
        fork_ctx = None
    try:
        yield fork_ctx
    finally:
        if fork_ctx:
            for child in fork_ctx.active_children():
                child.terminate()
                child.join()
@pytest_asyncio.fixture  # type: ignore[reportUntypedFunctionDecorator]
async def client(env: WorkflowEnvironment) -> Client:
    """Client connected to the session's workflow environment."""
    return env.client
@pytest_asyncio.fixture(scope="session")  # type: ignore[reportUntypedFunctionDecorator]
async def worker(
    env: WorkflowEnvironment,
) -> AsyncGenerator[ExternalWorker, None]:
    """Session-wide external Python worker, closed on teardown."""
    external_worker = ExternalPythonWorker(env)
    yield external_worker
    await external_worker.close()
# There is an issue in tests sometimes in GitHub actions where even though all tests
# pass, an unclear outer area is killing the process with a bad exit code. This
# hook forcefully kills the process as success when the exit code from pytest
# is a success.
@pytest.hookimpl(hookwrapper=True, trylast=True)
def pytest_cmdline_main(config):  # type: ignore[reportMissingParameterType, reportUnusedParameter]
    # Hookwrapper: the yield resumes after the other pytest_cmdline_main
    # implementations have run; `result` wraps their outcome (the exit code).
    result = yield
    if result.get_result() == 0:
        # Success: hard-exit immediately. os._exit skips atexit handlers and
        # any remaining interpreter teardown that could alter the exit code.
        os._exit(0)
    return result.get_result()
# History-event count handed to the local dev server as the
# limit.historyCount.suggestContinueAsNew dynamic config value (see the `env`
# fixture earlier in this file).
CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT = 50


@pytest.fixture
def continue_as_new_suggest_history_count() -> int:
    """Expose CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT to tests as a fixture."""
    return CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT