Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ core go vet # Vet

The package has three layers, all in the root `process` package (plus a `exec` subpackage):

### Layer 1: Process Execution (service.go, process.go, process_global.go)
### Layer 1: Process Execution (service.go, process.go)

`Service` is a Core service (`*core.ServiceRuntime[Options]`) that manages all `Process` instances. It spawns subprocesses, pipes stdout/stderr through goroutines, captures output to a `RingBuffer`, and broadcasts IPC actions (`ActionProcessStarted`, `ActionProcessOutput`, `ActionProcessExited`, `ActionProcessKilled` — defined in actions.go).

`process_global.go` provides package-level convenience functions (`Start`, `Run`, `Kill`, `List`) that delegate to a global `Service` singleton initialized via `Init(core)`. Follows the same pattern as Go's `i18n` package.
The legacy global singleton API (`process_global.go`) was removed in favor of
explicit Core service registration.

### Layer 2: Daemon Lifecycle (daemon.go, pidfile.go, health.go, registry.go)

Expand All @@ -45,19 +46,19 @@ Builder-pattern wrapper around `os/exec` with structured logging via a pluggable

## Key Patterns

- **Core integration**: `Service` embeds `*core.ServiceRuntime[Options]` and uses `s.Core().ACTION(...)` to broadcast typed action messages. Tests create a Core instance via `framework.New(framework.WithName("process", NewService(...)))`.
- **Core integration**: `Service` embeds `*core.ServiceRuntime[Options]` and uses `s.Core().ACTION(...)` to broadcast typed action messages. Tests create a Core instance via `framework.New(framework.WithService(Register))`.
- **Output capture**: All process output goes through a fixed-size `RingBuffer` (default 1MB). Oldest data is silently overwritten when full. Set `RunOptions.DisableCapture` to skip buffering for long-running processes where output is only streamed via IPC.
- **Process lifecycle**: Status transitions are `StatusPending → StatusRunning → StatusExited|StatusFailed|StatusKilled`. The `done` channel closes on exit; use `<-proc.Done()` or `proc.Wait()`.
- **Detach / process group isolation**: Set `RunOptions.Detach = true` to run the subprocess in its own process group (`Setpgid`). Detached processes use `context.Background()` so they survive parent context cancellation and parent death.
- **Graceful shutdown**: `Service.OnShutdown` kills all running processes. `Daemon.Stop()` performs ordered teardown: sets health to not-ready → shuts down health server → releases PID file → unregisters from registry. `DaemonOptions.ShutdownTimeout` (default 30 s) bounds the shutdown context.
- **Auto-registration**: Pass a `Registry` and `RegistryEntry` in `DaemonOptions` to automatically register the daemon on `Start()` and unregister on `Stop()`.
- **PID liveness checks**: Both `PIDFile` and `Registry` use `proc.Signal(syscall.Signal(0))` to check if a PID is alive before trusting stored state.
- **Error handling**: All errors MUST use `coreerr.E()` from `go-log` (imported as `coreerr`), never `fmt.Errorf` or `errors.New`. Sentinel errors are package-level vars created with `coreerr.E("", "message", nil)`.
- **Error handling**: All errors MUST use `core.E()`, never `fmt.Errorf` or
`errors.New`. Sentinel errors are package-level vars created with `core.E("", "message", nil)`.

## Dependencies

- `dappco.re/go/core` — Core DI framework, IPC actions, `ServiceRuntime`
- `dappco.re/go/core/log` — Structured error constructor (`coreerr.E()`)
- `dappco.re/go/core/io` — Filesystem abstraction (`coreio.Local`) used by PIDFile and Registry
- `github.com/stretchr/testify` — test assertions (require/assert)

Expand Down
119 changes: 118 additions & 1 deletion actions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package process

import "time"
import (
"context"
"syscall"
"time"

"dappco.re/go/core"
)

// --- ACTION messages (broadcast via Core.ACTION) ---

Expand Down Expand Up @@ -35,3 +41,114 @@ type ActionProcessKilled struct {
ID string
Signal string
}

// --- Core Action Handlers ---------------------------------------------------

func (s *Service) handleRun(ctx context.Context, opts core.Options) core.Result {
command := opts.String("command")
if command == "" {
return core.Result{Value: core.E("process.run", "command is required", nil), OK: false}
}

runOpts := RunOptions{
Command: command,
Dir: opts.String("dir"),
}
if r := opts.Get("args"); r.OK {
runOpts.Args = optionStrings(r.Value)
}
if r := opts.Get("env"); r.OK {
runOpts.Env = optionStrings(r.Value)
}

return s.runCommand(ctx, runOpts)
}

func (s *Service) handleStart(ctx context.Context, opts core.Options) core.Result {
command := opts.String("command")
if command == "" {
return core.Result{Value: core.E("process.start", "command is required", nil), OK: false}
}

runOpts := RunOptions{
Command: command,
Dir: opts.String("dir"),
Detach: opts.Bool("detach"),
}
if r := opts.Get("args"); r.OK {
runOpts.Args = optionStrings(r.Value)
}
if r := opts.Get("env"); r.OK {
runOpts.Env = optionStrings(r.Value)
}

r := s.StartWithOptions(ctx, runOpts)
if !r.OK {
return r
}
return core.Result{Value: r.Value.(*ManagedProcess).ID, OK: true}
}

