diff --git a/server/lib/events/capturesession.go b/server/lib/events/capturesession.go new file mode 100644 index 00000000..5889cb03 --- /dev/null +++ b/server/lib/events/capturesession.go @@ -0,0 +1,61 @@ +package events + +import ( + "log/slog" + "sync" + "sync/atomic" + "time" +) + +// CaptureSession is a single-use write path that wraps events in envelopes and +// fans them out to a FileWriter (durable) and RingBuffer (in-memory). Publish +// concurrently; Close flushes the FileWriter. +type CaptureSession struct { + mu sync.Mutex + ring *RingBuffer + files *FileWriter + seq atomic.Uint64 + captureSessionID string +} + +func NewCaptureSession(captureSessionID string, ring *RingBuffer, files *FileWriter) *CaptureSession { + return &CaptureSession{ring: ring, files: files, captureSessionID: captureSessionID} +} + +// Publish wraps ev in an Envelope, truncates if needed, then writes to +// FileWriter (durable) before RingBuffer (in-memory fan-out). +func (s *CaptureSession) Publish(ev Event) { + s.mu.Lock() + defer s.mu.Unlock() + + if ev.Ts == 0 { + ev.Ts = time.Now().UnixMicro() + } + if ev.DetailLevel == "" { + ev.DetailLevel = DetailStandard + } + + env := Envelope{ + CaptureSessionID: s.captureSessionID, + Seq: s.seq.Add(1), + Event: ev, + } + env, data := truncateIfNeeded(env) + + if data == nil { + slog.Error("capture_session: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category) + } else if err := s.files.Write(env, data); err != nil { + slog.Error("capture_session: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err) + } + s.ring.Publish(env) +} + +// NewReader returns a Reader positioned at the start of the ring buffer. +func (s *CaptureSession) NewReader(afterSeq uint64) *Reader { + return s.ring.NewReader(afterSeq) +} + +// Close flushes and releases all open file descriptors. 
+func (s *CaptureSession) Close() error { + return s.files.Close() +} diff --git a/server/lib/events/event.go b/server/lib/events/event.go new file mode 100644 index 00000000..4db821d4 --- /dev/null +++ b/server/lib/events/event.go @@ -0,0 +1,91 @@ +package events + +import ( + "encoding/json" + "log/slog" +) + +// maxS2RecordBytes is the maximum record size for the S2 event pipeline (1 MB). +const maxS2RecordBytes = 1_000_000 + +// EventCategory determines type of logging +type EventCategory string + +const ( + CategoryConsole EventCategory = "console" + CategoryNetwork EventCategory = "network" + CategoryPage EventCategory = "page" + CategoryInteraction EventCategory = "interaction" + CategoryLiveview EventCategory = "liveview" + CategoryCaptcha EventCategory = "captcha" + CategorySystem EventCategory = "system" +) + +type SourceKind string + +const ( + KindCDP SourceKind = "cdp" + KindKernelAPI SourceKind = "kernel_api" + KindExtension SourceKind = "extension" + KindLocalProcess SourceKind = "local_process" +) + +// Source captures provenance: which producer emitted the event and any +// producer-specific context (e.g. CDP target/session/frame IDs). +type Source struct { + Kind SourceKind `json:"kind"` + Event string `json:"event,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` +} + +type DetailLevel string + +const ( + DetailMinimal DetailLevel = "minimal" + DetailStandard DetailLevel = "standard" + DetailVerbose DetailLevel = "verbose" + DetailRaw DetailLevel = "raw" +) + +// Event is the portable event schema. It contains only producer-emitted content; +// pipeline metadata (seq, capture session) lives on the Envelope. 
+type Event struct { + Ts int64 `json:"ts"` // Unix microseconds (µs since epoch) + Type string `json:"type"` + Category EventCategory `json:"category"` + Source Source `json:"source"` + DetailLevel DetailLevel `json:"detail_level"` + URL string `json:"url,omitempty"` + Data json.RawMessage `json:"data,omitempty"` + Truncated bool `json:"truncated,omitempty"` +} + +// Envelope wraps an Event with pipeline-assigned metadata. +type Envelope struct { + CaptureSessionID string `json:"capture_session_id"` + Seq uint64 `json:"seq"` + Event Event `json:"event"` +} + +// truncateIfNeeded marshals env and returns the (possibly truncated) envelope. +// If the envelope still exceeds maxS2RecordBytes after nulling data (e.g. huge +// url or source.metadata), it is returned as-is — callers must handle nil data. +func truncateIfNeeded(env Envelope) (Envelope, []byte) { + data, err := json.Marshal(env) + if err != nil { + return env, nil + } + if len(data) <= maxS2RecordBytes { + return env, data + } + env.Event.Data = json.RawMessage("null") + env.Event.Truncated = true + data, err = json.Marshal(env) + if err != nil { + return env, nil + } + if len(data) > maxS2RecordBytes { + slog.Warn("truncateIfNeeded: envelope exceeds limit even without data", "seq", env.Seq, "size", len(data)) + } + return env, data +} diff --git a/server/lib/events/events_test.go b/server/lib/events/events_test.go new file mode 100644 index 00000000..9325c6ea --- /dev/null +++ b/server/lib/events/events_test.go @@ -0,0 +1,614 @@ +package events + +import ( + "bytes" + "context" + "encoding/json" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// readEnvelope is a test helper that calls Read and asserts a non-drop result. 
func readEnvelope(t *testing.T, r *Reader, ctx context.Context) Envelope {
	t.Helper()
	res, err := r.Read(ctx)
	require.NoError(t, err)
	require.NotNil(t, res.Envelope, "expected envelope, got drop")
	return *res.Envelope
}

// TestEventSerialization: a fully populated Event round-trips to JSON with the
// expected field names, including the nested source object and its metadata.
func TestEventSerialization(t *testing.T) {
	ev := Event{
		Ts:       1234567890000,
		Type:     "console.log",
		Category: CategoryConsole,
		Source: Source{
			Kind:  KindCDP,
			Event: "Runtime.consoleAPICalled",
			Metadata: map[string]string{
				"target_id":       "target-1",
				"cdp_session_id":  "cdp-session-1",
				"frame_id":        "frame-1",
				"parent_frame_id": "parent-frame-1",
			},
		},
		DetailLevel: DetailStandard,
		URL:         "https://example.com",
		Data:        json.RawMessage(`{"message":"hello"}`),
	}

	b, err := json.Marshal(ev)
	require.NoError(t, err)

	var decoded map[string]any
	require.NoError(t, json.Unmarshal(b, &decoded))

	assert.Equal(t, "console.log", decoded["type"])
	assert.Equal(t, "console", decoded["category"])
	assert.Equal(t, "standard", decoded["detail_level"])
	assert.Equal(t, "https://example.com", decoded["url"])

	src, ok := decoded["source"].(map[string]any)
	require.True(t, ok)
	assert.Equal(t, "cdp", src["kind"])
	assert.Equal(t, "Runtime.consoleAPICalled", src["event"])
	meta, ok := src["metadata"].(map[string]any)
	require.True(t, ok)
	assert.Equal(t, "target-1", meta["target_id"])
	assert.Equal(t, "cdp-session-1", meta["cdp_session_id"])
}

// TestEnvelopeSerialization: envelope-level metadata (capture_session_id, seq)
// serializes alongside the nested event object.
func TestEnvelopeSerialization(t *testing.T) {
	env := Envelope{
		CaptureSessionID: "test-session-id",
		Seq:              1,
		Event: Event{
			Ts:       1000,
			Type:     "console.log",
			Category: CategoryConsole,
			Source:   Source{Kind: KindCDP},
		},
	}

	b, err := json.Marshal(env)
	require.NoError(t, err)

	var decoded map[string]any
	require.NoError(t, json.Unmarshal(b, &decoded))

	assert.Equal(t, "test-session-id", decoded["capture_session_id"])
	assert.Equal(t, float64(1), decoded["seq"]) // JSON numbers decode as float64
	inner, ok := decoded["event"].(map[string]any)
	require.True(t, ok)
	assert.Equal(t, "console.log", inner["type"])
}

// TestEventData: Data is json.RawMessage, so it must embed as a JSON object,
// not as an escaped string.
func TestEventData(t *testing.T) {
	rawData := json.RawMessage(`{"key":"value","num":42}`)
	ev := Event{
		Ts:       1000,
		Type:     "page.navigation",
		Category: CategoryPage,
		Source:   Source{Kind: KindCDP},
		Data:     rawData,
	}

	b, err := json.Marshal(ev)
	require.NoError(t, err)

	s := string(b)
	assert.Contains(t, s, `"data":{"key":"value","num":42}`)
	assert.NotContains(t, s, `"data":"{`)
}

// TestEventOmitEmpty: omitempty fields (source.event) disappear when unset,
// while detail_level (no omitempty) always serializes.
func TestEventOmitEmpty(t *testing.T) {
	ev := Event{
		Ts:       1000,
		Type:     "console.log",
		Category: CategoryConsole,
		Source:   Source{Kind: KindCDP},
	}

	b, err := json.Marshal(ev)
	require.NoError(t, err)

	s := string(b)
	assert.NotContains(t, s, `"event"`)
	assert.Contains(t, s, `"detail_level"`)
}

// mkEnv builds an Envelope with the given seq around ev.
func mkEnv(seq uint64, ev Event) Envelope {
	return Envelope{Seq: seq, Event: ev}
}

// cdpEvent builds a minimal CDP-sourced Event of the given type/category.
func cdpEvent(typ string, cat EventCategory) Event {
	return Event{Type: typ, Category: cat, Source: Source{Kind: KindCDP}}
}

// TestRingBuffer: publish 3 envelopes; reader reads all 3 in order
func TestRingBuffer(t *testing.T) {
	rb := NewRingBuffer(10)
	reader := rb.NewReader(0)

	envelopes := []Envelope{
		mkEnv(1, cdpEvent("console.log", CategoryConsole)),
		mkEnv(2, cdpEvent("network.request", CategoryNetwork)),
		mkEnv(3, cdpEvent("page.navigation", CategoryPage)),
	}

	for _, env := range envelopes {
		rb.Publish(env)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	for i, expected := range envelopes {
		got := readEnvelope(t, reader, ctx)
		assert.Equal(t, expected.Event.Type, got.Event.Type, "event %d", i)
		assert.Equal(t, expected.Event.Category, got.Event.Category, "event %d", i)
	}
}

// TestRingBufferOverflowNoBlock: writer never blocks even with no readers
func TestRingBufferOverflowNoBlock(t *testing.T) {
	rb := NewRingBuffer(2)

	done := make(chan struct{})
	go func() {
		// Three publishes into a capacity-2 ring: the oldest is evicted.
		rb.Publish(mkEnv(1, cdpEvent("console.log", CategoryConsole)))
		rb.Publish(mkEnv(2, cdpEvent("console.log", CategoryConsole)))
		rb.Publish(mkEnv(3, cdpEvent("console.log", CategoryConsole)))
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(5 * time.Millisecond):
		t.Fatal("Publish blocked with no readers")
	}

	reader := rb.NewReader(0)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// A reader starting from 0 must first learn about the eviction via a drop.
	res, err := reader.Read(ctx)
	require.NoError(t, err)
	assert.Nil(t, res.Envelope, "expected drop, not envelope")
	assert.True(t, res.Dropped > 0)
}

// TestRingBufferOverflowExistingReader: a reader that existed before overflow
// gets one drop notification, then resumes with the surviving envelopes.
func TestRingBufferOverflowExistingReader(t *testing.T) {
	rb := NewRingBuffer(2)
	reader := rb.NewReader(0)

	rb.Publish(mkEnv(1, cdpEvent("console.log", CategoryConsole)))
	rb.Publish(mkEnv(2, cdpEvent("console.log", CategoryConsole)))
	rb.Publish(mkEnv(3, cdpEvent("console.log", CategoryConsole)))

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// First read should be a drop notification
	res, err := reader.Read(ctx)
	require.NoError(t, err)
	assert.Nil(t, res.Envelope)
	assert.Equal(t, uint64(1), res.Dropped)

	// After the drop the reader continues with the surviving envelopes
	second := readEnvelope(t, reader, ctx)
	assert.Equal(t, uint64(2), second.Seq)

	third := readEnvelope(t, reader, ctx)
	assert.Equal(t, uint64(3), third.Seq)
}

// TestNewReaderResume: afterSeq semantics — resume mid-stream, at the latest
// seq (blocks), and before the oldest retained seq (drop notification).
func TestNewReaderResume(t *testing.T) {
	rb := NewRingBuffer(10)
	for i := uint64(1); i <= 5; i++ {
		rb.Publish(mkEnv(i, cdpEvent("console.log", CategoryConsole)))
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	t.Run("resume_mid_stream", func(t *testing.T) {
		reader := rb.NewReader(3)
		env := readEnvelope(t, reader, ctx)
		assert.Equal(t, uint64(4), env.Seq)
	})

	t.Run("resume_at_latest", func(t *testing.T) {
		reader := rb.NewReader(5)
		// Nothing to read — should block until ctx cancels
		shortCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
		defer cancel()
		_, err := reader.Read(shortCtx)
		assert.ErrorIs(t, err, context.DeadlineExceeded)
	})

	t.Run("resume_before_oldest_triggers_drop", func(t *testing.T) {
		small := NewRingBuffer(3)
		for i := uint64(1); i <= 5; i++ {
			small.Publish(mkEnv(i, cdpEvent("console.log", CategoryConsole)))
		}
		// oldest in ring is seq 3, requesting resume after seq 1
		reader := small.NewReader(1)
		res, err := reader.Read(ctx)
		require.NoError(t, err)
		assert.Nil(t, res.Envelope)
		assert.Equal(t, uint64(1), res.Dropped)

		env := readEnvelope(t, reader, ctx)
		assert.Equal(t, uint64(3), env.Seq)
	})
}

// TestConcurrentPublishRead: one publisher and one reader running concurrently;
// the reader drains exactly numEvents reads without error.
func TestConcurrentPublishRead(t *testing.T) {
	const numEvents = 20
	rb := NewRingBuffer(32)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	reader := rb.NewReader(0)

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < numEvents; i++ {
			// assert (not require) is intentional here: t.FailNow must not be
			// called from a goroutine other than the test's own.
			_, err := reader.Read(ctx)
			if !assert.NoError(t, err) {
				return
			}
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 1; i <= numEvents; i++ {
			rb.Publish(mkEnv(uint64(i), cdpEvent("console.log", CategoryConsole)))
		}
	}()

	wg.Wait()
}

// TestConcurrentReaders: multiple independent readers each observe the full
// event sequence in order.
func TestConcurrentReaders(t *testing.T) {
	rb := NewRingBuffer(20)

	numReaders := 3
	numEvents := 5

	readers := make([]*Reader, numReaders)
	for i := range readers {
		readers[i] = rb.NewReader(0)
	}

	for i := 0; i < numEvents; i++ {
		rb.Publish(mkEnv(uint64(i+1), cdpEvent("console.log", CategoryConsole)))
	}

	var wg sync.WaitGroup
	results := make([][]Envelope, numReaders)

	for i, r := range readers {
		wg.Add(1)
		go func(idx int, reader *Reader) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
			defer cancel()

			var envs []Envelope
			for j := 0; j < numEvents; j++ {
				// NOTE(review): readEnvelope uses require (t.FailNow), which the
				// testing package says must be called from the test goroutine,
				// not a spawned one — consider collecting errors instead.
				env := readEnvelope(t, reader, ctx)
				envs = append(envs, env)
			}
			results[idx] = envs
		}(i, r)
	}

	wg.Wait()

	for i, envs := range results {
		assert.Len(t, envs, numEvents, "reader %d", i)
		for j, env := range envs {
			assert.Equal(t, uint64(j+1), env.Seq, "reader %d event %d", i, j)
		}
	}
}

// TestFileWriter: per-category JSONL appender tests.
func TestFileWriter(t *testing.T) {
	t.Run("category_routing", func(t *testing.T) {
		dir := t.TempDir()
		fw := NewFileWriter(dir)
		defer fw.Close()

		// One envelope per category; each must land in its own <category>.log.
		envsToFile := []struct {
			env      Envelope
			file     string
			category string
		}{
			{Envelope{Seq: 1, Event: Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}}, "console.log", "console"},
			{Envelope{Seq: 2, Event: Event{Type: "network.request", Category: CategoryNetwork, Source: Source{Kind: KindCDP}, Ts: 1}}, "network.log", "network"},
			{Envelope{Seq: 3, Event: Event{Type: "liveview.click", Category: CategoryLiveview, Source: Source{Kind: KindKernelAPI}, Ts: 1}}, "liveview.log", "liveview"},
			{Envelope{Seq: 4, Event: Event{Type: "captcha.solve", Category: CategoryCaptcha, Source: Source{Kind: KindExtension}, Ts: 1}}, "captcha.log", "captcha"},
			{Envelope{Seq: 5, Event: Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1}}, "page.log", "page"},
			{Envelope{Seq: 6, Event: Event{Type: "input.click", Category: CategoryInteraction, Source: Source{Kind: KindCDP}, Ts: 1}}, "interaction.log", "interaction"},
			{Envelope{Seq: 7, Event: Event{Type: "monitor.connected", Category: CategorySystem, Source: Source{Kind: KindKernelAPI}, Ts: 1}}, "system.log", "system"},
		}

		for _, e := range envsToFile {
			data, err := json.Marshal(e.env)
			require.NoError(t, err)
			require.NoError(t, fw.Write(e.env, data))
		}

		for _, e := range envsToFile {
			data, err := os.ReadFile(filepath.Join(dir, e.file))
			require.NoError(t, err, "missing file %s for type %s", e.file, e.env.Event.Type)

			line := bytes.TrimRight(data, "\n")
			require.True(t, json.Valid(line), "invalid JSON in %s", e.file)

			var decoded map[string]any
			require.NoError(t, json.Unmarshal(line, &decoded))
			inner, ok := decoded["event"].(map[string]any)
			require.True(t, ok)
			assert.Equal(t, e.category, inner["category"], "wrong category in %s", e.file)
			srcMap, ok := inner["source"].(map[string]any)
			require.True(t, ok, "source should be an object in %s", e.file)
			assert.Equal(t, string(e.env.Event.Source.Kind), srcMap["kind"], "wrong source kind in %s", e.file)
		}
	})

	t.Run("empty_category_rejected", func(t *testing.T) {
		dir := t.TempDir()
		fw := NewFileWriter(dir)
		defer fw.Close()

		env := Envelope{Seq: 1, Event: Event{Type: "mystery", Category: "", Source: Source{Kind: KindCDP}, Ts: 1}}
		data, _ := json.Marshal(env) // marshal error irrelevant: Write must reject first
		err := fw.Write(env, data)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "empty category")
	})

	t.Run("concurrent_writes", func(t *testing.T) {
		dir := t.TempDir()
		fw := NewFileWriter(dir)
		defer fw.Close()

		const goroutines = 10
		const eventsPerGoroutine = 100

		var wg sync.WaitGroup
		for i := 0; i < goroutines; i++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				for j := 0; j < eventsPerGoroutine; j++ {
					env := Envelope{
						Seq:   uint64(i*eventsPerGoroutine + j),
						Event: Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1},
					}
					envData, err := json.Marshal(env)
					// NOTE(review): require (t.FailNow) from a spawned goroutine
					// is invalid per the testing package docs; prefer assert or
					// an error channel here.
					require.NoError(t, err)
					require.NoError(t, fw.Write(env, envData))
				}
			}(i)
		}
		wg.Wait()

		data, err := os.ReadFile(filepath.Join(dir, "console.log"))
		require.NoError(t, err)

		// Every line must be complete, valid JSON — no interleaved writes.
		lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
		assert.Len(t, lines, goroutines*eventsPerGoroutine)
		for _, line := range lines {
			assert.True(t, json.Valid([]byte(line)), "invalid JSON line: %s", line)
		}
	})

	t.Run("lazy_open", func(t *testing.T) {
		dir := t.TempDir()
		fw := NewFileWriter(dir)
		defer fw.Close()

		entries, err := os.ReadDir(dir)
		require.NoError(t, err)
		assert.Empty(t, entries, "files opened before first Write")

		env := Envelope{Seq: 1, Event: Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}}
		envData, err := json.Marshal(env)
		require.NoError(t, err)
		require.NoError(t, fw.Write(env, envData))

		entries, err = os.ReadDir(dir)
		require.NoError(t, err)
		assert.Len(t, entries, 1, "expected exactly one file after first Write")
		assert.Equal(t, "console.log", entries[0].Name())
	})
}

