Skip to content

Commit d4980dd

Browse files
committed
feat(transcoder): enhance video metadata handling and notifier integration
1 parent ad14012 commit d4980dd

9 files changed

Lines changed: 127 additions & 26 deletions

File tree

backend/server/media_handler.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ type (
4646
ContentType string `json:"content_type" validate:"required"`
4747
}
4848
AssetsResponseData struct {
49-
UploadUrl string `json:"upload_url"`
50-
Header map[string]string `json:"header"`
51-
Asset Asset `json:"asset"`
52-
Form map[string]string `json:"form"`
49+
UploadUrl string `json:"upload_url"`
50+
Header *map[string]string `json:"header,omitempty"`
51+
Asset *Asset `json:"asset,omitempty"`
52+
Form *map[string]string `json:"form,omitempty"`
5353
}
5454
AssetsResponse struct {
5555
Data *AssetsResponseData `json:"data,omitempty"`
@@ -117,13 +117,13 @@ func (s *Server) VideoAssetsHandler(c echo.Context) error {
117117
s.log.Error(ErrFailedToCreateVideoRecord, "err", err)
118118
return c.JSON(http.StatusInternalServerError, AssetsResponse{Error: ErrFailedToCreateVideoRecord})
119119
}
120-
presignedUrl, err := s.storage.PresignedClient().PresignPostObject(c.Request().Context(), &s3.PutObjectInput{
120+
presignedUrl, err := s.storage.PresignedClient().PresignPutObject(c.Request().Context(), &s3.PutObjectInput{
121121
Bucket: aws.String(s.cfg.S3.RawMediaBucket),
122122
Key: aws.String(key),
123123
ContentType: aws.String(body.ContentType),
124124
ContentLength: aws.Int64(int64(body.Size)),
125125
Metadata: map[string]string{},
126-
}, func(options *s3.PresignPostOptions) {
126+
}, func(options *s3.PresignOptions) {
127127
options.Expires = time.Duration(POST_PRESIGNED_URL_TTL) * time.Second
128128
})
129129
if err != nil {
@@ -133,16 +133,16 @@ func (s *Server) VideoAssetsHandler(c echo.Context) error {
133133
return c.JSON(http.StatusOK, AssetsResponse{
134134
Data: &AssetsResponseData{
135135
UploadUrl: presignedUrl.URL,
136-
Header: map[string]string{},
137-
Asset: Asset{
136+
Header: &map[string]string{},
137+
Asset: &Asset{
138138
Id: videoId,
139139
Name: body.Name,
140140
Size: int(body.Size),
141141
ContentType: body.ContentType,
142142
Href: "",
143143
OriginalName: body.Name,
144144
},
145-
Form: presignedUrl.Values,
145+
Form: nil,
146146
},
147147
Message: MsgPresignedURLGenerated,
148148
})

backend/swagger/docs.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,15 +567,19 @@ const docTemplate = `{
567567
"PREUPLOAD",
568568
"UPLOADED",
569569
"PROCESSING",
570+
"FAILED",
570571
"READY",
571-
"FAILED"
572+
"PRIVATE",
573+
"PUBLIC"
572574
],
573575
"x-enum-varnames": [
574576
"VideoStatusPREUPLOAD",
575577
"VideoStatusUPLOADED",
576578
"VideoStatusPROCESSING",
579+
"VideoStatusFAILED",
577580
"VideoStatusREADY",
578-
"VideoStatusFAILED"
581+
"VideoStatusPRIVATE",
582+
"VideoStatusPUBLIC"
579583
]
580584
},
581585
"pgtype.InfinityModifier": {

backend/swagger/swagger.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -561,15 +561,19 @@
561561
"PREUPLOAD",
562562
"UPLOADED",
563563
"PROCESSING",
564+
"FAILED",
564565
"READY",
565-
"FAILED"
566+
"PRIVATE",
567+
"PUBLIC"
566568
],
567569
"x-enum-varnames": [
568570
"VideoStatusPREUPLOAD",
569571
"VideoStatusUPLOADED",
570572
"VideoStatusPROCESSING",
573+
"VideoStatusFAILED",
571574
"VideoStatusREADY",
572-
"VideoStatusFAILED"
575+
"VideoStatusPRIVATE",
576+
"VideoStatusPUBLIC"
573577
]
574578
},
575579
"pgtype.InfinityModifier": {

backend/swagger/swagger.yaml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,19 @@ definitions:
2020
- PREUPLOAD
2121
- UPLOADED
2222
- PROCESSING
23-
- READY
2423
- FAILED
24+
- READY
25+
- PRIVATE
26+
- PUBLIC
2527
type: string
2628
x-enum-varnames:
2729
- VideoStatusPREUPLOAD
2830
- VideoStatusUPLOADED
2931
- VideoStatusPROCESSING
30-
- VideoStatusREADY
3132
- VideoStatusFAILED
33+
- VideoStatusREADY
34+
- VideoStatusPRIVATE
35+
- VideoStatusPUBLIC
3236
pgtype.InfinityModifier:
3337
enum:
3438
- 1

libs/db/models.go

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqlc.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ sql:
66
- schema: "migration"
77
queries: "libs/db/queries"
88
engine: "postgresql"
9-
database:
10-
managed: true
9+
# database:
10+
# managed: true
1111
gen:
1212
go:
1313
package: "db"

transcoder/config/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package config
22

33
import (
44
"encoding/json"
5+
"strings"
56

67
"gitlab.com/subrotokumar/playstack/libs/core"
78
"gitlab.com/subrotokumar/playstack/libs/storage"
@@ -21,6 +22,11 @@ type Config struct {
2122
SecretAccessKey string `yaml:"secret_key" envconfig:"AWS_SECRET_ACCESS_KEY"`
2223
MediaBucket string `yaml:"media_bucket" envconfig:"MEDIA_BUCKET" required:"true"`
2324
} `yaml:"aws"`
25+
NotifierService struct {
26+
URL string `yaml:"api" envconfig:"NOTIFIER_SERVICE_ENDPOINT" default:"http://localhost:8080"`
27+
Username string `yaml:"username" envconfig:"BASIC_AUTH_USERNAME"`
28+
PASSWORD string `yaml:"password" envconfig:"BASIC_AUTH_PASSWORD"`
29+
} `yaml:"notifier_service"`
2430
Event string `yaml:"events" envconfig:"SQS_MESSAGE" required:"true"`
2531
S3Event storage.S3Event
2632
}
@@ -38,6 +44,11 @@ func (cfg *Config) Key() string {
3844
return cfg.S3Event.Records[0].S3.Object.Key
3945
}
4046

47+
func (cfg *Config) UserAndVideoID() (string, string) {
48+
keys := strings.Split(cfg.Key(), "/")
49+
return keys[1], keys[2]
50+
}
51+
4152
func (cfg *Config) ObjectSize() int64 {
4253
return cfg.S3Event.Records[0].S3.Object.Size
4354
}

transcoder/service/notifier.go

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,73 @@
11
package service
22

33
import (
4+
"bytes"
45
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
11+
"gitlab.com/subrotokumar/playstack/libs/db"
12+
)
13+
14+
type (
15+
UpdateMetadataRequest struct {
16+
Title *string `json:"title"`
17+
Status db.VideoStatus `json:"status" validate:"omitempty,oneof='PREUPLOAD' 'UPLOADED' 'PROCESSING' 'READY' 'FAILED'"`
18+
DurationSec *int32 `json:"duration_sec"`
19+
}
520
)
621

7-
func (s *Service) UpdateMetadata(ctx context.Context) error {
22+
func (s *Service) UpdateMetadata(ctx context.Context, request UpdateMetadataRequest) error {
823
s.log.Info("Updating video metadata in database")
24+
25+
userID, videoID := s.cfg.UserAndVideoID()
26+
27+
url := s.cfg.NotifierService.URL + "/internal/media/videos/" + videoID
28+
payload := make(map[string]any)
29+
payload["user_id"] = userID
30+
payload["status"] = string(request.Status)
31+
32+
if request.Title != nil {
33+
payload["title"] = *request.Title
34+
}
35+
if request.DurationSec != nil {
36+
payload["duration_sec"] = *request.DurationSec
37+
}
38+
39+
bodyBytes, err := json.Marshal(payload)
40+
if err != nil {
41+
s.log.Error("failed to marshal update payload", "err", err)
42+
return err
43+
}
44+
45+
req, err := http.NewRequestWithContext(ctx, "PATCH", url, bytes.NewReader(bodyBytes))
46+
if err != nil {
47+
s.log.Error("failed to build request", "err", err)
48+
return err
49+
}
50+
51+
username := s.cfg.NotifierService.Username
52+
password := s.cfg.NotifierService.PASSWORD
53+
req.Header.Set("Content-Type", "application/json")
54+
if username != "" || password != "" {
55+
req.SetBasicAuth(username, password)
56+
}
57+
58+
res, err := http.DefaultClient.Do(req)
59+
if err != nil {
60+
s.log.Error("notifier request failed", "err", err)
61+
return err
62+
}
63+
defer res.Body.Close()
64+
65+
respBody, _ := io.ReadAll(res.Body)
66+
if res.StatusCode < 200 || res.StatusCode >= 300 {
67+
s.log.Error("notifier returned non-2xx", "status", res.StatusCode, "body", string(respBody))
68+
return fmt.Errorf("notifier returned status: %s", res.Status)
69+
}
70+
71+
s.log.Info("Notifier updated metadata", "status", res.StatusCode)
972
return nil
1073
}

transcoder/service/worker.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,14 @@ import (
1111

1212
"github.com/aws/aws-sdk-go-v2/aws"
1313
"github.com/aws/aws-sdk-go-v2/service/s3"
14+
"gitlab.com/subrotokumar/playstack/libs/db"
1415
"gitlab.com/subrotokumar/playstack/transcoder/ffmpeg"
1516
)
1617

18+
const (
19+
MsgVideoMetadataUpdateFailed string = "failed to update video metadata"
20+
)
21+
1722
func (s *Service) Download(ctx context.Context, destPath string) error {
1823
s.log.Info("Downloading file", "path", destPath)
1924

@@ -64,10 +69,7 @@ func (s *Service) Transcode(ctx context.Context, inputPath, outputDir string) er
6469

6570
func (s *Service) Upload(ctx context.Context, sourceDir string) error {
6671
s.log.Info("Uploading files from", "dir", sourceDir)
67-
keys := strings.Split(s.cfg.Key(), "/")
68-
userId := keys[0]
69-
videoId := keys[1]
70-
uploadKey := userId + "/" + videoId + "/" + "output/"
72+
uploadKey := strings.ReplaceAll(s.cfg.Key(), "video.mp4", "output/")
7173
err := filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error {
7274
if err != nil {
7375
return err
@@ -89,7 +91,7 @@ func (s *Service) Upload(ctx context.Context, sourceDir string) error {
8991

9092
s.log.Info("Uploading", "key", uploadKey+relPath)
9193
_, err = s.storage.Client().PutObject(ctx, &s3.PutObjectInput{
92-
Bucket: aws.String(s.cfg.Bucket()),
94+
Bucket: aws.String(s.cfg.Aws.MediaBucket),
9395
Key: aws.String(uploadKey + relPath),
9496
Body: file,
9597
})
@@ -107,6 +109,10 @@ func (s *Service) Upload(ctx context.Context, sourceDir string) error {
107109
}
108110

109111
func (s *Service) Process(ctx context.Context) error {
112+
if err := s.UpdateMetadata(ctx, UpdateMetadataRequest{Status: db.VideoStatusUPLOADED}); err != nil {
113+
s.log.Error(MsgVideoMetadataUpdateFailed, "err", err.Error())
114+
}
115+
110116
workDir := "./tmp/workspace"
111117
if err := os.MkdirAll(workDir, 0o755); err != nil {
112118
return fmt.Errorf("create work dir: %w", err)
@@ -131,15 +137,22 @@ func (s *Service) Process(ctx context.Context) error {
131137
return fmt.Errorf("download video: %w", err)
132138
}
133139

140+
if err := s.UpdateMetadata(ctx, UpdateMetadataRequest{Status: db.VideoStatusPROCESSING}); err != nil {
141+
s.log.Error(MsgVideoMetadataUpdateFailed, "err", err.Error())
142+
}
143+
134144
if err := s.Transcode(ctx, inputPath, outputPath); err != nil {
145+
s.UpdateMetadata(ctx, UpdateMetadataRequest{Status: db.VideoStatusFAILED})
135146
return fmt.Errorf("transcode video: %w", err)
136147
}
137148

138149
if err := s.Upload(ctx, outputPath); err != nil {
150+
s.UpdateMetadata(ctx, UpdateMetadataRequest{Status: db.VideoStatusFAILED})
139151
return fmt.Errorf("upload files: %w", err)
140152
}
141-
if err := s.UpdateMetadata(ctx); err != nil {
142-
return fmt.Errorf("update video: %w", err)
153+
if err := s.UpdateMetadata(ctx, UpdateMetadataRequest{Status: db.VideoStatusREADY}); err != nil {
154+
s.log.Error(MsgVideoMetadataUpdateFailed, "err", err.Error())
155+
return err
143156
}
144157
return nil
145158
}

0 commit comments

Comments
 (0)