Skip to content

monelogics/graph

Repository files navigation

Graph: A Go Implementation of LangGraph

Graph is a Go library for building stateful, multi-agent workflows with Large Language Models (LLMs). Inspired by LangGraph, it provides a graph-based approach to orchestrating complex AI agent interactions with support for conditional logic, parallel execution, and state management.

Features

  • 🔄 Graph-Based Workflows: Build complex workflows as directed graphs with nodes and edges
  • 🧠 Multiple Node Types: Support for LLM, function, conditional, parallel, and subgraph nodes
  • 🔧 State Management: Shared state system with channels for agent communication
  • ⚡ Concurrency: Thread-safe operations with configurable parallel execution
  • 🔀 Control Flow: Advanced routing with conditional edges, loops, and branching logic
  • 📊 Checkpointing: Save and restore workflow state for long-running processes
  • 🛠️ Tool Integration: Function calling support for extending agent capabilities
  • 🧪 Type Safety: Full generic support for type-safe graph operations
  • 🔄 Retry Logic: Configurable retry policies with exponential backoff
  • 📈 Monitoring: Execution tracking and error handling

Installation

go get github.com/monelogics/graph

Configuration

Before using Graph, you'll need to configure API keys for LLM providers:

  1. Copy the environment example file:

    cp .env.example .env
  2. Add your API keys to .env:

    # Required: At least one LLM provider
    GEMINI_API_KEY=your_gemini_api_key_here
    
    # Optional: Additional providers
    OPENAI_API_KEY=your_openai_api_key_here
    ANTHROPIC_API_KEY=your_anthropic_api_key_here
  3. Load configuration in your application:

    import "github.com/monelogics/graph/config"
    
    // Load the configuration; abort the program if it cannot be loaded.
    cfg, err := config.LoadFromEnv()
    if err != nil {
        log.Fatal(err)
    }

See the Configuration Guide for detailed setup instructions.

Quick Start

Basic Function Node Example

package main

import (
    "context"
    "fmt"
    "log"
    
    "github.com/monelogics/graph"
)

// main builds a three-node graph (start -> add -> end), seeds the state with
// two integers, runs the graph from "start", and prints their sum.
func main() {
    // Create a new graph
    g := mlgraph.New[string, mlgraph.Node]()

    // Function node: reads "a" and "b" from the state and stores a+b under
    // "result". A failed lookup is returned as an error rather than being
    // silently treated as zero, which would print a wrong sum.
    // NOTE(review): assumes the second result of GetInt is a boolean ok flag.
    addNode := mlgraph.NewFunctionNode("add", func(ctx context.Context, state *mlgraph.State) error {
        a, ok := state.GetInt("a")
        if !ok {
            return fmt.Errorf("state has no int value for key %q", "a")
        }
        b, ok := state.GetInt("b")
        if !ok {
            return fmt.Errorf("state has no int value for key %q", "b")
        }
        state.Set("result", a+b)
        return nil
    })

    // Add nodes to graph
    startNode := mlgraph.NewStartNode("start")
    endNode := mlgraph.NewEndNode("end")

    g.AddNode("start", startNode)
    g.AddNode("add", addNode)
    g.AddNode("end", endNode)

    // Connect nodes
    g.AddEdge("start", "add", 1.0)
    g.AddEdge("add", "end", 1.0)

    // Create executor
    cfg := mlgraph.DefaultExecutorConfig()
    executor := mlgraph.NewExecutor(g, cfg)

    // Set initial state
    state := mlgraph.NewState()
    state.Set("a", 5)
    state.Set("b", 3)
    executor.SetState(state)

    // Execute the graph
    result, err := executor.Run(context.Background(), "start")
    if err != nil {
        log.Fatal(err)
    }

    // Report an unsuccessful run instead of exiting silently.
    if !result.Success {
        log.Fatal("graph execution did not succeed")
    }

    sum, ok := result.State.GetInt("result")
    if !ok {
        log.Fatalf("final state has no int value for key %q", "result")
    }
    fmt.Printf("Result: %d\n", sum) // Output: Result: 8
}

Conditional Workflow Example

package main

import (
    "context"
    "fmt"
    "log"
    
    "github.com/monelogics/graph"
)

