Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@
)
from ._executors_basic import (
BASIC_ACTION_EXECUTORS,
AppendValueExecutor,
ClearAllVariablesExecutor,
CreateConversationExecutor,
EmitEventExecutor,
ResetVariableExecutor,
SendActivityExecutor,
SetMultipleVariablesExecutor,
Expand All @@ -61,12 +59,10 @@
)
from ._executors_external_input import (
EXTERNAL_INPUT_EXECUTORS,
ConfirmationExecutor,
ExternalInputRequest,
ExternalInputResponse,
QuestionExecutor,
RequestExternalInputExecutor,
WaitForInputExecutor,
)
from ._executors_http import (
HTTP_ACTION_EXECUTORS,
Expand Down Expand Up @@ -122,11 +118,9 @@
"AgentExternalInputRequest",
"AgentExternalInputResponse",
"AgentResult",
"AppendValueExecutor",
"BaseToolExecutor",
"BreakLoopExecutor",
"ClearAllVariablesExecutor",
"ConfirmationExecutor",
"ContinueLoopExecutor",
"ConversationData",
"CreateConversationExecutor",
Expand All @@ -139,7 +133,6 @@
"DeclarativeWorkflowState",
"DefaultHttpRequestHandler",
"DefaultMCPToolHandler",
"EmitEventExecutor",
"EndConversationExecutor",
"EndWorkflowExecutor",
"ExternalInputRequest",
Expand Down Expand Up @@ -173,7 +166,6 @@
"ToolApprovalResponse",
"ToolApprovalState",
"ToolInvocationResult",
"WaitForInputExecutor",
"WorkflowFactory",
"WorkflowState",
]
Original file line number Diff line number Diff line change
Expand Up @@ -915,9 +915,9 @@ def __init__(self, result: Any = None):

@dataclass
class ConditionResult:
"""Result of evaluating a condition (If/Switch).
"""Result of evaluating a condition (If/ConditionGroup).

This message is output by ConditionEvaluatorExecutor and SwitchEvaluatorExecutor
This message is output by ConditionEvaluatorExecutor and ConditionGroupEvaluatorExecutor
to indicate which branch should be taken.
"""

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -179,28 +179,6 @@ async def handle_action(
await ctx.send_message(ActionComplete())


class AppendValueExecutor(DeclarativeActionExecutor):
"""Executor for the AppendValue action."""

@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ActionComplete],
) -> None:
"""Handle the AppendValue action."""
state = await self._ensure_state_initialized(ctx, trigger)

path = self._action_def.get("path")
value = self._action_def.get("value")

if path:
evaluated_value = state.eval_if_expression(value)
state.append(path, evaluated_value)

await ctx.send_message(ActionComplete())


class ResetVariableExecutor(DeclarativeActionExecutor):
"""Executor for the ResetVariable action."""

Expand Down Expand Up @@ -279,47 +257,6 @@ async def handle_action(
await ctx.send_message(ActionComplete())


class EmitEventExecutor(DeclarativeActionExecutor):
"""Executor for the EmitEvent action.

Emits a custom event to the workflow event stream.

Supports two schema formats:
1. Graph mode: eventName, eventValue
2. Interpreter mode: event.name, event.data
"""

@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ActionComplete, dict[str, Any]],
) -> None:
"""Handle the EmitEvent action."""
state = await self._ensure_state_initialized(ctx, trigger)

# Support both schema formats:
# - Graph mode: eventName, eventValue
# - Interpreter mode: event.name, event.data
event_def = self._action_def.get("event", {})
event_name = self._action_def.get("eventName") or event_def.get("name", "")
event_value = self._action_def.get("eventValue")
if event_value is None:
event_value = event_def.get("data")

if event_name:
evaluated_name = state.eval_if_expression(event_name)
evaluated_value = state.eval_if_expression(event_value)

event_data = {
"eventName": evaluated_name,
"eventValue": evaluated_value,
}
await ctx.yield_output(event_data)

await ctx.send_message(ActionComplete())


class EditTableExecutor(DeclarativeActionExecutor):
"""Executor for the EditTable action.

Expand Down Expand Up @@ -628,11 +565,9 @@ def _convert_to_type(self, value: Any, target_type: str) -> Any:
"SetVariable": SetVariableExecutor,
"SetTextVariable": SetTextVariableExecutor,
"SetMultipleVariables": SetMultipleVariablesExecutor,
"AppendValue": AppendValueExecutor,
"ResetVariable": ResetVariableExecutor,
"ClearAllVariables": ClearAllVariablesExecutor,
"SendActivity": SendActivityExecutor,
"EmitEvent": EmitEventExecutor,
"ParseValue": ParseValueExecutor,
"EditTable": EditTableExecutor,
"EditTableV2": EditTableV2Executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""Control flow executors for the graph-based declarative workflow system.

Control flow in the graph-based system is handled differently than the interpreter:
- If/Switch: Condition evaluation happens in a dedicated evaluator executor that
- If/ConditionGroup: Condition evaluation happens in a dedicated evaluator executor that
returns a ConditionResult with the first-matching branch index. Edge conditions
then check the branch_index to route to the correct branch. This ensures only
one branch executes (first-match semantics), matching the interpreter behavior.
Expand Down Expand Up @@ -39,7 +39,7 @@


class ConditionGroupEvaluatorExecutor(DeclarativeActionExecutor):
"""Evaluates conditions for ConditionGroup/Switch and outputs the first-matching branch.
"""Evaluates conditions for ConditionGroup and outputs the first-matching branch.

