Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dev-tools/compose/docker-compose.ddex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
environment:
- DDEX_PORT=9000
- DDEX_MONGODB_URL=mongodb://mongo:mongo@ddex-mongo:27017/ddex?authSource=admin&replicaSet=rs0
env_file: .env
ports:
- "9000:9000"
networks:
Expand All @@ -27,6 +28,7 @@ services:
dockerfile: ${PROJECT_ROOT}/packages/ddex/ingester/Dockerfile
environment:
- DDEX_MONGODB_URL=mongodb://mongo:mongo@ddex-mongo:27017/ddex?authSource=admin&replicaSet=rs0
env_file: .env
depends_on:
ddex-mongo:
condition: service_healthy
Expand All @@ -43,6 +45,7 @@ services:
dockerfile: ${PROJECT_ROOT}/packages/ddex/ingester/Dockerfile
environment:
- DDEX_MONGODB_URL=mongodb://mongo:mongo@ddex-mongo:27017/ddex?authSource=admin&replicaSet=rs0
env_file: .env
depends_on:
ddex-mongo:
condition: service_healthy
Expand All @@ -59,6 +62,7 @@ services:
dockerfile: ${PROJECT_ROOT}/packages/ddex/ingester/Dockerfile
environment:
- DDEX_MONGODB_URL=mongodb://mongo:mongo@ddex-mongo:27017/ddex?authSource=admin&replicaSet=rs0
env_file: .env
depends_on:
ddex-mongo:
condition: service_healthy
Expand All @@ -79,6 +83,7 @@ services:
TURBO_TOKEN: '${TURBO_TOKEN}'
environment:
- DDEX_MONGODB_URL=mongodb://mongo:mongo@ddex-mongo:27017/ddex?authSource=admin&replicaSet=rs0
env_file: .env
depends_on:
ddex-mongo:
condition: service_healthy
Expand Down
2 changes: 2 additions & 0 deletions packages/ddex/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ To use dev envs: `cp .env.dev .env`

Fill in all missing values. See the `Creating a bucket in S3` section below for how to set up S3.

For docker compose to work: (at monorepo root) `cat packages/ddex/.env >> dev-tools/compose/.env`

