Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import importlib
import json
import logging
from collections.abc import Callable
from collections.abc import Callable, Sequence
from inspect import isawaitable
from queue import Queue
from typing import Any
Expand Down Expand Up @@ -38,7 +37,11 @@
from semantic_kernel.processes.process_message import ProcessMessage
from semantic_kernel.processes.process_message_factory import ProcessMessageFactory
from semantic_kernel.processes.process_types import get_generic_state_type
from semantic_kernel.processes.step_utils import find_input_channels, get_fully_qualified_name
from semantic_kernel.processes.step_utils import (
find_input_channels,
get_fully_qualified_name,
get_step_class_from_qualified_name,
)
from semantic_kernel.utils.feature_stage_decorator import experimental

logger: logging.Logger = logging.getLogger(__name__)
Expand All @@ -48,18 +51,29 @@
class StepActor(Actor, StepInterface, KernelProcessMessageChannel):
"""Represents a step actor that follows the Step abstract class."""

def __init__(self, ctx: ActorRuntimeContext, actor_id: ActorId, kernel: Kernel, factories: dict[str, Callable]):
def __init__(
self,
ctx: ActorRuntimeContext,
actor_id: ActorId,
kernel: Kernel,
factories: dict[str, Callable],
allowed_module_prefixes: Sequence[str] | None = None,
):
"""Initializes a new instance of StepActor.

Args:
ctx: The actor runtime context.
actor_id: The unique ID for the actor.
kernel: The Kernel dependency to be injected.
factories: The factory dictionary to use for creating the step.
allowed_module_prefixes: Optional sequence of module prefixes that are allowed
for step class loading. If provided, step classes must come from modules
starting with one of these prefixes.
"""
super().__init__(ctx, actor_id)
self.kernel = kernel
self.factories: dict[str, Callable] = factories
self.allowed_module_prefixes: Sequence[str] | None = allowed_module_prefixes
self.parent_process_id: str | None = None
self.step_info: DaprStepInfo | None = None
self.initialize_task: bool | None = False
Expand Down Expand Up @@ -168,12 +182,6 @@ async def process_incoming_messages(self):
await self._state_manager.try_add_state(ActorStateKeys.StepIncomingMessagesState.value, messages_to_save)
await self._state_manager.save_state()

def _get_class_from_string(self, full_class_name: str):
"""Gets a class from a string."""
module_name, class_name = full_class_name.rsplit(".", 1)
module = importlib.import_module(module_name)
return getattr(module, class_name)

async def activate_step(self):
"""Initializes the step."""
# Instantiate an instance of the inner step object and retrieve its class reference.
Expand All @@ -184,7 +192,10 @@ async def activate_step(self):
step_cls = step_object.__class__
step_instance: KernelProcessStep = step_object # type: ignore
else:
step_cls = self._get_class_from_string(self.inner_step_type)
step_cls = get_step_class_from_qualified_name(
self.inner_step_type,
allowed_module_prefixes=self.allowed_module_prefixes,
)
step_instance: KernelProcessStep = step_cls() # type: ignore

kernel_plugin = self.kernel.add_plugin(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.


from collections.abc import MutableSequence
from collections.abc import MutableSequence, Sequence
from typing import Literal

from pydantic import Field
Expand All @@ -20,17 +20,23 @@ class DaprProcessInfo(DaprStepInfo):
type: Literal["DaprProcessInfo"] = "DaprProcessInfo" # type: ignore
steps: MutableSequence["DaprStepInfo | DaprProcessInfo"] = Field(default_factory=list)

def to_kernel_process(self) -> KernelProcess:
"""Converts the Dapr process info to a kernel process."""
def to_kernel_process(self, allowed_module_prefixes: Sequence[str] | None = None) -> KernelProcess:
"""Converts the Dapr process info to a kernel process.

Args:
allowed_module_prefixes: Optional list of module prefixes that are allowed
for step class loading. If provided, step classes must come from modules
starting with one of these prefixes.
"""
if not isinstance(self.state, KernelProcessState):
raise ValueError("State must be a kernel process state")

steps: list[KernelProcessStepInfo] = []
for step in self.steps:
if isinstance(step, DaprProcessInfo):
steps.append(step.to_kernel_process())
steps.append(step.to_kernel_process(allowed_module_prefixes=allowed_module_prefixes))
else:
steps.append(step.to_kernel_process_step_info())
steps.append(step.to_kernel_process_step_info(allowed_module_prefixes=allowed_module_prefixes))

return KernelProcess(state=self.state, steps=steps, edges=self.edges)

Expand Down
31 changes: 16 additions & 15 deletions python/semantic_kernel/processes/dapr_runtime/dapr_step_info.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) Microsoft. All rights reserved.

import importlib
from collections.abc import Sequence
from typing import Literal

from pydantic import Field
Expand All @@ -11,7 +11,7 @@
from semantic_kernel.processes.kernel_process.kernel_process_state import KernelProcessState
from semantic_kernel.processes.kernel_process.kernel_process_step_info import KernelProcessStepInfo
from semantic_kernel.processes.kernel_process.kernel_process_step_state import KernelProcessStepState
from semantic_kernel.processes.step_utils import get_fully_qualified_name
from semantic_kernel.processes.step_utils import get_fully_qualified_name, get_step_class_from_qualified_name
from semantic_kernel.utils.feature_stage_decorator import experimental


Expand All @@ -24,13 +24,20 @@ class DaprStepInfo(KernelBaseModel):
state: KernelProcessState | KernelProcessStepState
edges: dict[str, list[KernelProcessEdge]] = Field(default_factory=dict)