This executor implements first-match semantics by evaluating conditions sequentially
and outputting a ConditionResult with the index of the first matching branch.
Expand All @@ -59,7 +59,7 @@ def __init__(
"""Initialize the condition evaluator.

Args:
action_def: The ConditionGroup/Switch action definition
action_def: The ConditionGroup action definition
conditions: List of condition items, each with 'condition' and optional 'id'
id: Optional executor ID
"""
Expand Down Expand Up @@ -99,71 +99,6 @@ async def handle_action(
await ctx.send_message(ConditionResult(matched=False, branch_index=ELSE_BRANCH_INDEX))


class SwitchEvaluatorExecutor(DeclarativeActionExecutor):
"""Evaluates a Switch action by matching a value against cases.

The Switch action uses a different schema than ConditionGroup:
- value: expression to evaluate once
- cases: list of {match: value_to_match, actions: [...]}
- default: default actions if no case matches

This evaluator evaluates the value expression once, then compares it
against each case's match value sequentially. First match wins.
"""

def __init__(
self,
action_def: dict[str, Any],
cases: list[dict[str, Any]],
*,
id: str | None = None,
):
"""Initialize the switch evaluator.

Args:
action_def: The Switch action definition (contains 'value' expression)
cases: List of case items, each with 'match' and optional 'actions'
id: Optional executor ID
"""
super().__init__(action_def, id=id)
self._cases = cases

@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ConditionResult],
) -> None:
"""Evaluate the switch value and find the first matching case."""
state = await self._ensure_state_initialized(ctx, trigger)

value_expr = self._action_def.get("value")
if not value_expr:
# No value to switch on - use default
await ctx.send_message(ConditionResult(matched=False, branch_index=ELSE_BRANCH_INDEX))
return

# Evaluate the switch value once
switch_value = state.eval_if_expression(value_expr)

# Compare against each case's match value
for index, case_item in enumerate(self._cases):
match_expr = case_item.get("match")
if match_expr is None:
continue

# Evaluate the match value
match_value = state.eval_if_expression(match_expr)

if switch_value == match_value:
# Found matching case
await ctx.send_message(ConditionResult(matched=True, branch_index=index, value=switch_value))
return

# No case matched - use default branch
await ctx.send_message(ConditionResult(matched=False, branch_index=ELSE_BRANCH_INDEX))


class IfConditionEvaluatorExecutor(DeclarativeActionExecutor):
"""Evaluates a single If condition and outputs a ConditionResult.

Expand Down Expand Up @@ -221,12 +156,7 @@ async def handle_action(
"""Initialize the loop and check for first item."""
state = await self._ensure_state_initialized(ctx, trigger)

# Support multiple schema formats:
# - Graph mode: itemsSource, items
# - Interpreter mode: source
items_expr = (
self._action_def.get("itemsSource") or self._action_def.get("items") or self._action_def.get("source")
)
items_expr = self._action_def.get("source")
items_raw: Any = state.eval_if_expression(items_expr) or []

items: list[Any]
Expand All @@ -244,25 +174,12 @@ async def handle_action(
}
state.set_state_data(state_data)

# Check if we have items
if items:
# Set the iteration variable
# Support multiple schema formats:
# - Graph mode: iteratorVariable, item (default "Local.item")
# - Interpreter mode: itemName (default "item", stored in Local scope)
item_var = self._action_def.get("iteratorVariable") or self._action_def.get("item")
if not item_var:
# Interpreter mode: itemName defaults to "item", store in Local scope
item_name = self._action_def.get("itemName", "item")
item_var = f"Local.{item_name}"

# Support multiple schema formats for index:
# - Graph mode: indexVariable, index
# - Interpreter mode: indexName (default "index", stored in Local scope)
index_var = self._action_def.get("indexVariable") or self._action_def.get("index")
if not index_var and "indexName" in self._action_def:
index_name = self._action_def.get("indexName", "index")
index_var = f"Local.{index_name}"
# Bind the current item and (when requested) the index under the Local scope.
item_var = f"Local.{self._action_def.get('itemName', 'item')}"
index_var = (
f"Local.{self._action_def.get('indexName', 'index')}" if "indexName" in self._action_def else None
)

state.set(item_var, items[0])
if index_var:
Expand Down Expand Up @@ -325,23 +242,11 @@ async def handle_action(
loop_state["index"] = current_index
state.set_state_data(state_data)

# Set the iteration variable
# Support multiple schema formats:
# - Graph mode: iteratorVariable, item (default "Local.item")
# - Interpreter mode: itemName (default "item", stored in Local scope)
item_var = self._action_def.get("iteratorVariable") or self._action_def.get("item")
if not item_var:
# Interpreter mode: itemName defaults to "item", store in Local scope
item_name = self._action_def.get("itemName", "item")
item_var = f"Local.{item_name}"

# Support multiple schema formats for index:
# - Graph mode: indexVariable, index
# - Interpreter mode: indexName (default "index", stored in Local scope)
index_var = self._action_def.get("indexVariable") or self._action_def.get("index")
if not index_var and "indexName" in self._action_def:
index_name = self._action_def.get("indexName", "index")
index_var = f"Local.{index_name}"
# Rebind the current item and (when requested) the index under the Local scope.
item_var = f"Local.{self._action_def.get('itemName', 'item')}"
index_var = (
f"Local.{self._action_def.get('indexName', 'index')}" if "indexName" in self._action_def else None
)

state.set(item_var, items[current_index])
if index_var:
Expand Down Expand Up @@ -486,7 +391,7 @@ async def handle_action(
class JoinExecutor(DeclarativeActionExecutor):
"""Executor that joins multiple branches back together.

Used after If/Switch to merge control flow back to a single path.
Used after If/ConditionGroup to merge control flow back to a single path.
Also used as passthrough nodes for else/default branches.
"""

Expand Down
Loading
Loading