// main builds a branching workflow and runs it twice, once with a short input
// and once with a long one. The flow is start -> process -> evaluate; the
// evaluate node picks the "high" or "low" branch from the score that process
// stored, and both handler nodes lead to end.
func main() {
    g := mlgraph.New[string, mlgraph.Node]()

    // Create nodes
    startNode := mlgraph.NewStartNode("start")

    // Function to process input: stores a formatted copy under "processed"
    // and the input length under "score". A failed "input" lookup is an
    // error rather than an empty string.
    // NOTE(review): assumes the typed getters return a boolean ok flag.
    processNode := mlgraph.NewFunctionNode("process", func(ctx context.Context, state *mlgraph.State) error {
        input, ok := state.GetString("input")
        if !ok {
            return fmt.Errorf("state has no string value for key %q", "input")
        }
        state.Set("processed", fmt.Sprintf("Processed: %s", input))
        state.Set("score", len(input)) // Use length (in bytes) as score
        return nil
    })

    // Conditional node based on score: above 10 is "high", otherwise "low".
    // A failed score lookup is an error instead of silently counting as 0.
    condition := func(state *mlgraph.State) (string, error) {
        score, ok := state.GetInt("score")
        if !ok {
            return "", fmt.Errorf("state has no int value for key %q", "score")
        }
        if score > 10 {
            return "high", nil
        }
        return "low", nil
    }

    conditionalNode := mlgraph.NewConditionalNode("evaluate", condition)
    conditionalNode.AddBranch("high", "high_score")
    conditionalNode.AddBranch("low", "low_score")

    // Handler nodes
    highScoreNode := mlgraph.NewFunctionNode("high_score", func(ctx context.Context, state *mlgraph.State) error {
        state.Set("result", "High quality input!")
        return nil
    })

    lowScoreNode := mlgraph.NewFunctionNode("low_score", func(ctx context.Context, state *mlgraph.State) error {
        state.Set("result", "Input needs improvement")
        return nil
    })

    endNode := mlgraph.NewEndNode("end")

    // Add all nodes
    g.AddNode("start", startNode)
    g.AddNode("process", processNode)
    g.AddNode("evaluate", conditionalNode)
    g.AddNode("high_score", highScoreNode)
    g.AddNode("low_score", lowScoreNode)
    g.AddNode("end", endNode)

    // Connect nodes
    g.AddEdge("start", "process", 1.0)
    g.AddEdge("process", "evaluate", 1.0)
    g.AddEdge("high_score", "end", 1.0)
    g.AddEdge("low_score", "end", 1.0)

    // Execute with different inputs
    cfg := mlgraph.DefaultExecutorConfig()
    executor := mlgraph.NewExecutor(g, cfg)

    // One executor is reused; each run gets a fresh state.
    runs := []struct {
        label string
        input string
    }{
        {label: "Short", input: "short"},
        {label: "Long", input: "this is a very long input string"},
    }

    for _, r := range runs {
        state := mlgraph.NewState()
        state.Set("input", r.input)
        executor.SetState(state)

        result, err := executor.Run(context.Background(), "start")
        if err != nil {
            log.Fatal(err)
        }

        // Report an unsuccessful run, then still try the next input.
        if !result.Success {
            log.Printf("%s input: graph execution did not succeed", r.label)
            continue
        }

        res, ok := result.State.GetString("result")
        if !ok {
            log.Printf("%s input: final state has no string value for key %q", r.label, "result")
            continue
        }
        fmt.Printf("%s input result: %s\n", r.label, res)
    }
}

Core Concepts

Nodes

Nodes are the computational units of your graph. Graph supports several node types:

  • StartNode: Marks the beginning of execution
  • EndNode: Marks the end of execution
  • FunctionNode: Executes custom Go functions
  • LLMNode: Makes calls to Language Models (Gemini integration coming soon)
  • ConditionalNode: Makes decisions based on state
  • ParallelNode: Executes multiple nodes concurrently
  • SubgraphNode: Executes nested graphs

State Management

The state system provides shared memory for your workflow:

// Create a new state value to share across the workflow.
state := mlgraph.NewState()

// Set a single value, or several at once from a map.
state.Set("key", "value")
state.Update(map[string]any{
    "a": 1,
    "b": 2,
})

// Typed getters return the value plus a second result, named ok here.
// NOTE(review): presumably ok reports whether the key held a value of the
// requested type — confirm against the State API before relying on it.
str, ok := state.GetString("key")
num, ok := state.GetInt("a")

// Use channels for message passing: send to a named channel, then receive
// from the same name.
state.SendToChannel("messages", "Hello")
messages := state.ReceiveFromChannel("messages")

// Snapshots for checkpointing.
// NOTE(review): the argument to RestoreSnapshot looks like a snapshot
// index — confirm.
state.SaveSnapshot()
state.RestoreSnapshot(0)

Graph Construction

Build graphs programmatically:

// Create an empty graph.
// NOTE(review): the type arguments appear to be the node-ID type (string)
// and the node value type (mlgraph.Node) — confirm.
graph := mlgraph.New[string, mlgraph.Node]()

// Add nodes: build a node, then register it under an ID.
node := mlgraph.NewFunctionNode("my_func", myFunction)
graph.AddNode("my_func", node)

// Connect nodes.
// NOTE(review): the third argument (1.0) is presumably an edge weight — confirm.
graph.AddEdge("node1", "node2", 1.0)

// Conditional edges: attach a data map (here a "condition" expression) to
// the edge.
graph.AddEdgeWithData("node1", "node2", 1.0, map[string]any{
    "condition": "score > 0.8",
})

