Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
04a1c6d
Payload limit configuration and validation
jmaeagle99 Jan 17, 2026
9ca018d
Change error limits to disabling fields
jmaeagle99 Jan 28, 2026
fb7f857
Use error limits from server
jmaeagle99 Jan 28, 2026
98b1eb2
Remove tests that do not test real code paths
jmaeagle99 Jan 28, 2026
861269b
Don't pass error limits to workflow replayer
jmaeagle99 Jan 28, 2026
526f595
Fix formatting
jmaeagle99 Jan 28, 2026
dc38edd
Use consistent message and use marker values for improved searchability
jmaeagle99 Jan 28, 2026
a9aa5ce
Handle failure encoding payload limit errors for activities
jmaeagle99 Jan 28, 2026
7f621ef
Nest workflow payload error handle in case it causes payload limit error
jmaeagle99 Jan 28, 2026
58081db
Remove remaining investigative comments
jmaeagle99 Jan 28, 2026
22de18b
Better messages and use TMPRL rule code
jmaeagle99 Jan 28, 2026
dd7bcc2
Merge branch 'main' into payload-limits
jmaeagle99 Jan 28, 2026
76ff2a9
More test comments
jmaeagle99 Jan 28, 2026
2efbb2c
Static method and custom warning category
jmaeagle99 Jan 29, 2026
3036a8a
Rename options and move disablement to worker
jmaeagle99 Jan 29, 2026
5c2c5e7
Merge branch 'main' into payload-limits
jmaeagle99 Feb 3, 2026
97ffc55
Remove mock error limits
jmaeagle99 Feb 3, 2026
8caeb44
Clarify error limits from server and support None
jmaeagle99 Feb 3, 2026
657c188
Use memo.fields.values()
jmaeagle99 Feb 3, 2026
73b670b
Skip error tests on time-skipping server
jmaeagle99 Feb 3, 2026
30df033
Adjust test data types, sizes, and comments
jmaeagle99 Feb 3, 2026
ec3c79e
Disable activity exception test
jmaeagle99 Feb 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ async def encode_completion(
encode_headers: bool,
) -> None:
"""Encode all payloads in the completion."""
if data_converter._encode_payload_has_effect:
await CommandAwarePayloadVisitor(
skip_search_attributes=True, skip_headers=not encode_headers
).visit(_Visitor(data_converter._encode_payload_sequence), completion)
await CommandAwarePayloadVisitor(
skip_search_attributes=True, skip_headers=not encode_headers
).visit(_Visitor(data_converter._encode_payload_sequence), completion)
2 changes: 1 addition & 1 deletion temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6837,7 +6837,7 @@ async def _apply_headers(
) -> None:
if source is None:
return
if encode_headers and data_converter._encode_payload_has_effect:
if encode_headers:
for payload in source.values():
payload.CopyFrom(await data_converter._encode_payload(payload))
temporalio.common._apply_headers(source, dest)
Expand Down
99 changes: 90 additions & 9 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,32 @@ def __init__(self) -> None:
super().__init__(encode_common_attributes=True)


@dataclass(frozen=True)
class PayloadLimitsConfig:
    """Size thresholds that control when oversized-payload warnings are emitted."""

    memo_size_warning: int = 2048
    """Byte threshold above which a memo-size warning is logged (default 2 KiB)."""

    payload_size_warning: int = 524288
    """Byte threshold above which a payload-size warning is logged (default 512 KiB)."""


class PayloadSizeWarning(RuntimeWarning):
    """Warning category used when payload sizes cross the configured warning threshold."""


@dataclass(frozen=True)
class _ServerPayloadErrorLimits:
    """Hard payload size limits as reported by the Temporal server."""

    memo_size_error: int
    """Byte limit beyond which a memo triggers an error."""

    payload_size_error: int
    """Byte limit beyond which payloads trigger an error."""