func (s *Service) handleKill(_ context.Context, opts core.Options) core.Result {
id := opts.String("id")
if id != "" {
if err := s.Kill(id); err != nil {
if core.Is(err, ErrProcessNotFound) {
return core.Result{Value: core.E("process.kill", core.Concat("not found: ", id), nil), OK: false}
}
return core.Result{Value: core.E("process.kill", core.Concat("kill failed: ", id), err), OK: false}
}
return core.Result{OK: true}
}

pid := opts.Int("pid")
if pid > 0 {
proc, err := processHandle(pid)
if err != nil {
return core.Result{Value: core.E("process.kill", core.Concat("find pid failed: ", core.Sprintf("%d", pid)), err), OK: false}
}
if err := proc.Signal(syscall.SIGTERM); err != nil {
return core.Result{Value: core.E("process.kill", core.Concat("signal failed: ", core.Sprintf("%d", pid)), err), OK: false}
}
return core.Result{OK: true}
}

return core.Result{Value: core.E("process.kill", "need id or pid", nil), OK: false}
}

func (s *Service) handleList(_ context.Context, _ core.Options) core.Result {
return core.Result{Value: s.managed.Names(), OK: true}
}

func (s *Service) handleGet(_ context.Context, opts core.Options) core.Result {
id := opts.String("id")
if id == "" {
return core.Result{Value: core.E("process.get", "id is required", nil), OK: false}
}
proc, err := s.Get(id)
if err != nil {
return core.Result{Value: core.E("process.get", core.Concat("not found: ", id), err), OK: false}
}
return core.Result{Value: proc.Info(), OK: true}
}

func optionStrings(value any) []string {
switch typed := value.(type) {
case nil:
return nil
case []string:
return append([]string(nil), typed...)
case []any:
result := make([]string, 0, len(typed))
for _, item := range typed {
text, ok := item.(string)
if !ok {
return nil
}
result = append(result, text)
}
return result
default:
return nil
}
}
8 changes: 8 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "sync"