def to_kernel_process_step_info(self) -> KernelProcessStepInfo:
"""Converts the Dapr step info to a kernel process step info."""
inner_step_type = self._get_class_from_string(self.inner_step_python_type)
if inner_step_type is None:
raise KernelException(
f"Unable to create inner step type from assembly qualified name `{self.inner_step_python_type}`"
)
def to_kernel_process_step_info(
self, allowed_module_prefixes: Sequence[str] | None = None
) -> KernelProcessStepInfo:
"""Converts the Dapr step info to a kernel process step info.

Args:
allowed_module_prefixes: Optional list of module prefixes that are allowed
for step class loading. If provided, step classes must come from modules
starting with one of these prefixes.
"""
inner_step_type = get_step_class_from_qualified_name(
self.inner_step_python_type,
allowed_module_prefixes=allowed_module_prefixes,
)
return KernelProcessStepInfo(inner_step_type=inner_step_type, state=self.state, output_edges=self.edges)

@classmethod
Expand All @@ -46,9 +53,3 @@ def from_kernel_step_info(cls, kernel_step_info: KernelProcessStepInfo) -> "Dapr
state=kernel_step_info.state,
edges={key: list(value) for key, value in kernel_step_info.edges.items()},
)

def _get_class_from_string(self, full_class_name: str):
"""Gets a class from a string."""
module_name, class_name = full_class_name.rsplit(".", 1)
module = importlib.import_module(module_name)
return getattr(module, class_name)
82 changes: 82 additions & 0 deletions python/semantic_kernel/processes/step_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
# Copyright (c) Microsoft. All rights reserved.

import importlib
import inspect
from collections.abc import Sequence
from typing import Any

from semantic_kernel.exceptions.process_exceptions import ProcessInvalidConfigurationException
from semantic_kernel.functions.kernel_function import KernelFunction
from semantic_kernel.processes.kernel_process.kernel_process_message_channel import KernelProcessMessageChannel
from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep
from semantic_kernel.processes.kernel_process.kernel_process_step_context import KernelProcessStepContext
from semantic_kernel.utils.feature_stage_decorator import experimental

Expand Down Expand Up @@ -37,3 +42,80 @@ def find_input_channels(
def get_fully_qualified_name(cls) -> str:
"""Gets the fully qualified name of a class."""
return f"{cls.__module__}.{cls.__name__}"


@experimental
def get_step_class_from_qualified_name(
full_class_name: str,
allowed_module_prefixes: Sequence[str] | None = None,
) -> type[KernelProcessStep]:
"""Loads and validates a KernelProcessStep class from a fully qualified name.

This function validates that the loaded class is a proper subclass of
KernelProcessStep, preventing instantiation of arbitrary classes.

Args:
full_class_name: The fully qualified class name in Python import notation
(e.g., 'mypackage.mymodule.MyStep'). The module must be importable
from the current Python environment.
allowed_module_prefixes: Optional list of module prefixes that are allowed
to be imported. If provided, the module must start with one of these
prefixes. This check is performed BEFORE import to prevent execution
of module-level code in unauthorized modules. If None or empty, any
module is allowed.

Returns:
The validated class type that is a subclass of KernelProcessStep

Raises:
ProcessInvalidConfigurationException: Raised when:
- The class name format is invalid (missing module separator)
- The module is not in the allowed prefixes list (if provided)
- The module cannot be imported
- The class attribute doesn't exist in the module
- The attribute is not a class type
- The class is not a subclass of KernelProcessStep
"""
if not full_class_name or "." not in full_class_name:
raise ProcessInvalidConfigurationException(
f"Invalid step class name format: '{full_class_name}'. "
"Expected a fully qualified name like 'module.ClassName'."
)

module_name, class_name = full_class_name.rsplit(".", 1)

if not module_name or not class_name:
raise ProcessInvalidConfigurationException(
f"Invalid step class name format: '{full_class_name}'. Module name and class name cannot be empty."
)

# Check module allowlist BEFORE import to prevent module-level code execution
if allowed_module_prefixes and not any(module_name.startswith(prefix) for prefix in allowed_module_prefixes):
raise ProcessInvalidConfigurationException(
f"Module '{module_name}' is not in the allowed module prefixes: {allowed_module_prefixes}. "
f"Step class '{full_class_name}' cannot be loaded."
)

try:
module = importlib.import_module(module_name)
except ImportError as e:
raise ProcessInvalidConfigurationException(
f"Unable to import module '{module_name}' for step class '{full_class_name}': {e}"
) from e

try:
cls = getattr(module, class_name)
except AttributeError as e:
raise ProcessInvalidConfigurationException(
f"Class '{class_name}' not found in module '{module_name}': {e}"
) from e

if not inspect.isclass(cls):
raise ProcessInvalidConfigurationException(f"'{full_class_name}' is not a class type, got {type(cls).__name__}")

if not issubclass(cls, KernelProcessStep):
raise ProcessInvalidConfigurationException(
f"Step class '{full_class_name}' must be a subclass of KernelProcessStep. Got: {cls.__bases__}"
)

return cls
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess
from semantic_kernel.processes.kernel_process.kernel_process_event import KernelProcessEvent
from semantic_kernel.processes.kernel_process.kernel_process_state import KernelProcessState
from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep
from semantic_kernel.processes.kernel_process.kernel_process_step_info import KernelProcessStepInfo
from semantic_kernel.processes.kernel_process.kernel_process_step_state import KernelProcessStepState


class DummyInnerStepType:
class DummyInnerStepType(KernelProcessStep):
"""A valid KernelProcessStep subclass for testing."""

pass


Expand Down
Loading
Loading