// TestCaptureSession: end-to-end behavior of the Publish write path.
func TestCaptureSession(t *testing.T) {
	// newSession builds a session over a temp dir and registers cleanup.
	newSession := func(t *testing.T) (*CaptureSession, string) {
		t.Helper()
		dir := t.TempDir()
		rb := NewRingBuffer(100)
		fw := NewFileWriter(dir)
		p := NewCaptureSession("", rb, fw)
		t.Cleanup(func() { p.Close() })
		return p, dir
	}

	t.Run("concurrent_publish_seq_order", func(t *testing.T) {
		const goroutines = 8
		const eventsEach = 50
		const total = goroutines * eventsEach

		// Ring sized to hold every event so nothing is dropped.
		rb := NewRingBuffer(total)
		fw := NewFileWriter(t.TempDir())
		p := NewCaptureSession("", rb, fw)
		t.Cleanup(func() { p.Close() })
		reader := p.NewReader(0)

		var wg sync.WaitGroup
		for i := 0; i < goroutines; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for j := 0; j < eventsEach; j++ {
					p.Publish(cdpEvent("console.log", CategoryConsole))
				}
			}()
		}
		wg.Wait()

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		for want := uint64(1); want <= total; want++ {
			env := readEnvelope(t, reader, ctx)
			assert.Equal(t, want, env.Seq, "events must arrive in seq order")
		}
	})

	t.Run("publish_increments_seq", func(t *testing.T) {
		p, _ := newSession(t)
		reader := p.NewReader(0)

		for i := 0; i < 3; i++ {
			p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1})
		}

		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		for want := uint64(1); want <= 3; want++ {
			env := readEnvelope(t, reader, ctx)
			assert.Equal(t, want, env.Seq, "expected seq %d got %d", want, env.Seq)
		}
	})

	t.Run("publish_sets_ts", func(t *testing.T) {
		p, _ := newSession(t)
		reader := p.NewReader(0)

		// Bracket Publish with wall-clock reads; the stamped Ts must fall inside.
		before := time.Now().UnixMicro()
		p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}})
		after := time.Now().UnixMicro()

		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		env := readEnvelope(t, reader, ctx)
		assert.GreaterOrEqual(t, env.Event.Ts, before)
		assert.LessOrEqual(t, env.Event.Ts, after)
	})

	t.Run("publish_writes_file", func(t *testing.T) {
		p, dir := newSession(t)

		p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1})

		data, err := os.ReadFile(filepath.Join(dir, "console.log"))
		require.NoError(t, err)

		lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
		require.Len(t, lines, 1)
		assert.True(t, json.Valid([]byte(lines[0])))
		assert.Contains(t, lines[0], `"console.log"`)
	})

	t.Run("publish_writes_ring", func(t *testing.T) {
		p, _ := newSession(t)

		reader := p.NewReader(0)
		p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1})

		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		env := readEnvelope(t, reader, ctx)
		assert.Equal(t, "page.navigation", env.Event.Type)
		assert.Equal(t, CategoryPage, env.Event.Category)
	})

	t.Run("constructor_sets_capture_session_id", func(t *testing.T) {
		dir := t.TempDir()
		p := NewCaptureSession("test-uuid", NewRingBuffer(100), NewFileWriter(dir))
		t.Cleanup(func() { p.Close() })

		reader := p.NewReader(0)
		p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1})

		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		env := readEnvelope(t, reader, ctx)
		assert.Equal(t, "test-uuid", env.CaptureSessionID)
	})

	t.Run("truncation_applied", func(t *testing.T) {
		p, dir := newSession(t)
		reader := p.NewReader(0)

		// Payload larger than maxS2RecordBytes (1 MB) forces truncation.
		largeData := strings.Repeat("x", 1_100_000)
		rawData, err := json.Marshal(map[string]string{"payload": largeData})
		require.NoError(t, err)

		p.Publish(Event{
			Type:     "page.navigation",
			Category: CategoryPage,
			Source:   Source{Kind: KindCDP},
			Ts:       1,
			Data:     json.RawMessage(rawData),
		})

		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		env := readEnvelope(t, reader, ctx)
		assert.True(t, env.Event.Truncated)
		assert.True(t, json.Valid(env.Event.Data))

		marshaled, err := json.Marshal(env)
		require.NoError(t, err)
		assert.LessOrEqual(t, len(marshaled), maxS2RecordBytes)

		data, err := os.ReadFile(filepath.Join(dir, "page.log"))
		require.NoError(t, err)
		lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
		require.Len(t, lines, 1)
		assert.Contains(t, lines[0], `"truncated":true`)
	})

	t.Run("defaults_detail_level", func(t *testing.T) {
		p, _ := newSession(t)
		reader := p.NewReader(0)

		p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1})

		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		// Empty DetailLevel is defaulted to standard; explicit values pass through.
		env := readEnvelope(t, reader, ctx)
		assert.Equal(t, DetailStandard, env.Event.DetailLevel)

		p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1, DetailLevel: DetailVerbose})
		env2 := readEnvelope(t, reader, ctx)
		assert.Equal(t, DetailVerbose, env2.Event.DetailLevel)
	})
}

