From f043be3de7ab4849f0454e34dea161656d25872a Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Wed, 29 Nov 2023 23:43:14 -0800 Subject: [PATCH 1/6] Track blobs served and expose metric endpoint --- mediorum/server/db.go | 23 ++++- mediorum/server/serve_blob.go | 60 ++++++++++++ mediorum/server/serve_metrics.go | 151 +++++++++++++++++++++++++++++++ mediorum/server/server.go | 1 + 4 files changed, 234 insertions(+), 1 deletion(-) diff --git a/mediorum/server/db.go b/mediorum/server/db.go index 81898237c25..71402f8049e 100644 --- a/mediorum/server/db.go +++ b/mediorum/server/db.go @@ -40,6 +40,27 @@ type Upload struct { // UpldateULID - this is the last ULID that change this thing } +type MetricAction string + +const ( + TrackStream MetricAction = "TRACK_STREAM" + ServeImage MetricAction = "SERVE_IMAGE" +) + +type DailyMetrics struct { + Timestamp time.Time `gorm:"primaryKey"` + Action MetricAction `gorm:"type:enum('TRACK_STREAM', 'SERVE_IMAGE');primaryKey"` + Count int64 `gorm:"not null"` + CreatedAt time.Time `json:"created_at" gorm:"not null"` +} + +type MonthlyMetrics struct { + Timestamp time.Time `gorm:"primaryKey"` + Action MetricAction `gorm:"type:enum('TRACK_STREAM', 'SERVE_IMAGE');primaryKey"` + Count int64 `gorm:"not null"` + CreatedAt time.Time `json:"created_at" gorm:"not null"` +} + type UploadCursor struct { Host string `gorm:"primaryKey"` After time.Time @@ -62,7 +83,7 @@ func dbMustDial(dbPath string) *gorm.DB { func dbMigrate(crud *crudr.Crudr, bucket *blob.Bucket, myHost string) { // Migrate the schema slog.Info("db: gorm automigrate") - err := crud.DB.AutoMigrate(&Upload{}, &RepairTracker{}, &UploadCursor{}, &StorageAndDbSize{}) + err := crud.DB.AutoMigrate(&Upload{}, &RepairTracker{}, &UploadCursor{}, &StorageAndDbSize{}, &DailyMetrics{}, &MonthlyMetrics{}) if err != nil { panic(err) } diff --git a/mediorum/server/serve_blob.go b/mediorum/server/serve_blob.go index b2243f46ce4..955f3164eee 100644 --- a/mediorum/server/serve_blob.go +++ b/mediorum/server/serve_blob.go @@ -16,6 +16,7 @@ import ( "time" "github.com/AudiusProject/audius-protocol/mediorum/server/signature" + "gorm.io/gorm" "github.com/AudiusProject/audius-protocol/mediorum/cidutil" @@ -201,6 +202,7 @@ func (ss *MediorumServer) serveBlob(c echo.Context) error { } if isAudioFile { + ss.recordMetric(TrackStream) http.ServeContent(c.Response(), c.Request(), cid, blob.ModTime(), blob) return nil } @@ -209,6 +211,7 @@ func (ss *MediorumServer) serveBlob(c echo.Context) error { if err != nil { return err } + ss.recordMetric(ServeImage) return c.Blob(200, blob.ContentType(), blobData) } @@ -230,6 +233,63 @@ func (ss *MediorumServer) serveBlob(c echo.Context) error { return c.String(404, "blob not found") } +func (ss *MediorumServer) recordMetric(action MetricAction) { + today := time.Now().UTC().Truncate(24 * time.Hour) + firstOfMonth := time.Date(today.Year(), today.Month(), 1, 0, 0, 0, 0, time.UTC) + + // Increment daily metric + err := ss.crud.DB.Transaction(func(tx *gorm.DB) error { + var metric DailyMetrics + + if err := tx.FirstOrCreate(&metric, DailyMetrics{ + Timestamp: today, + Action: action, + }).Error; err != nil { + return err + } + + // Increment the count + metric.Count += 1 + + // Save the updated record + if err := tx.Save(&metric).Error; err != nil { + return err + } + + return nil + }) + + if err != nil { + ss.logger.Error("unable to increment daily metric", "err", err, "action", action) + } + + // Increment monthly metric + err = ss.crud.DB.Transaction(func(tx *gorm.DB) error { + var metric MonthlyMetrics + + if err := tx.FirstOrCreate(&metric, MonthlyMetrics{ + Timestamp: firstOfMonth, + Action: action, + }).Error; err != nil { + return err + } + + // Increment the count + metric.Count += 1 + + // Save the updated record + if err := tx.Save(&metric).Error; err != nil { + return err + } + + return nil + }) + + if err != nil { + ss.logger.Error("unable to increment monthly metric", "err", err, "action", action) + } +} + func (ss *MediorumServer) findNodeToServeBlob(ctx context.Context, key string) string { // use cache if possible diff --git a/mediorum/server/serve_metrics.go b/mediorum/server/serve_metrics.go index 7126f29db73..a3385296f2b 100644 --- a/mediorum/server/serve_metrics.go +++ b/mediorum/server/serve_metrics.go @@ -5,8 +5,10 @@ import ( "net/http" "os" "strings" + "time" "github.com/labstack/echo/v4" + "golang.org/x/exp/slices" ) type Metrics struct { @@ -16,6 +18,24 @@ type Metrics struct { RedirectCacheSize int `json:"redirect_cache_size"` } +type BlobMetric struct { + Timestamp time.Time `json:"timestamp" gorm:"primaryKey"` + Count int64 `json:"count"` +} + +type BlobMetrics struct { + Data []BlobMetric `json:"data"` +} + +var ( + validBucketSizes = map[string][]string{ + "week": {"day"}, + "month": {"day", "week"}, + "all_time": {"month", "week"}, + } + validBlobMetricActions = []string{"track_stream", "serve_image", "all"} +) + func (ss *MediorumServer) getMetrics(c echo.Context) error { m := Metrics{} m.Host = ss.Config.Self.Host @@ -26,6 +46,137 @@ func (ss *MediorumServer) getMetrics(c echo.Context) error { return c.JSON(200, m) } +func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { + timeRange := c.Param("timeRange") + if timeRange == "" || len(validBucketSizes[timeRange]) == 0 { + return c.String(400, fmt.Sprintf("Error: bad path param %s", timeRange)) + } + bucket := c.QueryParam("bucket_size") + if bucket == "" || !slices.Contains(validBucketSizes[timeRange], bucket) { + return c.String(400, fmt.Sprintf("Error: bad request param bucket: %s", bucket)) + } + action := c.QueryParam("action") + if action != "" && !slices.Contains(validBlobMetricActions, action) { + return c.String(400, fmt.Sprintf("Error: bad request param action: %s", action)) + } + + m := BlobMetrics{} + today := time.Now().UTC().Truncate(24 * time.Hour) + sevenDaysAgo := today.AddDate(0, 0, -7) + thirtyDaysAgo := today.AddDate(0, 0, -30) + firstOfMonth := time.Date(today.Year(), today.Month(), 1, 0, 0, 0, 0, time.UTC) + if timeRange == "week" { + if bucket == "day" { + var metrics []BlobMetric + query := ss.crud.DB. + Model(&DailyMetrics{}). + Where("timestamp >= ? AND timestamp < ?", sevenDaysAgo, today) + if action != "" && action != "all" { + query = query.Select("timestamp, count").Where("action = ?", strings.ToUpper(action)) + } else { + // sum counts from all actions together + query = query.Select("timestamp, sum(count) as count").Group("timestamp") + } + err := query.Order("timestamp asc").Find(&metrics).Error + if err != nil { + return c.JSON(400, map[string]string{ + "error": err.Error(), + }) + } + + m.Data = metrics + } + } + + if timeRange == "month" { + if bucket == "day" { + var metrics []BlobMetric + query := ss.crud.DB. + Model(&DailyMetrics{}). + Where("timestamp >= ? AND timestamp < ?", thirtyDaysAgo, today) + if action != "" && action != "all" { + query = query.Select("timestamp, count").Where("action = ?", strings.ToUpper(action)) + } else { + // sum counts from all actions together + query = query.Select("timestamp, sum(count) as count").Group("timestamp") + } + err := query.Order("timestamp asc").Find(&metrics).Error + if err != nil { + return c.JSON(400, map[string]string{ + "error": err.Error(), + }) + } + + m.Data = metrics + } else if bucket == "week" { + var metrics []BlobMetric + groupBy := `date(timestamp, 'weekday 1', '-7 days')` + query := ss.crud.DB. + Model(&DailyMetrics{}). + Select(groupBy+` as timestamp, sum(count) as count`). + Where("timestamp >= ? AND timestamp < ?", thirtyDaysAgo, today). + Group(groupBy) + + if action != "all" { + query = query.Where("action = ?", strings.ToUpper(action)) + } + err := query.Order("timestamp asc").Find(&metrics).Error + if err != nil { + return c.JSON(400, map[string]string{ + "error": err.Error(), + }) + } + + m.Data = metrics + } + } + + if timeRange == "all_time" { + if bucket == "month" { + var metrics []BlobMetric + query := ss.crud.DB. + Model(&MonthlyMetrics{}). + Where("timestamp < ?", firstOfMonth) + if action != "" && action != "all" { + query = query.Select("timestamp, count").Where("action = ?", strings.ToUpper(action)) + } else { + // sum counts from all actions together + query = query.Select("timestamp, sum(count) as count").Group("timestamp") + } + err := query.Order("timestamp asc").Find(&metrics).Error + if err != nil { + return c.JSON(400, map[string]string{ + "error": err.Error(), + }) + } + + m.Data = metrics + } else if bucket == "week" { + var metrics []BlobMetric + groupBy := `date(timestamp, 'weekday 1', '-7 days')` + query := ss.crud.DB. + Model(&DailyMetrics{}). + Select(groupBy+` as timestamp, sum(count) as count`). + Where("timestamp < ?", today). + Group(groupBy) + + if action != "all" { + query = query.Where("action = ?", strings.ToUpper(action)) + } + err := query.Order("timestamp asc").Find(&metrics).Error + if err != nil { + return c.JSON(400, map[string]string{ + "error": err.Error(), + }) + } + + m.Data = metrics + } + } + + return c.JSON(200, m) +} + func (ss *MediorumServer) getLogfile(c echo.Context, fileName string) error { file := fmt.Sprintf("/tmp/mediorum/%s", fileName) diff --git a/mediorum/server/server.go b/mediorum/server/server.go index cd6e5bb1ad5..d19982e9762 100644 --- a/mediorum/server/server.go +++ b/mediorum/server/server.go @@ -357,6 +357,7 @@ func New(config MediorumConfig) (*MediorumServer, error) { // WIP internal: metrics internalApi.GET("/metrics", ss.getMetrics) + internalApi.GET("/metrics/blobs-served/:timeRange", ss.getBlobsServedMetrics) internalApi.GET("/logs/partition-ops", ss.getPartitionOpsLog) internalApi.GET("/logs/reaper", ss.getReaperLog) internalApi.GET("/logs/repair", ss.serveRepairLog) From acd70e883a862c0cd512377e5b7f65e46a51c364 Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Thu, 30 Nov 2023 00:09:24 -0800 Subject: [PATCH 2/6] fix errs --- mediorum/server/db.go | 23 +++++++++++------------ mediorum/server/serve_blob.go | 12 +----------- mediorum/server/serve_metrics.go | 16 ++++++++-------- 3 files changed, 20 insertions(+), 31 deletions(-) diff --git a/mediorum/server/db.go b/mediorum/server/db.go index 71402f8049e..f7b79a9945d 100644 --- a/mediorum/server/db.go +++ b/mediorum/server/db.go @@ -40,25 +40,24 @@ type Upload struct { // UpldateULID - this is the last ULID that change this thing } -type MetricAction string - +// Metric actions const ( - TrackStream MetricAction = "TRACK_STREAM" - ServeImage MetricAction = "SERVE_IMAGE" + TrackStream string = "track_stream" + ServeImage string = "serve_image" ) type DailyMetrics struct { - Timestamp time.Time `gorm:"primaryKey"` - Action MetricAction `gorm:"type:enum('TRACK_STREAM', 'SERVE_IMAGE');primaryKey"` - Count int64 `gorm:"not null"` - CreatedAt time.Time `json:"created_at" gorm:"not null"` + Timestamp time.Time `gorm:"primaryKey"` + Action string `gorm:"primaryKey"` + Count int64 `gorm:"not null"` + CreatedAt time.Time `json:"created_at" gorm:"not null"` } type MonthlyMetrics struct { - Timestamp time.Time `gorm:"primaryKey"` - Action MetricAction `gorm:"type:enum('TRACK_STREAM', 'SERVE_IMAGE');primaryKey"` - Count int64 `gorm:"not null"` - CreatedAt time.Time `json:"created_at" gorm:"not null"` + Timestamp time.Time `gorm:"primaryKey"` + Action string `gorm:"primaryKey"` + Count int64 `gorm:"not null"` + CreatedAt time.Time `json:"created_at" gorm:"not null"` } type UploadCursor struct { diff --git a/mediorum/server/serve_blob.go b/mediorum/server/serve_blob.go index 955f3164eee..708ff09dd36 100644 --- a/mediorum/server/serve_blob.go +++ b/mediorum/server/serve_blob.go @@ -233,25 +233,20 @@ func (ss *MediorumServer) serveBlob(c echo.Context) error { return c.String(404, "blob not found") } -func (ss *MediorumServer) recordMetric(action MetricAction) { +func (ss *MediorumServer) recordMetric(action string) { today := time.Now().UTC().Truncate(24 * time.Hour) firstOfMonth := time.Date(today.Year(), today.Month(), 1, 0, 0, 0, 0, time.UTC) // Increment daily metric err := ss.crud.DB.Transaction(func(tx *gorm.DB) error { var metric DailyMetrics - if err := tx.FirstOrCreate(&metric, DailyMetrics{ Timestamp: today, Action: action, }).Error; err != nil { return err } - - // Increment the count metric.Count += 1 - - // Save the updated record if err := tx.Save(&metric).Error; err != nil { return err } @@ -266,18 +261,13 @@ func (ss *MediorumServer) recordMetric(action MetricAction) { // Increment monthly metric err = ss.crud.DB.Transaction(func(tx *gorm.DB) error { var metric MonthlyMetrics - if err := tx.FirstOrCreate(&metric, MonthlyMetrics{ Timestamp: firstOfMonth, Action: action, }).Error; err != nil { return err } - - // Increment the count metric.Count += 1 - - // Save the updated record if err := tx.Save(&metric).Error; err != nil { return err } diff --git a/mediorum/server/serve_metrics.go b/mediorum/server/serve_metrics.go index a3385296f2b..3f748eded9e 100644 --- a/mediorum/server/serve_metrics.go +++ b/mediorum/server/serve_metrics.go @@ -33,7 +33,7 @@ var ( "month": {"day", "week"}, "all_time": {"month", "week"}, } - validBlobMetricActions = []string{"track_stream", "serve_image", "all"} + validBlobMetricActions = []string{TrackStream, ServeImage, "all"} ) func (ss *MediorumServer) getMetrics(c echo.Context) error { @@ -72,7 +72,7 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { Model(&DailyMetrics{}). Where("timestamp >= ? AND timestamp < ?", sevenDaysAgo, today) if action != "" && action != "all" { - query = query.Select("timestamp, count").Where("action = ?", strings.ToUpper(action)) + query = query.Select("timestamp, count").Where("action = ?", action) } else { // sum counts from all actions together query = query.Select("timestamp, sum(count) as count").Group("timestamp") @@ -95,7 +95,7 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { Model(&DailyMetrics{}). Where("timestamp >= ? AND timestamp < ?", thirtyDaysAgo, today) if action != "" && action != "all" { - query = query.Select("timestamp, count").Where("action = ?", strings.ToUpper(action)) + query = query.Select("timestamp, count").Where("action = ?", action) } else { // sum counts from all actions together query = query.Select("timestamp, sum(count) as count").Group("timestamp") @@ -110,7 +110,7 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { m.Data = metrics } else if bucket == "week" { var metrics []BlobMetric - groupBy := `date(timestamp, 'weekday 1', '-7 days')` + groupBy := `date_trunc('week', timestamp)` query := ss.crud.DB. Model(&DailyMetrics{}). Select(groupBy+` as timestamp, sum(count) as count`). @@ -118,7 +118,7 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { Group(groupBy) if action != "all" { - query = query.Where("action = ?", strings.ToUpper(action)) + query = query.Where("action = ?", action) } err := query.Order("timestamp asc").Find(&metrics).Error if err != nil { @@ -138,7 +138,7 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { Model(&MonthlyMetrics{}). Where("timestamp < ?", firstOfMonth) if action != "" && action != "all" { - query = query.Select("timestamp, count").Where("action = ?", strings.ToUpper(action)) + query = query.Select("timestamp, count").Where("action = ?", action) } else { // sum counts from all actions together query = query.Select("timestamp, sum(count) as count").Group("timestamp") @@ -153,7 +153,7 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { m.Data = metrics } else if bucket == "week" { var metrics []BlobMetric - groupBy := `date(timestamp, 'weekday 1', '-7 days')` + groupBy := `date_trunc('week', timestamp)` query := ss.crud.DB. Model(&DailyMetrics{}). Select(groupBy+` as timestamp, sum(count) as count`). @@ -161,7 +161,7 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { Group(groupBy) if action != "all" { - query = query.Where("action = ?", strings.ToUpper(action)) + query = query.Where("action = ?", action) } err := query.Order("timestamp asc").Find(&metrics).Error if err != nil { From 38f51622be7735d52b1123b7c12ec3909df7426e Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Thu, 30 Nov 2023 16:28:10 -0800 Subject: [PATCH 3/6] TODO REMOVE: push mediorum image for testing --- .circleci/src/workflows/creator.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.circleci/src/workflows/creator.yml b/.circleci/src/workflows/creator.yml index ad156294c0a..fa612732d53 100644 --- a/.circleci/src/workflows/creator.yml +++ b/.circleci/src/workflows/creator.yml @@ -4,9 +4,6 @@ jobs: name: push-mediorum context: [Vercel, dockerhub] service: mediorum - filters: - branches: - only: main - test: name: test-mediorum context: Vercel From f4f1120112841f141e89899b1ffddfa8a9b4d7a6 Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Thu, 30 Nov 2023 18:29:43 -0800 Subject: [PATCH 4/6] nits --- mediorum/server/db.go | 2 +- mediorum/server/serve_blob.go | 2 +- mediorum/server/serve_metrics.go | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/mediorum/server/db.go b/mediorum/server/db.go index f7b79a9945d..1df9c45656d 100644 --- a/mediorum/server/db.go +++ b/mediorum/server/db.go @@ -42,7 +42,7 @@ type Upload struct { // Metric actions const ( - TrackStream string = "track_stream" + StreamTrack string = "stream_track" ServeImage string = "serve_image" ) diff --git a/mediorum/server/serve_blob.go b/mediorum/server/serve_blob.go index 708ff09dd36..93844bcdc8e 100644 --- a/mediorum/server/serve_blob.go +++ b/mediorum/server/serve_blob.go @@ -202,7 +202,7 @@ func (ss *MediorumServer) serveBlob(c echo.Context) error { } if isAudioFile { - ss.recordMetric(TrackStream) + ss.recordMetric(StreamTrack) http.ServeContent(c.Response(), c.Request(), cid, blob.ModTime(), blob) return nil } diff --git a/mediorum/server/serve_metrics.go b/mediorum/server/serve_metrics.go index 3f748eded9e..3ec2a5b13cc 100644 --- a/mediorum/server/serve_metrics.go +++ b/mediorum/server/serve_metrics.go @@ -33,7 +33,7 @@ var ( "month": {"day", "week"}, "all_time": {"month", "week"}, } - validBlobMetricActions = []string{TrackStream, ServeImage, "all"} + validBlobMetricActions = []string{StreamTrack, ServeImage, "all"} ) func (ss *MediorumServer) getMetrics(c echo.Context) error { @@ -53,11 +53,11 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { } bucket := c.QueryParam("bucket_size") if bucket == "" || !slices.Contains(validBucketSizes[timeRange], bucket) { - return c.String(400, fmt.Sprintf("Error: bad request param bucket: %s", bucket)) + return c.String(400, fmt.Sprintf("Error: bad request param bucket_size=%s", bucket)) } action := c.QueryParam("action") if action != "" && !slices.Contains(validBlobMetricActions, action) { - return c.String(400, fmt.Sprintf("Error: bad request param action: %s", action)) + return c.String(400, fmt.Sprintf("Error: bad request param action=%s", action)) } m := BlobMetrics{} From f6fe713828457583efa5054f89d38af799780eb3 Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Thu, 30 Nov 2023 18:29:54 -0800 Subject: [PATCH 5/6] Revert "TODO REMOVE: push mediorum image for testing" This reverts commit 38f51622be7735d52b1123b7c12ec3909df7426e. --- .circleci/src/workflows/creator.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.circleci/src/workflows/creator.yml b/.circleci/src/workflows/creator.yml index fa612732d53..ad156294c0a 100644 --- a/.circleci/src/workflows/creator.yml +++ b/.circleci/src/workflows/creator.yml @@ -4,6 +4,9 @@ jobs: name: push-mediorum context: [Vercel, dockerhub] service: mediorum + filters: + branches: + only: main - test: name: test-mediorum context: Vercel From 646338d9f0edd1adedb2db99a579203deb3c7362 Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Thu, 30 Nov 2023 18:38:37 -0800 Subject: [PATCH 6/6] fix action check --- mediorum/server/serve_metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mediorum/server/serve_metrics.go b/mediorum/server/serve_metrics.go index 3ec2a5b13cc..93352911581 100644 --- a/mediorum/server/serve_metrics.go +++ b/mediorum/server/serve_metrics.go @@ -117,7 +117,7 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { Where("timestamp >= ? AND timestamp < ?", thirtyDaysAgo, today). Group(groupBy) - if action != "all" { + if action != "" && action != "all" { query = query.Where("action = ?", action) } err := query.Order("timestamp asc").Find(&metrics).Error @@ -160,7 +160,7 @@ func (ss *MediorumServer) getBlobsServedMetrics(c echo.Context) error { Where("timestamp < ?", today). Group(groupBy) - if action != "all" { + if action != "" && action != "all" { query = query.Where("action = ?", action) } err := query.Order("timestamp asc").Find(&metrics).Error