Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -332,7 +333,10 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {

tx := f.worker.GetBestFittingTx(f.batch.remainingResources)
metrics.WorkerProcessingTime(time.Since(start))
metrics.GetLogStatistics().CumulativeTiming(metrics.GetTx, time.Since(start))

if tx != nil {
metrics.GetLogStatistics().CumulativeCounting(metrics.TxCounter)
log.Debugf("processing tx: %s", tx.Hash.Hex())

// reset the count of effective GasPrice process attempts (since the tx may have been tried to be processed before)
Expand All @@ -344,20 +348,25 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
if err != nil {
if err == ErrEffectiveGasPriceReprocess {
log.Info("reprocessing tx because of effective gas price calculation: %s", tx.Hash.Hex())
metrics.GetLogStatistics().CumulativeCounting(metrics.ReprocessingTxCounter)
continue
} else {
log.Errorf("failed to process transaction in finalizeBatches, Err: %v", err)
metrics.GetLogStatistics().CumulativeCounting(metrics.FailTxCounter)
break
}
}
metrics.GetLogStatistics().CumulativeValue(metrics.BatchGas, int64(tx.Gas))
break
}

f.sharedResourcesMux.Unlock()
} else {
// wait for new txs
log.Debugf("no transactions to be processed. Sleeping for %v", f.cfg.SleepDuration.Duration)
if f.cfg.SleepDuration.Duration > 0 {
time.Sleep(f.cfg.SleepDuration.Duration)
metrics.GetLogStatistics().CumulativeCounting(metrics.GetTxPauseCounter)
}
}

Expand All @@ -369,10 +378,18 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {

if f.isDeadlineEncountered() {
log.Infof("closing batch %d because deadline was encountered.", f.batch.batchNumber)
metrics.GetLogStatistics().SetTag(metrics.BatchCloseReason, "deadline")
f.finalizeBatch(ctx)
log.Infof(metrics.GetLogStatistics().Summary())
metrics.GetLogStatistics().ResetStatistics()
metrics.GetLogStatistics().UpdateTimestamp(metrics.NewRound, time.Now())
} else if f.isBatchFull() || f.isBatchAlmostFull() {
log.Infof("closing batch %d because it's almost full.", f.batch.batchNumber)
metrics.GetLogStatistics().SetTag(metrics.BatchCloseReason, "full")
f.finalizeBatch(ctx)
log.Infof(metrics.GetLogStatistics().Summary())
metrics.GetLogStatistics().ResetStatistics()
metrics.GetLogStatistics().UpdateTimestamp(metrics.NewRound, time.Now())
}

if err := ctx.Err(); err != nil {
Expand Down Expand Up @@ -412,8 +429,10 @@ func (f *finalizer) isBatchFull() bool {
// finalizeBatch retries to until successful closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch
func (f *finalizer) finalizeBatch(ctx context.Context) {
start := time.Now()
metrics.GetLogStatistics().SetTag(metrics.FinalizeBatchNumber, strconv.Itoa(int(f.batch.batchNumber)))
defer func() {
metrics.ProcessingTime(time.Since(start))
metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchTiming, time.Since(start))
}()

var err error
Expand Down Expand Up @@ -495,6 +514,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
}

// Reprocess full batch as sanity check
tsReprocessFullBatch := time.Now()
if f.cfg.SequentialReprocessFullBatch {
// Do the full batch reprocess now
_, err := f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.initialStateRoot, f.batch.stateRoot)
Expand All @@ -508,14 +528,18 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
_, _ = f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.initialStateRoot, f.batch.stateRoot)
}()
}
metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchReprocessFullBatch, time.Since(tsReprocessFullBatch))

// Close the current batch
tsCloseBatch := time.Now()
err = f.closeBatch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to close batch, err: %w", err)
}
metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchCloseBatch, time.Since(tsCloseBatch))

// Metadata for the next batch
tsOpenBatch := time.Now()
stateRoot := f.batch.stateRoot
lastBatchNumber := f.batch.batchNumber

Expand Down Expand Up @@ -544,6 +568,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
f.processRequest.GlobalExitRoot = batch.globalExitRoot
f.processRequest.Transactions = make([]byte, 0, 1)
}
metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchOpenBatch, time.Since(tsOpenBatch))

return batch, err
}
Expand All @@ -558,6 +583,9 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) (errW
start := time.Now()
defer func() {
metrics.ProcessingTime(time.Since(start))
if tx != nil {
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxTiming, time.Since(start))
}
}()

