Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion application/single_app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
EXECUTOR_TYPE = 'thread'
EXECUTOR_MAX_WORKERS = 30
SESSION_TYPE = 'filesystem'
VERSION = "0.239.013"
VERSION = "0.239.014"

SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')

Expand Down
119 changes: 119 additions & 0 deletions application/single_app/semantic_kernel_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,104 @@ def normalize(s):
print(f"[SK Loader] Error loading agent-specific plugins: {e}")
log_event(f"[SK Loader] Error loading agent-specific plugins: {e}", level=logging.ERROR, exceptionTraceback=True)


def _extract_sql_schema_for_instructions(kernel) -> str:
"""
Check if any SQL Schema plugins are loaded in the kernel and extract their schema
information to inject into agent instructions.

Returns a formatted schema summary string, or empty string if no SQL schema plugins found.
"""
from semantic_kernel_plugins.sql_schema_plugin import SQLSchemaPlugin

schema_parts = []

try:
# Iterate through all registered plugins in the kernel
for plugin_name, plugin in kernel.plugins.items():
# Check if the underlying plugin object is a SQLSchemaPlugin
# Kernel plugins wrap the original object - we need to check the underlying instance
plugin_obj = None

# Try to access the underlying plugin instance
if isinstance(plugin, SQLSchemaPlugin):
plugin_obj = plugin
elif hasattr(plugin, '_plugin_instance'):
if isinstance(plugin._plugin_instance, SQLSchemaPlugin):
plugin_obj = plugin._plugin_instance
else:
# Check if any function in this plugin belongs to a SQLSchemaPlugin
for func_name, func in plugin.functions.items():
if hasattr(func, 'method') and hasattr(func.method, '__self__'):
if isinstance(func.method.__self__, SQLSchemaPlugin):
plugin_obj = func.method.__self__
break

if plugin_obj is not None:
print(f"[SK Loader] Found SQL Schema plugin: {plugin_name}, fetching schema...")
try:
schema_result = plugin_obj.get_database_schema()
if schema_result and hasattr(schema_result, 'data'):
schema_data = schema_result.data
else:
schema_data = schema_result

if isinstance(schema_data, dict) and "tables" in schema_data:
db_name = schema_data.get("database_name", "Unknown")
db_type = schema_data.get("database_type", "Unknown")

schema_text = f"### Database: {db_name} ({db_type})\n\n"

for table_name, table_info in schema_data["tables"].items():
schema_name = table_info.get("schema_name", "dbo")
qualified_name = f"{schema_name}.{table_name}" if schema_name else table_name
schema_text += f"**Table: {qualified_name}**\n"

columns = table_info.get("columns", [])
if columns:
schema_text += "| Column | Type | Nullable |\n|--------|------|----------|\n"
for col in columns:
col_name = col.get("column_name", "?")
col_type = col.get("data_type", "?")
nullable = "Yes" if col.get("is_nullable", True) else "No"
schema_text += f"| {col_name} | {col_type} | {nullable} |\n"

pks = table_info.get("primary_keys", [])
if pks:
schema_text += f"Primary Key(s): {', '.join(pks)}\n"

schema_text += "\n"

# Add relationships
relationships = schema_data.get("relationships", [])
if relationships:
schema_text += "**Relationships (Foreign Keys):**\n"
for rel in relationships:
parent = rel.get("parent_table", "?")
parent_col = rel.get("parent_column", "?")
ref = rel.get("referenced_table", "?")
ref_col = rel.get("referenced_column", "?")
schema_text += f"- {parent}.{parent_col} → {ref}.{ref_col}\n"
schema_text += "\n"

schema_parts.append(schema_text)
print(f"[SK Loader] Successfully extracted schema for {db_name}: {len(schema_data['tables'])} tables")
else:
print(f"[SK Loader] Schema data for {plugin_name} was empty or had unexpected format")

except Exception as e:
print(f"[SK Loader] Warning: Failed to fetch schema from {plugin_name}: {e}")
log_event(f"[SK Loader] Failed to fetch SQL schema for injection: {e}",
extra={"plugin_name": plugin_name, "error": str(e)},
level=logging.WARNING)
except Exception as e:
print(f"[SK Loader] Warning: Error iterating kernel plugins for SQL schema: {e}")
log_event(f"[SK Loader] Error iterating kernel plugins for SQL schema: {e}",
extra={"error": str(e)}, level=logging.WARNING)