@dataclass(frozen=True)
class DataConverter(WithSerializationContext):
"""Data converter for converting and encoding payloads to/from Python values.
Expand All @@ -1230,9 +1256,15 @@ class DataConverter(WithSerializationContext):
failure_converter: FailureConverter = dataclasses.field(init=False)
"""Failure converter created from the :py:attr:`failure_converter_class`."""

payload_limits: PayloadLimitsConfig = PayloadLimitsConfig()
"""Settings for payload size limits."""

default: ClassVar[DataConverter]
"""Singleton default data converter."""

_payload_error_limits: _ServerPayloadErrorLimits | None = None
"""Server-reported limits for payloads."""

def __post_init__(self) -> None: # noqa: D105
object.__setattr__(self, "payload_converter", self.payload_converter_class())
object.__setattr__(self, "failure_converter", self.failure_converter_class())
Expand Down Expand Up @@ -1334,6 +1366,11 @@ def with_context(self, context: SerializationContext) -> Self:
object.__setattr__(cloned, "failure_converter", failure_converter)
return cloned

def _with_payload_error_limits(
    self, limits: _ServerPayloadErrorLimits | None
) -> DataConverter:
    """Return a copy of this converter carrying the given server-reported error limits."""
    replaced = dataclasses.replace(self, _payload_error_limits=limits)
    return replaced

async def _decode_memo(
self,
source: temporalio.api.common.v1.Memo,
Expand Down Expand Up @@ -1372,30 +1409,38 @@ async def _encode_memo_existing(
if not isinstance(v, temporalio.api.common.v1.Payload):
payload = (await self.encode([v]))[0]
memo.fields[k].CopyFrom(payload)
# Memos have their field payloads validated all together in one unit
DataConverter._validate_limits(
list(memo.fields.values()),
self._payload_error_limits.memo_size_error
if self._payload_error_limits
else None,
"[TMPRL1103] Attempted to upload memo with size that exceeded the error limit.",
self.payload_limits.memo_size_warning,
"[TMPRL1103] Attempted to upload memo with size that exceeded the warning limit.",
)

async def _encode_payload(
    self, payload: temporalio.api.common.v1.Payload
) -> temporalio.api.common.v1.Payload:
    """Run a single payload through the configured codec (if any), then validate its size."""
    encoded = payload
    if self.payload_codec:
        encoded = (await self.payload_codec.encode([encoded]))[0]
    self._validate_payload_limits([encoded])
    return encoded

async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads) -> None:
    """Codec-encode the payloads wrapper in place, then validate the result's size."""
    codec = self.payload_codec
    if codec:
        await codec.encode_wrapper(payloads)
    self._validate_payload_limits(payloads.payloads)

async def _encode_payload_sequence(
    self, payloads: Sequence[temporalio.api.common.v1.Payload]
) -> list[temporalio.api.common.v1.Payload]:
    """Codec-encode a sequence of payloads (if a codec is configured), then validate size."""
    encoded_payloads = list(payloads)
    if self.payload_codec:
        encoded_payloads = await self.payload_codec.encode(encoded_payloads)
    self._validate_payload_limits(encoded_payloads)
    return encoded_payloads

async def _decode_payload(
self, payload: temporalio.api.common.v1.Payload
Expand Down Expand Up @@ -1452,6 +1497,42 @@ async def _apply_to_failure_payloads(
if failure.HasField("cause"):
await DataConverter._apply_to_failure_payloads(failure.cause, cb)

def _validate_payload_limits(
    self,
    payloads: Sequence[temporalio.api.common.v1.Payload],
) -> None:
    """Check the combined payload size against server error and configured warning limits."""
    server_limits = self._payload_error_limits
    error_limit = server_limits.payload_size_error if server_limits else None
    DataConverter._validate_limits(
        payloads,
        error_limit,
        "[TMPRL1103] Attempted to upload payloads with size that exceeded the error limit.",
        self.payload_limits.payload_size_warning,
        "[TMPRL1103] Attempted to upload payloads with size that exceeded the warning limit.",
    )

@staticmethod
def _validate_limits(
    payloads: Sequence[temporalio.api.common.v1.Payload],
    error_limit: int | None,
    error_message: str,
    warning_limit: int,
    warning_message: str,
) -> None:
    """Raise or warn when the summed serialized size of ``payloads`` exceeds a limit.

    Args:
        payloads: Payloads whose sizes are summed.
        error_limit: Hard limit in bytes; ``None`` or non-positive disables the error.
        error_message: Message prefix used when raising.
        warning_limit: Soft limit in bytes; non-positive disables the warning.
        warning_message: Message prefix used when warning.

    Raises:
        temporalio.exceptions.PayloadSizeError: If the combined size exceeds a
            positive ``error_limit``.
    """
    # NOTE(review): the server measures the wrapping Payloads message, which is a
    # few bytes larger than this per-payload sum; assumed negligible — confirm.
    total_size = sum(payload.ByteSize() for payload in payloads)

    if error_limit is not None and 0 < error_limit < total_size:
        raise temporalio.exceptions.PayloadSizeError(
            f"{error_message} Size: {total_size} bytes, Limit: {error_limit} bytes"
        )

    if 0 < warning_limit < total_size:
        # TODO: Use a context aware logger to log extra information about workflow/activity/etc
        warnings.warn(
            f"{warning_message} Size: {total_size} bytes, Limit: {warning_limit} bytes",
            PayloadSizeWarning,
        )


DefaultPayloadConverter.default_encoding_payload_converters = (
BinaryNullPayloadConverter(),
Expand Down
14 changes: 14 additions & 0 deletions temporalio/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,17 @@ def is_cancelled_exception(exception: BaseException) -> bool:
and isinstance(exception.cause, CancelledError)
)
)


class PayloadSizeError(TemporalError):
    """Raised when payload size exceeds a payload size limit."""

    def __init__(self, message: str) -> None:
        """Create the error with the given human-readable message."""
        super().__init__(message)
        self._message = message

    @property
    def message(self) -> str:
        """Human-readable description of the size violation."""
        return self._message
Loading
Loading