// RingBuffer is a fixed-size circular buffer that overwrites old data.
// Thread-safe for concurrent reads and writes.
//
// rb := process.NewRingBuffer(1024)
type RingBuffer struct {
data []byte
size int
Expand All @@ -14,7 +16,13 @@ type RingBuffer struct {
}

// NewRingBuffer creates a ring buffer with the given capacity.
//
// rb := process.NewRingBuffer(256)
func NewRingBuffer(size int) *RingBuffer {
if size <= 0 {
size = 1
}

return &RingBuffer{
data: make([]byte, size),
size: size,
Expand Down
2 changes: 1 addition & 1 deletion buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestRingBuffer(t *testing.T) {
func TestRingBuffer_Basics_Good(t *testing.T) {
t.Run("write and read", func(t *testing.T) {
rb := NewRingBuffer(10)

Expand Down
39 changes: 27 additions & 12 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package process

import (
"context"
"errors"
"os"
"sync"
"time"

coreerr "dappco.re/go/core/log"
"dappco.re/go/core"
)

// DaemonOptions configures daemon mode execution.
//
// opts := process.DaemonOptions{PIDFile: "/tmp/process.pid", HealthAddr: "127.0.0.1:0"}
type DaemonOptions struct {
// PIDFile path for single-instance enforcement.
// Leave empty to skip PID file management.
Expand All @@ -37,6 +37,8 @@ type DaemonOptions struct {
}

// Daemon manages daemon lifecycle: PID file, health server, graceful shutdown.
//
// daemon := process.NewDaemon(process.DaemonOptions{HealthAddr: "127.0.0.1:0"})
type Daemon struct {
opts DaemonOptions
pid *PIDFile
Expand All @@ -46,6 +48,8 @@ type Daemon struct {
}

// NewDaemon creates a daemon runner with the given options.
//
// daemon := process.NewDaemon(process.DaemonOptions{PIDFile: "/tmp/process.pid"})
func NewDaemon(opts DaemonOptions) *Daemon {
if opts.ShutdownTimeout == 0 {
opts.ShutdownTimeout = 30 * time.Second
Expand Down Expand Up @@ -73,7 +77,7 @@ func (d *Daemon) Start() error {
defer d.mu.Unlock()

if d.running {
return coreerr.E("Daemon.Start", "daemon already running", nil)
return core.E("daemon.start", "daemon already running", nil)
}

if d.pid != nil {
Expand All @@ -96,12 +100,21 @@ func (d *Daemon) Start() error {
// Auto-register if registry is set
if d.opts.Registry != nil {
entry := d.opts.RegistryEntry
entry.PID = os.Getpid()
entry.PID = currentPID()
if d.health != nil {
entry.Health = d.health.Addr()
}
if err := d.opts.Registry.Register(entry); err != nil {
return coreerr.E("Daemon.Start", "registry", err)
if d.health != nil {
shutdownCtx, cancel := context.WithTimeout(context.Background(), d.opts.ShutdownTimeout)
_ = d.health.Stop(shutdownCtx)
cancel()
}
if d.pid != nil {
_ = d.pid.Release()
}
d.running = false
return core.E("daemon.start", "registry", err)
}
}

Expand All @@ -113,7 +126,7 @@ func (d *Daemon) Run(ctx context.Context) error {
d.mu.Lock()
if !d.running {
d.mu.Unlock()
return coreerr.E("Daemon.Run", "daemon not started - call Start() first", nil)
return core.E("daemon.run", "daemon not started - call Start() first", nil)
}
d.mu.Unlock()

Expand All @@ -139,25 +152,27 @@ func (d *Daemon) Stop() error {
if d.health != nil {
d.health.SetReady(false)
if err := d.health.Stop(shutdownCtx); err != nil {
errs = append(errs, coreerr.E("Daemon.Stop", "health server", err))
errs = append(errs, core.E("daemon.stop", "health server", err))
}
}

if d.pid != nil {
if err := d.pid.Release(); err != nil && !os.IsNotExist(err) {
errs = append(errs, coreerr.E("Daemon.Stop", "pid file", err))
if err := d.pid.Release(); err != nil && !isNotExist(err) {
errs = append(errs, core.E("daemon.stop", "pid file", err))
}
}

// Auto-unregister
if d.opts.Registry != nil {
_ = d.opts.Registry.Unregister(d.opts.RegistryEntry.Code, d.opts.RegistryEntry.Daemon)
if err := d.opts.Registry.Unregister(d.opts.RegistryEntry.Code, d.opts.RegistryEntry.Daemon); err != nil {
errs = append(errs, core.E("daemon.stop", "registry", err))
}
}

d.running = false

if len(errs) > 0 {
return errors.Join(errs...)
return core.ErrorJoin(errs...)
}
return nil
}
Expand Down
24 changes: 12 additions & 12 deletions daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import (
"context"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDaemon_StartAndStop(t *testing.T) {
pidPath := filepath.Join(t.TempDir(), "test.pid")
func TestDaemon_Lifecycle_Good(t *testing.T) {
pidPath := core.JoinPath(t.TempDir(), "test.pid")

d := NewDaemon(DaemonOptions{
PIDFile: pidPath,
Expand All @@ -36,7 +36,7 @@ func TestDaemon_StartAndStop(t *testing.T) {
require.NoError(t, err)
}

func TestDaemon_DoubleStartFails(t *testing.T) {
func TestDaemon_AlreadyRunning_Bad(t *testing.T) {
d := NewDaemon(DaemonOptions{
HealthAddr: "127.0.0.1:0",
})
Expand All @@ -50,7 +50,7 @@ func TestDaemon_DoubleStartFails(t *testing.T) {
assert.Contains(t, err.Error(), "already running")
}

func TestDaemon_RunWithoutStartFails(t *testing.T) {
func TestDaemon_RunUnstarted_Bad(t *testing.T) {
d := NewDaemon(DaemonOptions{})

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -61,7 +61,7 @@ func TestDaemon_RunWithoutStartFails(t *testing.T) {
assert.Contains(t, err.Error(), "not started")
}

func TestDaemon_SetReady(t *testing.T) {
func TestDaemon_SetReady_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{
HealthAddr: "127.0.0.1:0",
})
Expand All @@ -83,17 +83,17 @@ func TestDaemon_SetReady(t *testing.T) {
_ = resp.Body.Close()
}

func TestDaemon_NoHealthAddrReturnsEmpty(t *testing.T) {
func TestDaemon_HealthAddrDisabled_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{})
assert.Empty(t, d.HealthAddr())
}

func TestDaemon_DefaultShutdownTimeout(t *testing.T) {
func TestDaemon_DefaultTimeout_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{})
assert.Equal(t, 30*time.Second, d.opts.ShutdownTimeout)
}

func TestDaemon_RunBlocksUntilCancelled(t *testing.T) {
func TestDaemon_RunBlocking_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{
HealthAddr: "127.0.0.1:0",
})
Expand Down Expand Up @@ -126,17 +126,17 @@ func TestDaemon_RunBlocksUntilCancelled(t *testing.T) {
}
}

func TestDaemon_StopIdempotent(t *testing.T) {
func TestDaemon_StopIdempotent_Good(t *testing.T) {
d := NewDaemon(DaemonOptions{})

// Stop without Start should be a no-op
err := d.Stop()
assert.NoError(t, err)
}

func TestDaemon_AutoRegisters(t *testing.T) {
func TestDaemon_AutoRegister_Good(t *testing.T) {
dir := t.TempDir()
reg := NewRegistry(filepath.Join(dir, "daemons"))
reg := NewRegistry(core.JoinPath(dir, "daemons"))

d := NewDaemon(DaemonOptions{
HealthAddr: "127.0.0.1:0",
Expand Down
Loading