return "\n".join(schema_parts)


def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis_client=None, mode_label="global"):
"""
DRY helper to load a single agent (default agent) for the kernel.
Expand Down Expand Up @@ -871,6 +969,27 @@ def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis
group_id=group_id,
)

# Auto-inject SQL database schema into agent instructions if SQL plugins are loaded
try:
sql_schema_summary = _extract_sql_schema_for_instructions(kernel)
if sql_schema_summary:
agent_config["instructions"] = (
agent_config.get("instructions", "") +
"\n\n## Available Database Schema\n"
"The following database tables and columns are available for SQL queries. "
"ALWAYS use these exact table and column names when writing SQL queries.\n\n" +
sql_schema_summary +
"\n\nWhen a user asks a question about data, use the schema above to construct "
"the appropriate SQL query and execute it using the SQL Query plugin functions. "
"Do NOT ask the user for table or column names — use the schema provided above."
)
print(f"[SK Loader] Injected SQL schema into agent instructions for {agent_config['name']}")
except Exception as e:
print(f"[SK Loader] Warning: Failed to inject SQL schema into instructions: {e}")
log_event(f"[SK Loader] Failed to inject SQL schema into agent instructions: {e}",
extra={"agent_name": agent_config["name"], "error": str(e)},
level=logging.WARNING)

try:
kwargs = {
"name": agent_config["name"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ def _create_plugin_instance(self, manifest: Dict[str, Any]):
return self._create_openapi_plugin(manifest)
elif plugin_type == 'python':
return self._create_python_plugin(manifest)
#elif plugin_type in ['sql_schema', 'sql_query']:
# return self._create_sql_plugin(manifest)
elif plugin_type in ['sql_schema', 'sql_query']:
return self._create_sql_plugin(manifest)
else:
try:
debug_print(f"[Logged Plugin Loader] Attempting to discover plugin type: {plugin_type}")
Expand Down
103 changes: 94 additions & 9 deletions application/single_app/semantic_kernel_plugins/sql_query_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,12 @@ def metadata(self) -> Dict[str, Any]:
user_desc = self._metadata.get("description", f"SQL Query plugin for {self.database_type} database")
api_desc = (
"This plugin executes SQL queries against databases and returns structured results. "
"It supports SQL Server, PostgreSQL, MySQL, and SQLite databases. The plugin includes "
"query sanitization, validation, and security features including parameterized queries, "
"read-only mode, result limiting, and timeout protection. It automatically cleans queries "
"from unnecessary characters and formats results for easy consumption by AI agents. "
"The plugin handles database-specific SQL variations and connection management."
"It supports SQL Server, PostgreSQL, MySQL, and SQLite databases. "
"WORKFLOW: Before executing any query, you MUST first use the SQL Schema plugin to discover "
"available tables, column names, data types, and relationships. Then construct valid SQL queries "
"using the discovered schema with correct fully-qualified table names (e.g., dbo.TableName). "
"The plugin includes query sanitization, validation, and security features including "
"parameterized queries, read-only mode, result limiting, and timeout protection."
)
full_desc = f"{user_desc}\n\n{api_desc}"

Expand Down Expand Up @@ -215,14 +216,24 @@ def metadata(self) -> Dict[str, Any]:
{"name": "query", "type": "str", "description": "The SQL query to validate", "required": True}
],
"returns": {"type": "ResultWithMetadata", "description": "Validation result with any issues found"}
},
{
"name": "query_database",
"description": "Execute a SQL query to answer a question about the database",
"parameters": [
{"name": "question", "type": "str", "description": "The natural language question being answered", "required": True},
{"name": "query", "type": "str", "description": "The SQL query to execute", "required": True},
{"name": "max_rows", "type": "int", "description": "Maximum number of rows to return (overrides default)", "required": False}
],
"returns": {"type": "ResultWithMetadata", "description": "Query results with columns, data, and original question context"}
}
]
}

def get_functions(self) -> List[str]:
return ["execute_query", "execute_scalar", "validate_query"]
return ["execute_query", "execute_scalar", "validate_query", "query_database"]

@kernel_function(description="Execute a SQL query and return results")
@kernel_function(description="Execute a SQL query against the database and return results as structured data with columns and rows. IMPORTANT: Before calling this function, you MUST first call get_database_schema or get_table_list from the SQL Schema plugin to discover available tables, column names, data types, and relationships. Use the discovered schema to construct valid SQL queries with correct table and column names. Always use fully qualified table names (e.g., dbo.TableName) when available. Results are limited by max_rows to prevent excessive data transfer.")
@plugin_function_logger("SQLQueryPlugin")
def execute_query(
self,
Expand Down Expand Up @@ -301,7 +312,7 @@ def execute_query(
}
return ResultWithMetadata(error_result, self.metadata)

@kernel_function(description="Execute a query that returns a single value")
@kernel_function(description="Execute a query that returns a single scalar value (e.g., COUNT, SUM, MAX, MIN). You MUST first discover the database schema using get_database_schema or get_table_list before calling this function to ensure correct table and column references.")
@plugin_function_logger("SQLQueryPlugin")
def execute_scalar(
self,
Expand Down Expand Up @@ -360,7 +371,7 @@ def execute_scalar(
}
return ResultWithMetadata(error_result, self.metadata)

@kernel_function(description="Validate a SQL query without executing it")
@kernel_function(description="Validate a SQL query for syntax correctness and safety without executing it. Use this to pre-check complex queries before execution, especially when constructing multi-table JOINs or complex WHERE clauses.")
@plugin_function_logger("SQLQueryPlugin")
def validate_query(self, query: str) -> ResultWithMetadata:
"""Validate a SQL query without executing it"""
Expand All @@ -380,6 +391,80 @@ def validate_query(self, query: str) -> ResultWithMetadata:
}
return ResultWithMetadata(error_result, self.metadata)

@kernel_function(description="Execute a SQL query to answer a question about the database. This is a convenience function that executes a SQL query and returns results along with the original question for context. IMPORTANT: Before calling this function, you MUST first call get_database_schema or get_table_list from the SQL Schema plugin to discover available tables, column names, data types, and relationships. Then construct the appropriate SQL query using the discovered schema and provide it along with the original question.")
@plugin_function_logger("SQLQueryPlugin")
def query_database(
self,
question: str,
query: str,
max_rows: Optional[int] = None
) -> ResultWithMetadata:
"""Execute a SQL query to answer a specific question about the database"""
try:
# Clean and validate the query
cleaned_query = self._clean_query(query)
validation_result = self._validate_query(cleaned_query)

if not validation_result["is_valid"]:
raise ValueError(f"Invalid query: {validation_result['issues']}")

conn = self._get_connection()
cursor = conn.cursor()

# Set query timeout
if hasattr(cursor, 'settimeout'):
cursor.settimeout(self.timeout)

cursor.execute(cleaned_query)

# Get column names
if hasattr(cursor, 'description') and cursor.description:
columns = [desc[0] for desc in cursor.description]
else:
columns = []

# Fetch results with row limit
effective_max_rows = max_rows or self.max_rows

if self.database_type == 'sqlite':
rows = cursor.fetchall()
if len(rows) > effective_max_rows:
rows = rows[:effective_max_rows]
results = [dict(row) for row in rows]
else:
rows = cursor.fetchmany(effective_max_rows)
results = []
for row in rows:
if isinstance(row, (list, tuple)):
results.append(dict(zip(columns, row)))
else:
results.append(row)

# Prepare result data with question context
result_data = {
"question": question,
"columns": columns,
"data": results,
"row_count": len(results),
"is_truncated": len(results) >= effective_max_rows,
"query": cleaned_query
}

log_event(f"[SQLQueryPlugin] query_database executed successfully, returned {len(results)} rows", extra={"question": question})
return ResultWithMetadata(result_data, self.metadata)

except Exception as e:
log_event(f"[SQLQueryPlugin] Error in query_database: {e}", extra={"question": question})
error_result = {
"error": str(e),
"question": question,
"query": query,
"columns": [],
"data": [],
"row_count": 0
}
return ResultWithMetadata(error_result, self.metadata)

def _clean_query(self, query: str) -> str:
"""Clean query from unnecessary characters and formatting"""
if not query:
Expand Down
Loading