if f.batch.isEmpty() {
Expand Down Expand Up @@ -624,6 +652,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) (errW
}

log.Infof("processTransaction: single tx. Batch.BatchNumber: %d, BatchNumber: %d, OldStateRoot: %s, txHash: %s, GER: %s", f.batch.batchNumber, f.processRequest.BatchNumber, f.processRequest.OldStateRoot, hashStr, f.processRequest.GlobalExitRoot.String())
tsCommit := time.Now()
processBatchResponse, err := f.executor.ProcessBatch(ctx, f.processRequest, true)
if err != nil && errors.Is(err, runtime.ErrExecutorDBError) {
log.Errorf("failed to process transaction: %s", err)
Expand All @@ -643,10 +672,15 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) (errW
log.Errorf("failed to update status to invalid in the pool for tx: %s, err: %s", tx.Hash.String(), err)
} else {
metrics.TxProcessed(metrics.TxProcessedLabelInvalid, 1)
metrics.GetLogStatistics().CumulativeCounting(metrics.ProcessingInvalidTxCounter)
}
return nil, err
}
if tx != nil {
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxCommit, time.Since(tsCommit))
}

tsProcessResponse := time.Now()
oldStateRoot := f.batch.stateRoot
if len(processBatchResponse.Responses) > 0 && tx != nil {
errWg, err = f.handleProcessTransactionResponse(ctx, tx, processBatchResponse, oldStateRoot)
Expand All @@ -660,6 +694,10 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) (errW
f.batch.localExitRoot = processBatchResponse.NewLocalExitRoot
log.Infof("processTransaction: data loaded in memory. batch.batchNumber: %d, batchNumber: %d, result.NewStateRoot: %s, result.NewLocalExitRoot: %s, oldStateRoot: %s", f.batch.batchNumber, f.processRequest.BatchNumber, processBatchResponse.NewStateRoot.String(), processBatchResponse.NewLocalExitRoot.String(), oldStateRoot.String())

if tx != nil {
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxResponse, time.Since(tsProcessResponse))
}

return nil, nil
}

Expand Down
40 changes: 40 additions & 0 deletions sequencer/metrics/logstatistics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package metrics

import (
"time"
)

type LogTag string

type LogStatistics interface {
CumulativeCounting(tag LogTag)
CumulativeValue(tag LogTag, value int64)
CumulativeTiming(tag LogTag, duration time.Duration)
SetTag(tag LogTag, value string)
Summary() string
ResetStatistics()

UpdateTimestamp(tag LogTag, tm time.Time)
}

const (
TxCounter LogTag = "TxCounter"
GetTx LogTag = "GetTx"
GetTxPauseCounter LogTag = "GetTxPauseCounter"
BatchCloseReason LogTag = "BatchCloseReason"
ReprocessingTxCounter LogTag = "ReProcessingTxCounter"
FailTxCounter LogTag = "FailTxCounter"
NewRound LogTag = "NewRound"
BatchGas LogTag = "BatchGas"

ProcessingTxTiming LogTag = "ProcessingTxTiming"
ProcessingInvalidTxCounter LogTag = "ProcessingInvalidTxCounter"
ProcessingTxCommit LogTag = "ProcessingTxCommit"
ProcessingTxResponse LogTag = "ProcessingTxResponse"

FinalizeBatchTiming LogTag = "FinalizeBatchTiming"
FinalizeBatchNumber LogTag = "FinalizeBatchNumber"
FinalizeBatchReprocessFullBatch LogTag = "FinalizeBatchReprocessFullBatch"
FinalizeBatchCloseBatch LogTag = "FinalizeBatchCloseBatch"
FinalizeBatchOpenBatch LogTag = "FinalizeBatchOpenBatch"
)
85 changes: 85 additions & 0 deletions sequencer/metrics/logstatisticsimpl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package metrics

import (
"strconv"
"sync"
"time"
)

var instance *logStatisticsInstance
var once sync.Once

func GetLogStatistics() LogStatistics {
once.Do(func() {
instance = &logStatisticsInstance{}
instance.init()
})
return instance
}

type logStatisticsInstance struct {
timestamp map[LogTag]time.Time
statistics map[LogTag]int64 // value maybe the counter or time.Duration(ms)
tags map[LogTag]string
}

func (l *logStatisticsInstance) init() {
l.timestamp = make(map[LogTag]time.Time)
l.statistics = make(map[LogTag]int64)
l.tags = make(map[LogTag]string)
}

