Skip to content

Commit 234e828

Browse files
committed
Wrap raw value from API to fit XCom interface
1 parent e91a37f commit 234e828

1 file changed

Lines changed: 8 additions & 2 deletions

File tree

task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# under the License.
1818
from __future__ import annotations
1919

20+
import collections
2021
import itertools
2122
from collections.abc import Iterator, Sequence
2223
from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload
@@ -30,6 +31,11 @@
3031

3132
T = TypeVar("T")
3233

34+
# This is used to wrap values from the API so the structure is compatible with
35+
# ``XCom.deserialize_value``. We don't want to wrap the API values in a nested
36+
# {"value": value} dict since it wastes bandwidth.
37+
_XComWrapper = collections.namedtuple("_XComWrapper", "value")
38+
3339
log = structlog.get_logger(logger_name=__name__)
3440

3541

@@ -132,7 +138,7 @@ def __getitem__(self, key: int | slice) -> T | Sequence[T]:
132138
raise IndexError(key)
133139
if not isinstance(msg, XComSequenceIndexResponse):
134140
raise TypeError(f"Got unexpected response to GetXComSequenceItem: {msg}")
135-
return BaseXCom.deserialize_value(msg.root)
141+
return BaseXCom.deserialize_value(_XComWrapper(msg.root))
136142

137143
if isinstance(key, slice):
138144
start, stop, step = _coerce_slice(key)
@@ -153,7 +159,7 @@ def __getitem__(self, key: int | slice) -> T | Sequence[T]:
153159
msg = SUPERVISOR_COMMS.get_message()
154160
if not isinstance(msg, XComSequenceSliceResponse):
155161
raise TypeError(f"Got unexpected response to GetXComSequenceSlice: {msg}")
156-
return [BaseXCom.deserialize_value(value) for value in msg.root]
162+
return [BaseXCom.deserialize_value(_XComWrapper(value)) for value in msg.root]
157163

158164
raise TypeError(f"Sequence indices must be integers or slices, not {type(key).__name__}")
159165

0 commit comments

Comments
 (0)