Execution

Execute graphs with configurable options:

// Executor settings: a concurrency limit, a timeout, a retry policy, and an
// error callback.
config := mlgraph.ExecutorConfig{
    MaxConcurrency: 5,
    Timeout: 30 * time.Minute,
    RetryPolicy: mlgraph.RetryPolicy{
        MaxRetries: 3,
        RetryDelay: time.Second,
        // NOTE(review): presumably the delay is multiplied by this factor
        // between retries (exponential backoff) — confirm.
        BackoffFactor: 2.0,
    },
    // Logs the failing node's ID together with its error.
    ErrorHandler: func(err error, nodeID string, state *mlgraph.State) bool {
        log.Printf("Error in node %s: %v", nodeID, err)
        return true // Continue execution
    },
}

// Create an executor for the graph and run it starting at "start".
executor := mlgraph.NewExecutor(graph, config)
result, err := executor.Run(context.Background(), "start")

Advanced Features

Node Builder Pattern

Use the fluent builder API for complex nodes:

// Configure a function node through chained builder calls; Build returns the
// finished node together with an error that should be checked.
node, err := mlgraph.NewNodeBuilder(mlgraph.NodeTypeFunction, "processor").
    WithName("Data Processor").
    WithDescription("Processes incoming data").
    WithFunction(func(ctx context.Context, state *mlgraph.State) error {
        // Your processing logic
        return nil
    }).
    Build()

State Schemas

Define and validate state structures:

manager := mlgraph.NewStateManager()

// Schema with a required string field "user_id" and a float field "score"
// that defaults to 0.0 and must lie in [0, 1].
schema := mlgraph.StateSchema{
    Fields: map[string]mlgraph.FieldSchema{
        "user_id": {
            Type:     "string",
            Required: true,
        },
        "score": {
            Type:    "float",
            Default: 0.0,
            Validator: func(v any) error {
                // Checked type assertion: an unchecked v.(float64) would
                // panic on any non-float64 value (an int, a string, nil).
                score, ok := v.(float64)
                if !ok {
                    return fmt.Errorf("score must be a float64, got %T", v)
                }
                // Written as a negated range test so that NaN, which fails
                // every comparison, is rejected as well.
                if !(score >= 0 && score <= 1) {
                    return fmt.Errorf("score must be between 0 and 1")
                }
                return nil
            },
        },
    },
}

// Register the schema under a name, then create a state from that name.
manager.RegisterSchema("user_session", schema)
state, err := manager.CreateState("session1", "user_session")

Parallel Execution

Execute nodes in parallel with controlled concurrency:

// Create a parallel node over three task IDs.
parallelNode := mlgraph.NewParallelNode("parallel", []string{"task1", "task2", "task3"})
// NOTE(review): presumably caps how many of the tasks run at once — confirm.
parallelNode.MaxConcurrency = 2
parallelNode.WaitAll = true // Wait for all to complete

// Register the parallel node in the graph under the ID "parallel".
graph.AddNode("parallel", parallelNode)

Error Handling and Retries

Configure sophisticated error handling:

// Executor configuration with a selective retry policy and an error handler
// that records failures in the state.
config := mlgraph.ExecutorConfig{
    RetryPolicy: mlgraph.RetryPolicy{
        MaxRetries: 5,
        RetryDelay: 500 * time.Millisecond,
        BackoffFactor: 1.5,
        RetryCondition: func(err error) bool {
            // Only retry on specific errors: here, those whose message
            // contains the substring "temporary".
            return strings.Contains(err.Error(), "temporary")
        },
    },
    ErrorHandler: func(err error, nodeID string, state *mlgraph.State) bool {
        // Log error and decide whether to continue
        log.Printf("Node %s failed: %v", nodeID, err)
        
        // Store error in state for later analysis: the node ID, the error
        // message, and the current time are sent to the "errors" channel.
        state.SendToChannel("errors", map[string]any{
            "node": nodeID,
            "error": err.Error(),
            "timestamp": time.Now(),
        })
        
        // Continue execution
        return true
    },
}

Testing

The library includes comprehensive tests. Run them with:

go test ./...

Run tests with race detection:

go test -race ./...

Performance Considerations

  • Concurrency: Use MaxConcurrency to control resource usage
  • State Size: Large state objects may impact performance; use channels for large data
  • Memory: The library maintains execution history; consider clearing it for long-running workflows
  • Checkpointing: Enable only when needed as it adds overhead

Roadmap

  • Gemini LLM integration with google.golang.org/genai
  • Advanced tool/function calling system
  • Persistent checkpointing with multiple backends
  • Graph visualization and debugging tools
  • Streaming execution mode
  • Graph composition and inheritance
  • Performance optimizations
  • More LLM provider integrations

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • Inspired by LangGraph from LangChain
  • Built with ❤️ for the Go community
  • Thanks to all contributors and users

About

monelogics graph implementation

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages