Skip to content

Commit d9e5219

Browse files
authored
Adjustments to external storage defaults (#1404)
1 parent 9408022 commit d9e5219

8 files changed

Lines changed: 43 additions & 18 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ Some things to note about external storage:
503503
* Only payloads that meet or exceed `ExternalStorage.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal.
504504
* External storage applies transparently to all payloads, whether they are workflow inputs/outputs, activity inputs/outputs, signal inputs, query outputs, update inputs/outputs, or failure details.
505505
* The `DataConverter`'s `payload_codec` (if configured) is applied to the payload *before* it is handed to the storage driver, so the driver always stores encoded bytes. The reference payload written to workflow history is not encoded by the `DataConverter` codec.
506-
* Setting `ExternalStorage.payload_size_threshold` to `None` causes every payload to be considered for external storage regardless of size.
506+
* Setting `ExternalStorage.payload_size_threshold` to `0` causes every payload to be considered for external storage regardless of size.
507507

508508
###### Driver Selection
509509

temporalio/contrib/aws/s3driver/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ Payloads are stored under content-addressable keys derived from a SHA-256 hash o
6666
* Any driver used to store payloads must also be configured on the component that retrieves them. If the client stores workflow inputs using this driver, the worker must include it in its `ExternalStorage.drivers` list to retrieve them.
6767
* The target S3 bucket must already exist; the driver will not create it.
6868
* Identical serialized bytes within the same namespace and workflow (or activity) share the same S3 object — the key is content-addressable within that scope. The same bytes used across different workflows or namespaces produce distinct S3 objects because the key includes the namespace and workflow/activity identifiers.
69-
* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `None` to offload every payload regardless of size.
69+
* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `0` to offload every payload regardless of size.
7070
* `S3StorageDriver.max_payload_size` (default: 50 MiB) sets a hard upper limit on the serialized size of any single payload. A `ValueError` is raised at store time if a payload exceeds this limit. Increase it if your workflows produce payloads larger than 50 MiB.
7171
* Override `S3StorageDriver.driver_name` only when registering multiple `S3StorageDriver` instances with distinct configurations under the same `ExternalStorage.drivers` list.
7272

temporalio/converter/_extstore.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,9 @@ class ExternalStorage(WithSerializationContext):
210210
one driver is registered, that driver is used for all store operations.
211211
"""
212212

213-
payload_size_threshold: int | None = 256 * 1024
213+
payload_size_threshold: int = 256 * 1024
214214
"""Minimum payload size in bytes before external storage is considered.
215-
Defaults to 256 KiB. Set to ``None`` to consider every payload for
216-
external storage regardless of size.
215+
Defaults to 256 KiB. Must be greater than or equal to zero.
217216
"""
218217

219218
_driver_map: dict[str, StorageDriver] = dataclasses.field(
@@ -234,14 +233,19 @@ class ExternalStorage(WithSerializationContext):
234233
def __post_init__(self) -> None:
235234
"""Validate drivers and build the internal name-keyed driver map.
236235
237-
Raises :exc:`ValueError` if no drivers are provided, if more than one
236+
Raises :exc:`ValueError` if no drivers are provided, if
237+
:attr:`payload_size_threshold` is less than zero, if more than one
238238
driver is registered without a :attr:`driver_selector`, or if any two
239239
drivers share the same name.
240240
"""
241241
if not self.drivers:
242242
raise ValueError(
243243
"ExternalStorage.drivers must contain at least one driver."
244244
)
245+
if self.payload_size_threshold < 0:
246+
raise ValueError(
247+
"ExternalStorage.payload_size_threshold must be greater than or equal to zero."
248+
)
245249
if len(self.drivers) > 1 and self.driver_selector is None:
246250
raise ValueError(
247251
"ExternalStorage.driver_selector must be specified if multiple drivers are registered."
@@ -267,10 +271,7 @@ def _select_driver(
267271
self, context: StorageDriverStoreContext, payload: Payload
268272
) -> StorageDriver | None:
269273
"""Returns the driver to use for this payload, or None to pass through."""
270-
if (
271-
self.payload_size_threshold is not None
272-
and payload.ByteSize() < self.payload_size_threshold
273-
):
274+
if payload.ByteSize() < self.payload_size_threshold:
274275
return None
275276
selector = self.driver_selector
276277
if selector is None:

temporalio/worker/_worker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,12 @@ def __init__(
321321
See https://docs.temporal.io/troubleshooting/blob-size-limit-error for more
322322
details.
323323
max_workflow_task_external_storage_concurrency: Maximum number of
324-
external storage I/O operations (store/retrieve) that may run
324+
external storage payload operations (store/retrieve) that may run
325325
concurrently within a single workflow task activation.
326-
Defaults to 10. WARNING: This setting is experimental.
326+
Defaults to 3. Adjust this value based on your workload's needs.
327+
Please report any issues you encounter with this setting or if you
328+
feel the default should be changed.
329+
WARNING: This setting is experimental.
327330
328331
"""
329332
config = WorkerConfig(

temporalio/worker/_workflow.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,13 @@
4747
# Set to true to log all activations and completions
4848
LOG_PROTOS = False
4949

50-
_DEFAULT_WORKFLOW_TASK_EXTERNAL_STORAGE_CONCURRENCY: int = 10
50+
# Value was chosen arbitrarily as a small number that allows some concurrency and prevents
51+
# large numbers of concurrent external storage operations causing resource contention.
52+
# This default limit is per workflow task activation and does not limit the total number
53+
# of concurrent external storage operations across all workflow task activations.
54+
# Advise customers to adjust based on their workload needs and to report issues with the
55+
# value if problems are encountered. This setting is experimental.
56+
_DEFAULT_WORKFLOW_TASK_EXTERNAL_STORAGE_CONCURRENCY: int = 3
5157

5258

5359
class _WorkflowWorker: # type:ignore[reportUnusedClass]

tests/test_extstore.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ async def store(
283283
external_storage=ExternalStorage(
284284
drivers=drivers,
285285
driver_selector=lambda ctx, p: next(drivers_iter),
286-
payload_size_threshold=None,
286+
payload_size_threshold=0,
287287
)
288288
)
289289

@@ -337,7 +337,7 @@ async def retrieve(
337337
external_storage=ExternalStorage(
338338
drivers=drivers,
339339
driver_selector=lambda ctx, p: next(drivers_iter),
340-
payload_size_threshold=None,
340+
payload_size_threshold=0,
341341
)
342342
)
343343
encoded = await converter.encode(["payload_a", "payload_b"])
@@ -631,7 +631,7 @@ def selector(_ctx: object, payload: Payload) -> StorageDriver:
631631
external_storage=ExternalStorage(
632632
drivers=[driver_a, driver_b],
633633
driver_selector=selector,
634-
payload_size_threshold=None,
634+
payload_size_threshold=0,
635635
)
636636
)
637637

@@ -678,6 +678,21 @@ def test_duplicate_driver_names_raises(self):
678678
payload_size_threshold=50,
679679
)
680680

681+
@pytest.mark.parametrize("threshold", [-1, -1000])
682+
def test_negative_payload_size_threshold_raises(self, threshold: int):
683+
"""A negative payload_size_threshold raises ValueError immediately
684+
when constructing ExternalStorage."""
685+
driver = InMemoryTestDriver()
686+
687+
with pytest.raises(
688+
ValueError,
689+
match=r"^ExternalStorage\.payload_size_threshold must be greater than or equal to zero\.$",
690+
):
691+
ExternalStorage(
692+
drivers=[driver],
693+
payload_size_threshold=threshold,
694+
)
695+
681696

682697
if __name__ == "__main__":
683698
pytest.main([__file__, "-v"])

tests/test_serialization_context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1972,7 +1972,7 @@ async def test_child_workflow_external_storage_with_context(client: Client):
19721972
DataConverter.default,
19731973
external_storage=ExternalStorage(
19741974
drivers=[driver],
1975-
payload_size_threshold=None,
1975+
payload_size_threshold=0,
19761976
),
19771977
)
19781978
client = Client(**config)

tests/worker/test_extstore.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ def __init__(self, driver_name: str):
581581
external_storage=ExternalStorage(
582582
drivers=[driver1, driver2, driver3],
583583
driver_selector=lambda _context, _payload: driver1,
584-
payload_size_threshold=None,
584+
payload_size_threshold=0,
585585
),
586586
),
587587
)

0 commit comments

Comments
 (0)