// filewriter.go

package events

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// FileWriter is a per-category JSONL appender.
It opens each log file lazily on +// first write (O_APPEND|O_CREATE|O_WRONLY) and serialises all concurrent writes +// with a single mutex +type FileWriter struct { + mu sync.Mutex + files map[EventCategory]*os.File + dir string +} + +// NewFileWriter returns a FileWriter that writes to dir +func NewFileWriter(dir string) *FileWriter { + return &FileWriter{dir: dir, files: make(map[EventCategory]*os.File)} +} + +// Write appends data as a single JSONL line to the per-category log file. +func (fw *FileWriter) Write(env Envelope, data []byte) error { + cat := env.Event.Category + if cat == "" { + return fmt.Errorf("filewriter: event %q has empty category", env.Event.Type) + } + + fw.mu.Lock() + defer fw.mu.Unlock() + + f, ok := fw.files[cat] + if !ok { + path := filepath.Join(fw.dir, string(cat)+".log") + var err error + f, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + return fmt.Errorf("filewriter: open %s: %w", path, err) + } + fw.files[cat] = f + } + + if _, err := f.Write(append(data, '\n')); err != nil { + return fmt.Errorf("filewriter: write: %w", err) + } + + return nil +} + +// Close closes all open log file descriptors +func (fw *FileWriter) Close() error { + fw.mu.Lock() + defer fw.mu.Unlock() + + var firstErr error + for _, f := range fw.files { + if err := f.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go new file mode 100644 index 00000000..41659e94 --- /dev/null +++ b/server/lib/events/ringbuffer.go @@ -0,0 +1,108 @@ +package events + +import ( + "context" + "sync" +) + +// RingBuffer is a fixed-capacity circular buffer with closed-channel broadcast fan-out. +// Writers never block regardless of reader count or speed. 
+type RingBuffer struct { + mu sync.RWMutex + buf []Envelope + cap uint64 + latestSeq uint64 // highest envelope.Seq published + readerWake chan struct{} // closed-and-replaced on each Publish to wake blocked readers +} + +func NewRingBuffer(capacity int) *RingBuffer { + return &RingBuffer{ + buf: make([]Envelope, capacity), + cap: uint64(capacity), + readerWake: make(chan struct{}), + } +} + +// Publish adds an envelope to the ring, evicting the oldest on overflow. +func (rb *RingBuffer) Publish(env Envelope) { + rb.mu.Lock() + rb.buf[env.Seq%rb.cap] = env + rb.latestSeq = env.Seq + old := rb.readerWake + rb.readerWake = make(chan struct{}) + rb.mu.Unlock() + close(old) +} + +func (rb *RingBuffer) oldestSeq() uint64 { + if rb.latestSeq <= rb.cap { + return 1 + } + return rb.latestSeq - rb.cap + 1 +} + +// NewReader returns a Reader. afterSeq == 0 starts from the oldest available +// envelope; afterSeq > 0 resumes after that seq. +func (rb *RingBuffer) NewReader(afterSeq uint64) *Reader { + nextSeq := afterSeq + 1 + if afterSeq == 0 { + nextSeq = 1 + } + return &Reader{rb: rb, nextSeq: nextSeq} +} + +// ReadResult is returned by Reader.Read. Exactly one of Envelope or Dropped is +// set: Envelope is non-nil for a normal read, Dropped is non-zero when the +// reader fell behind and events were lost. +type ReadResult struct { + Envelope *Envelope + Dropped uint64 +} + +// Reader tracks an independent read position in a RingBuffer. +type Reader struct { + rb *RingBuffer + nextSeq uint64 +} + +// Read blocks until the next envelope is available or ctx is cancelled. 
func (r *Reader) Read(ctx context.Context) (ReadResult, error) {
	for {
		// Snapshot state under the read lock. wake must be captured while the
		// lock is held so a concurrent Publish cannot replace (and close) a
		// channel we never saw — that ordering is what makes blocking safe.
		r.rb.mu.RLock()
		wake := r.rb.readerWake
		latest := r.rb.latestSeq
		oldest := r.rb.oldestSeq()

		// Nothing published yet: wait for the first Publish or cancellation.
		if latest == 0 {
			r.rb.mu.RUnlock()
			select {
			case <-ctx.Done():
				return ReadResult{}, ctx.Err()
			case <-wake:
				continue
			}
		}

		// Reader fell behind the ring: report how many envelopes were lost
		// and skip forward to the oldest one still retained.
		if r.nextSeq < oldest {
			dropped := oldest - r.nextSeq
			r.nextSeq = oldest
			r.rb.mu.RUnlock()
			return ReadResult{Dropped: dropped}, nil
		}

		// Next envelope is available: copy it out (env is a value copy, so
		// the pointer returned does not alias the ring's storage).
		if r.nextSeq <= latest {
			env := r.rb.buf[r.nextSeq%r.rb.cap]
			r.nextSeq++
			r.rb.mu.RUnlock()
			return ReadResult{Envelope: &env}, nil
		}

		// Caught up with the writer: block until the next Publish closes the
		// captured wake channel, then loop to re-snapshot.
		r.rb.mu.RUnlock()

		select {
		case <-ctx.Done():
			return ReadResult{}, ctx.Err()
		case <-wake:
		}
	}
}