func (l *logStatisticsInstance) CumulativeCounting(tag LogTag) {
l.statistics[tag]++
}

func (l *logStatisticsInstance) CumulativeValue(tag LogTag, value int64) {
l.statistics[tag] += value
}

func (l *logStatisticsInstance) CumulativeTiming(tag LogTag, duration time.Duration) {
l.statistics[tag] += duration.Milliseconds()
}

func (l *logStatisticsInstance) SetTag(tag LogTag, value string) {
l.tags[tag] = value
}

func (l *logStatisticsInstance) UpdateTimestamp(tag LogTag, tm time.Time) {
l.timestamp[tag] = tm
}

func (l *logStatisticsInstance) ResetStatistics() {
l.statistics = make(map[LogTag]int64)
l.tags = make(map[LogTag]string)
}

func (l *logStatisticsInstance) Summary() string {
batchTotalDuration := "-"
if key, ok := l.timestamp[NewRound]; ok {
batchTotalDuration = strconv.Itoa(int(time.Since(key).Milliseconds()))
}
processTxTiming := "ProcessTx<" + strconv.Itoa(int(l.statistics[ProcessingTxTiming])) + "ms, " +
"Commit<" + strconv.Itoa(int(l.statistics[ProcessingTxCommit])) + "ms>, " +
"ProcessResponse<" + strconv.Itoa(int(l.statistics[ProcessingTxResponse])) + "ms>>, "

finalizeBatchTiming := "FinalizeBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchTiming])) + "ms, " +
"ReprocessFullBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchReprocessFullBatch])) + "ms>, " +
"CloseBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchCloseBatch])) + "ms>, " +
"OpenBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchOpenBatch])) + "ms>>, "

result := "Batch<" + l.tags[FinalizeBatchNumber] + ">, " +
"TotalDuration<" + batchTotalDuration + "ms>, " +
"GasUsed<" + strconv.Itoa(int(l.statistics[BatchGas])) + ">, " +
"Tx<" + strconv.Itoa(int(l.statistics[TxCounter])) + ">, " +
"GetTx<" + strconv.Itoa(int(l.statistics[GetTx])) + "ms>, " +
"GetTxPause<" + strconv.Itoa(int(l.statistics[GetTxPauseCounter])) + ">, " +
"ReprocessTx<" + strconv.Itoa(int(l.statistics[ReprocessingTxCounter])) + ">, " +
"FailTx<" + strconv.Itoa(int(l.statistics[FailTxCounter])) + ">, " +
"InvalidTx<" + strconv.Itoa(int(l.statistics[ProcessingInvalidTxCounter])) + ">, " +
processTxTiming +
finalizeBatchTiming +
"BatchCloseReason<" + l.tags[BatchCloseReason] + ">"

return result
}
51 changes: 51 additions & 0 deletions sequencer/metrics/logstatisticsimpl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package metrics

import (
"testing"
"time"
)

func Test_logStatisticsInstance_Summary(t *testing.T) {
type fields struct {
timestamp map[LogTag]time.Time
statistics map[LogTag]int64
tags map[LogTag]string
}
tests := []struct {
name string
fields fields
want string
}{
// TODO: Add test cases.
{"1", fields{
timestamp: map[LogTag]time.Time{NewRound: time.Now().Add(-time.Second)},
statistics: map[LogTag]int64{
BatchGas: 111111,
TxCounter: 10,
GetTx: time.Second.Milliseconds(),
GetTxPauseCounter: 2,
ReprocessingTxCounter: 3,
FailTxCounter: 1,
ProcessingInvalidTxCounter: 2,
ProcessingTxTiming: time.Second.Milliseconds() * 30,
ProcessingTxCommit: time.Second.Milliseconds() * 10,
ProcessingTxResponse: time.Second.Milliseconds() * 15,
FinalizeBatchTiming: time.Second.Milliseconds() * 50,
FinalizeBatchReprocessFullBatch: time.Second.Milliseconds() * 20,
FinalizeBatchCloseBatch: time.Second.Milliseconds() * 10,
FinalizeBatchOpenBatch: time.Second.Milliseconds() * 10,
},
tags: map[LogTag]string{BatchCloseReason: "deadline", FinalizeBatchNumber: "123"},
}, "test"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &logStatisticsInstance{
timestamp: tt.fields.timestamp,
statistics: tt.fields.statistics,
tags: tt.fields.tags,
}
t.Log(l.Summary())
})
}
}