### Setup
1. (At the monorepo root) Generate a keyfile for mongodb:
```
Expand Down
43 changes: 43 additions & 0 deletions packages/ddex/ingester/common/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package common

import (
"time"

"go.mongodb.org/mongo-driver/bson/primitive"
)

type Upload struct {
ID primitive.ObjectID `bson:"_id"`
UploadETag string `bson:"upload_etag"`
Path string `bson:"path"`
CreatedAt time.Time `bson:"created_at"`
}

type Indexed struct {
ID primitive.ObjectID `bson:"_id"`
UploadETag string `bson:"upload_etag"`
DeliveryID string `bson:"delivery_id"`
DeliveryStatus string `bson:"delivery_status"`
XmlFilePath string `bson:"xml_file_path"`
XmlContent primitive.Binary `bson:"xml_content"`
CreatedAt time.Time `bson:"created_at"`
}

type Parsed struct {
ID primitive.ObjectID `bson:"_id"`
UploadETag string `bson:"upload_etag"`
DeliveryID string `bson:"delivery_id"`
Entity string `bson:"entity"`
PublishDate time.Time `bson:"publish_date"`
CreatedAt time.Time `bson:"created_at"`
}

type Published struct {
ID primitive.ObjectID `bson:"_id"`
UploadETag string `bson:"upload_etag"`
DeliveryID string `bson:"delivery_id"`
Entity string `bson:"entity"`
PublishDate time.Time `bson:"publish_date"`
EntityID string `bson:"entity_id"`
CreatedAt time.Time `bson:"created_at"`
}
1 change: 1 addition & 0 deletions packages/ddex/ingester/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ const (
DeliveryStatusRejected = "rejected"
DeliveryStatusValidating = "validating"
DeliveryStatusAwaitingPublishing = "awaiting_publishing"
DeliveryStatusPublished = "published"
)
2 changes: 1 addition & 1 deletion packages/ddex/ingester/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func persistUploads(client *mongo.Client, bucket string, uploads []*s3.Object, c
etag := strings.Trim(*upload.ETag, "\"")
// Only insert if a document doesn't already exist with this path and etag
filter := bson.M{"path": path, "upload_etag": etag}
update := bson.M{"$setOnInsert": bson.M{"path": path, "upload_etag": etag}}
update := bson.M{"$setOnInsert": bson.M{"path": path, "upload_etag": etag, "created_at": upload.LastModified}}
opts := options.Update().SetUpsert(true)
_, err := uploadsColl.UpdateOne(ctx, filter, update, opts)
if err != nil {
Expand Down
23 changes: 14 additions & 9 deletions packages/ddex/ingester/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"context"
"fmt"
"ingester/common"
"ingester/constants"
"io"
"log"
"log/slog"
"os"
"path/filepath"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
Expand Down Expand Up @@ -71,16 +73,17 @@ func RunNewIndexer(ctx context.Context) {
// processZIP unzips an "upload" into a "delivery" (or multiple deliveries if the ZIP file contains multiple folders with XML files).
func (i *Indexer) processZIP(changeStream *mongo.ChangeStream) {
// Decode the "upload" from Mongo
var changeDoc bson.M
var changeDoc struct {
FullDocument common.Upload `bson:"fullDocument"`
}
if err := changeStream.Decode(&changeDoc); err != nil {
log.Fatal(err)
}
fullDocument, _ := changeDoc["fullDocument"].(bson.M)
i.logger.Info("Indexer: Processing new upload", "upload", fullDocument)
i.logger.Info("Indexer: Processing new upload", "upload", changeDoc.FullDocument)

// Download ZIP file from S3
uploadETag := fullDocument["upload_etag"].(string)
remotePath := fullDocument["path"].(string)
uploadETag := changeDoc.FullDocument.UploadETag
remotePath := changeDoc.FullDocument.Path
zipFilePath, cleanup := i.downloadFromS3Raw(remotePath)
defer cleanup()
if zipFilePath == "" {
Expand Down Expand Up @@ -169,10 +172,12 @@ func (i *Indexer) processDelivery(rootDir, dir, uploadETag string) error {

// Insert the delivery into the Mongo "indexed" collection
deliveryDoc := bson.M{
"delivery_id": deliveryID,
"upload_etag": uploadETag,
"xml_file_path": xmlRelativePath,
"xml_content": primitive.Binary{Data: xmlBytes, Subtype: 0x00}, // Store directly as generic binary for high data integrity
"upload_etag": uploadETag,
"delivery_id": deliveryID,
"delivery_status": constants.DeliveryStatusValidating,
"xml_file_path": xmlRelativePath,
"xml_content": primitive.Binary{Data: xmlBytes, Subtype: 0x00}, // Store directly as generic binary for high data integrity
"created_at": time.Now(),
}
if _, err := i.indexedColl.InsertOne(i.ctx, deliveryDoc); err != nil {
return fmt.Errorf("failed to insert XML data into Mongo: %w", err)
Expand Down
22 changes: 12 additions & 10 deletions packages/ddex/ingester/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ func Run(ctx context.Context) {
defer changeStream.Close(ctx)

for changeStream.Next(ctx) {
var changeDoc bson.M
var changeDoc struct {
FullDocument common.Indexed `bson:"fullDocument"`
}
if err := changeStream.Decode(&changeDoc); err != nil {
log.Fatal(err)
}
fullDocument, _ := changeDoc["fullDocument"].(bson.M)
parseIndexed(mongoClient, fullDocument, ctx)
parseIndexed(mongoClient, changeDoc.FullDocument, ctx)
}

if err := changeStream.Err(); err != nil {
log.Fatal(err)
}
}

func parseIndexed(client *mongo.Client, fullDocument bson.M, ctx context.Context) {
log.Printf("Processing new indexed document: %v\n", fullDocument)
func parseIndexed(client *mongo.Client, indexed common.Indexed, ctx context.Context) {
log.Printf("Processing new indexed document: %v\n", indexed)
indexedColl := client.Database("ddex").Collection("indexed")
parsedColl := client.Database("ddex").Collection("parsed")

Expand All @@ -49,7 +50,7 @@ func parseIndexed(client *mongo.Client, fullDocument bson.M, ctx context.Context

session, err := client.StartSession()
if err != nil {
failAndUpdateStatus(err, indexedColl, ctx, fullDocument["_id"].(primitive.ObjectID))
failAndUpdateStatus(err, indexedColl, ctx, indexed.ID)
}
err = mongo.WithSession(ctx, session, func(sessionContext mongo.SessionContext) error {
if err := session.StartTransaction(); err != nil {
Expand All @@ -58,10 +59,11 @@ func parseIndexed(client *mongo.Client, fullDocument bson.M, ctx context.Context

// 2. Write each release in "delivery_xml" in the indexed doc as a bson doc in the 'parsed' collection
parsedDoc := bson.M{
"upload_etag": fullDocument["upload_etag"],
"delivery_id": fullDocument["delivery_id"],
"upload_etag": indexed.UploadETag,
"delivery_id": indexed.DeliveryID,
"entity": "track",
"publish_date": time.Now(),
"created_at": time.Now(),
}
result, err := parsedColl.InsertOne(ctx, parsedDoc)
if err != nil {
Expand All @@ -71,7 +73,7 @@ func parseIndexed(client *mongo.Client, fullDocument bson.M, ctx context.Context
log.Println("New parsed release doc ID: ", result.InsertedID)

// 3. Set delivery status for delivery in 'indexed' collection
err = setDeliveryStatus(indexedColl, sessionContext, fullDocument["_id"].(primitive.ObjectID), constants.DeliveryStatusAwaitingPublishing)
err = setDeliveryStatus(indexedColl, sessionContext, indexed.ID, constants.DeliveryStatusAwaitingPublishing)
if err != nil {
session.AbortTransaction(sessionContext)
return err
Expand All @@ -81,7 +83,7 @@ func parseIndexed(client *mongo.Client, fullDocument bson.M, ctx context.Context
})

if err != nil {
failAndUpdateStatus(err, indexedColl, ctx, fullDocument["_id"].(primitive.ObjectID))
failAndUpdateStatus(err, indexedColl, ctx, indexed.ID)
}

session.EndSession(ctx)
Expand Down
2 changes: 1 addition & 1 deletion packages/ddex/publisher/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import dotenv from 'dotenv'
import path from 'path'

// Load env vars from ddex package root
dotenv.config({ path: path.join(__dirname, '..', '..', '..', '.env') })
dotenv.config({ path: path.join(__dirname, '..', '..', '.env') })

import createApp from './app'
import { dialDb } from './services/dbService'
Expand Down
16 changes: 16 additions & 0 deletions packages/ddex/publisher/src/models/indexed.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import mongoose from 'mongoose'

// DDEX deliveries indexed from DDEX uploads
const indexedSchema = new mongoose.Schema({
id: mongoose.Schema.Types.ObjectId,
upload_etag: String,
delivery_id: String,
delivery_status: String,
xml_file_path: String,
xml_content: Buffer,
created_at: Date,
})

const Indexed = mongoose.model('Indexed', indexedSchema, 'indexed')

export default Indexed
3 changes: 3 additions & 0 deletions packages/ddex/publisher/src/models/parsed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ import mongoose from 'mongoose'

// Releases that parsed from indexed DDEX deliveries, awaiting publishing
const parsedSchema = new mongoose.Schema({
id: mongoose.Schema.Types.ObjectId,
upload_etag: String,
delivery_id: String,
entity: String,
publish_date: Date,
created_at: Date,
})

const Parsed = mongoose.model('Parsed', parsedSchema, 'parsed')
Expand Down
5 changes: 4 additions & 1 deletion packages/ddex/publisher/src/models/published.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ import mongoose from 'mongoose'

// DDEX releases that have been published
const publishedSchema = new mongoose.Schema({
id: mongoose.Schema.Types.ObjectId,
upload_etag: String,
delivery_id: String,
entity: String,
publish_date: Date,
track_id: String,
entity_id: String,
created_at: Date,
})

const Published = mongoose.model('Published', publishedSchema, 'published')
Expand Down
20 changes: 18 additions & 2 deletions packages/ddex/publisher/src/services/publisherService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import mongoose from 'mongoose'
import Indexed from '../models/indexed'
import Parsed from '../models/parsed'
import Published from '../models/published'

Expand All @@ -16,17 +17,32 @@ export const publishReleases = async () => {
}).session(session)

for (const doc of documents) {
// TODO download audio/image files from "indexed" s3 bucket
// TODO publish release using SDK

// Move document to 'published' collection
const publishedData = {
...doc.toObject(),
track_id: 'todo',
entity_id: 'todo',
created_at: new Date(),
}
const publishedDoc = new Published(publishedData)
await publishedDoc.save({ session })
await Parsed.deleteOne({ _id: doc._id }).session(session)
// TODO update indexed delivery_status to 'published'
// Update delivery_status to 'published' once all releases in the delivery are published
const remainingCount = await Parsed.countDocuments({
delivery_id: doc.delivery_id,
_id: { $ne: doc._id },
}).session(session)

if (remainingCount === 0) {
// Update delivery_status in indexed collection
await Indexed.updateOne(
{ delivery_id: doc.delivery_id },
{ $set: { delivery_status: 'published' } },
{ session }
)
}
console.log('Published release: ', publishedData)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@
}

.styledTable tbody tr:nth-of-type(even) {
/* background-color: #f3f3f3; */
background-color: var(--harmony-background);
}

.styledTable tbody tr:last-of-type {
border-bottom: 2px solid var(--harmony-secondary);
}

.styledTable tbody tr.activeRow {
font-weight: bold;
color: var(--harmony-secondary);
.styledTable tbody td.statusFailed {
background-color: #ffccbc;
}

.styledTable tbody td.statusPending {
background-color: #fff59d;
}

.styledTable tbody td.statusSuccess {
background-color: #c8e6c9;
}
Loading