From 7c2b11c5682978b2c9158f5fce290d69c6bad35f Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Fri, 26 Apr 2024 14:23:04 -0700 Subject: [PATCH 1/5] add updates deletes. TODO parse PurgeReleaseMessage, accept updates with no resource files if they were previously sent --- packages/ddex/ingester/common/types.go | 3 +- packages/ddex/ingester/constants/constants.go | 20 +- packages/ddex/ingester/parser/ern38x.go | 32 +- packages/ddex/ingester/parser/parser.go | 27 +- .../ddex/publisher/src/models/releases.ts | 88 +---- .../src/services/publisherService.ts | 359 ++++++++++++++---- 6 files changed, 365 insertions(+), 164 deletions(-) diff --git a/packages/ddex/ingester/common/types.go b/packages/ddex/ingester/common/types.go index 0c254f0fc92..2b0010e0569 100644 --- a/packages/ddex/ingester/common/types.go +++ b/packages/ddex/ingester/common/types.go @@ -27,7 +27,8 @@ type Release struct { PublishErrors []string `bson:"publish_errors"` FailureCount int `bson:"failure_count"` ReleaseStatus string `bson:"release_status"` - CreatedAt time.Time `bson:"created_at"` + IsUpdate bool `bson:"is_update"` + LastParsed time.Time `bson:"last_parsed"` // Parsed from the release's XML ReleaseProfile ReleaseProfile `bson:"release_profile"` // "ReleaseProfileVersionId" from the DDEX XML diff --git a/packages/ddex/ingester/constants/constants.go b/packages/ddex/ingester/constants/constants.go index baa0112a6d8..376f252c33e 100644 --- a/packages/ddex/ingester/constants/constants.go +++ b/packages/ddex/ingester/constants/constants.go @@ -8,14 +8,18 @@ const ( ) const ( - ReleaseStatusAwaitingParse = "awaiting_parse" // The release is waiting to be published - ReleaseStatusAwaitingPublish = "awaiting_publish" // The release is waiting to be uploaded to Audius - ReleaseStatusErrorUserMatch = "error_user_match" // The release didn't have a user that matched with an OAuthed Audius user - ReleaseStatusErrorGenreMatch = "error_genre_match" // The release didn't have a genre that matched with an Audius genre - ReleaseStatusErrorParsing = "error_parsing" // Some other error occurred during parsing. See ParseErrors - ReleaseStatusErrorDuringUpload = "error_during_upload" // An error occurred while trying to publish to Audius - ReleaseStatusErrorAfterUpload = "error_after_upload" // The release was published to Audius, but there was an error after publishing - ReleaseStatusPublished = "published" // The release was successfully published to Audius + ReleaseStatusAwaitingParse = "awaiting_parse" // The release is waiting to be published + ReleaseStatusAwaitingPublish = "awaiting_publish" // The release is waiting to be uploaded to Audius + ReleaseStatusErrorUserMatch = "error_user_match" // The release didn't have a user that matched with an OAuthed Audius user + ReleaseStatusErrorGenreMatch = "error_genre_match" // The release didn't have a genre that matched with an Audius genre + ReleaseStatusErrorParsing = "error_parsing" // Some other error occurred during parsing. See ParseErrors + ReleaseStatusFailedDuringUpload = "failed_during_upload" // An error occurred while trying to publish to Audius + ReleaseStatusFailedAfterUpload = "failed_after_upload" // The release was published to Audius, but there was an error after publishing + ReleaseStatusPublished = "published" // The release was successfully published to Audius + ReleaseStatusAwaitingDelete = "awaiting_delete" // The release is waiting to be removed from Audius + ReleaseStatusDeleted = "deleted" // The release was successfully removed from Audius + ReleaseStatusFailedDuringDelete = "failed_during_delete" // An error occurred while trying to delete from Audius + ReleaseStatusFailedAfterDelete = "failed_after_delete" // The release was removed from Audius, but there was an error after deletion ) var SkipFiles = []string{".DS_STORE", "__MACOSX"} diff --git a/packages/ddex/ingester/parser/ern38x.go b/packages/ddex/ingester/parser/ern38x.go index dbc83b27822..e63eed4cbad 100644 --- a/packages/ddex/ingester/parser/ern38x.go +++ b/packages/ddex/ingester/parser/ern38x.go @@ -1,8 +1,10 @@ package parser import ( + "context" "fmt" "ingester/common" + "ingester/constants" "regexp" "slices" "sort" @@ -10,6 +12,8 @@ import ( "time" "github.com/antchfx/xmlquery" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" ) // SoundRecording represents the parsed details of a sound recording @@ -105,7 +109,7 @@ type ResourceGroupContentItem struct { // parseERN38x parses the given XML data and returns a release ready to be uploaded to Audius. // NOTE: This expects the ERN 3 format. See https://kb.ddex.net/implementing-each-standard/electronic-release-notification-message-suite-(ern)/ern-3-explained/ -func parseERN38x(doc *xmlquery.Node, crawledBucket string, release *common.Release) (errs []error) { +func parseERN38x(doc *xmlquery.Node, crawledBucket string, release *common.Release, releasesColl *mongo.Collection) (errs []error) { var ( soundRecordings []SoundRecording images []Image @@ -173,8 +177,30 @@ func parseERN38x(doc *xmlquery.Node, crawledBucket string, release *common.Relea // Parse s from dealNodes := xmlquery.Find(doc, "//DealList/ReleaseDeal") if len(dealNodes) == 0 { - errs = append(errs, fmt.Errorf("no found")) - return + // Check for an existing release to determine whether this is a takedown request or an invalid NewReleaseMessage + var existingRelease common.Release + filter := bson.M{"_id": release.ReleaseID} + err = releasesColl.FindOne(context.Background(), filter).Decode(&existingRelease) + if err == nil { + // This is a takedown request. Mark the existing release for deletion + switch existingRelease.ReleaseStatus { + case constants.ReleaseStatusPublished, constants.ReleaseStatusFailedAfterUpload, constants.ReleaseStatusFailedDuringDelete: + // Has been published to Audius. Mark for deletion by the publisher + release.ReleaseStatus = constants.ReleaseStatusAwaitingDelete + release.EntityID = existingRelease.EntityID + release.SDKUploadMetadata = existingRelease.SDKUploadMetadata + default: + // Has not yet been published to Audius. Mark as deleted + release.ReleaseStatus = constants.ReleaseStatusDeleted + } + } else if err != nil && err != mongo.ErrNoDocuments { + errs = append(errs, err) + return + } else { + // This is a NewReleaseMessage that should have a deal + errs = append(errs, fmt.Errorf("no found")) + return + } } for _, dNode := range dealNodes { err := processDealNode(dNode, release) diff --git a/packages/ddex/ingester/parser/parser.go b/packages/ddex/ingester/parser/parser.go index 794f4b8f4f9..593afb0fde5 100644 --- a/packages/ddex/ingester/parser/parser.go +++ b/packages/ddex/ingester/parser/parser.go @@ -11,6 +11,7 @@ import ( "time" "github.com/antchfx/xmlquery" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -37,6 +38,18 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { // Upsert the release after parsing regardless of success or failure defer func() { + filter := bson.M{"_id": release.ReleaseID} + var existingRelease common.Release + + // Find the existing document, if any + err := p.ReleasesColl.FindOne(p.Ctx, filter).Decode(&existingRelease) + if err == nil { + if existingRelease.ReleaseStatus == constants.ReleaseStatusPublished || existingRelease.ReleaseStatus == constants.ReleaseStatusFailedAfterUpload { + // Set is_update to true if the existing release has already been published to Audius + release.IsUpdate = true + } + } + if res, err := p.UpsertRelease(release); err == nil { p.Logger.Info("Upserted release", "_id", res.UpsertedID, "modified", res.ModifiedCount) ok = true @@ -84,7 +97,7 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { // Use local-name() to ignore namespace because sometimes it's "ern" and sometimes it's "ernm" msgVersionElem := xmlquery.FindOne(doc, "//*[local-name()='NewReleaseMessage']") if msgVersionElem == nil { - logParsingErr(fmt.Errorf("missing element")) + logParsingErr(fmt.Errorf("missing or element")) return } @@ -117,9 +130,9 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { switch ernVersion { // Not sure what the difference is between 3.81 and 3.82 because DDEX only provides the most recent version and 1 version behind unless you contact them case "381": - errs = parseERN38x(doc, p.Bucket, release) + errs = parseERN38x(doc, p.Bucket, release, p.ReleasesColl) case "382": - errs = parseERN38x(doc, p.Bucket, release) + errs = parseERN38x(doc, p.Bucket, release, p.ReleasesColl) default: logParsingErr(fmt.Errorf("unsupported schema: '%s'. Expected ern/381 or ern/382", msgSchemaVersionId)) return @@ -131,7 +144,13 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { } return } - p.Logger.Info("Parsed release", "id", release.ReleaseID) + + if release.ReleaseStatus == constants.ReleaseStatusDeleted || release.ReleaseStatus == constants.ReleaseStatusAwaitingDelete { + p.Logger.Info("Parsed takedown request", "id", release.ReleaseID) + return + } else { + p.Logger.Info("Parsed release", "id", release.ReleaseID) + } // Find an ID for the first OAuthed display artist in the release for i, parsedRelease := range release.ParsedReleaseElems { diff --git a/packages/ddex/publisher/src/models/releases.ts b/packages/ddex/publisher/src/models/releases.ts index 85f6ff6975e..47977b06b76 100644 --- a/packages/ddex/publisher/src/models/releases.ts +++ b/packages/ddex/publisher/src/models/releases.ts @@ -79,14 +79,20 @@ const moods = [ 'Upbeat', 'Other', ] +// TODO michelle simplify this between the go code and here export const releaseStatuses = [ 'awaiting_parse', 'awaiting_publish', 'error_user_match', 'error_genre_match', + 'error_parsing', 'failed_during_upload', 'failed_after_upload', 'published', + 'awaiting_delete', + 'deleted', + 'failed_during_delete', + 'failed_after_delete', ] as const interface ResourceContributor { @@ -168,12 +174,6 @@ const trackMetadataSchema = new mongoose.Schema({ export type TrackMetadata = mongoose.InferSchemaType -const imageMetadataSchema = new mongoose.Schema({ - url: String, - url_hash: String, - url_hash_algo: String, -}) - const nullableString = { type: String, default: null, @@ -189,23 +189,6 @@ const nullableInt = { default: null, } -const releaseIDsSchema = new mongoose.Schema({ - party_id: { type: String, default: '' }, - catalog_number: { type: String, default: '' }, - icpn: { type: String, default: '' }, - grid: { type: String, default: '' }, - isan: { type: String, default: '' }, - isbn: { type: String, default: '' }, - ismn: { type: String, default: '' }, - isrc: { type: String, default: '' }, - issn: { type: String, default: '' }, - istc: { type: String, default: '' }, - iswc: { type: String, default: '' }, - mwli: { type: String, default: '' }, - sici: { type: String, default: '' }, - proprietary_id: { type: String, default: '' }, -}) - export const sdkUploadMetadataSchema = new mongoose.Schema( { // Used by both tracks and albums @@ -213,7 +196,7 @@ export const sdkUploadMetadataSchema = new mongoose.Schema( genre: { type: String, required: true, enum: genres }, artists: { type: [resourceContributorSchema], default: null }, description: nullableString, - ddex_release_ids: releaseIDsSchema, + ddex_release_ids: mongoose.Schema.Types.Mixed, mood: { type: String, enum: moods, default: null }, tags: nullableString, copyright_line: copyrightSchema, @@ -264,58 +247,6 @@ export type SDKUploadMetadataSchema = mongoose.InferSchemaType< typeof sdkUploadMetadataSchema > -const releaseResourcesSchema = new mongoose.Schema({ - tracks: [trackMetadataSchema], - images: [imageMetadataSchema], -}) - -const parsedReleaseElementSchema = new mongoose.Schema( - { - release_ref: { type: String, required: true }, - is_main_release: { type: Boolean, required: true }, - release_type: { - type: String, - required: true, - enum: ['Album', 'EP', 'TrackRelease', 'Single'], - }, - release_ids: releaseIDsSchema, - release_date: { type: Date, required: true }, - validity_start_date: { type: Date, required: true }, - resources: releaseResourcesSchema, - artist_id: { type: String, required: true }, - artist_name: { type: String, required: true }, - display_title: { type: String, required: true }, - display_subtitle: nullableString, - reference_title: nullableString, - reference_subtitle: nullableString, - formal_title: nullableString, - formal_subtitle: nullableString, - genre: { type: String, enum: genres, required: true }, - duration: { type: Number, required: true }, - preview_start_seconds: nullableInt, - isrc: nullableString, - artists: { type: [resourceContributorSchema], default: null }, - resource_contributors: { type: [resourceContributorSchema], default: null }, - indirect_resource_contributors: { - type: [resourceContributorSchema], - default: null, - }, - rights_controller: { type: rightsControllerSchema, default: null }, - copyright_line: { type: copyrightSchema, default: null }, - producer_copyright_line: { type: copyrightSchema, default: null }, - parental_warning_type: nullableString, - is_stream_gated: { type: Boolean, default: false }, - stream_conditions: { type: mongoose.Schema.Types.Mixed, default: null }, - is_download_gated: { type: Boolean, default: false }, - download_conditions: { type: mongoose.Schema.Types.Mixed, default: null }, - is_stream_follow_gated: { type: Boolean, default: false }, - is_stream_tip_gated: { type: Boolean, default: false }, - is_download_follow_gated: { type: Boolean, default: false }, - has_deal: { type: Boolean, default: false }, - }, - { _id: false } -) // _id is set to false because this schema is used as a sub-document - const releasesSchema = new mongoose.Schema({ _id: { type: String, required: true }, delivery_remote_path: { type: String, required: true }, @@ -328,9 +259,9 @@ const releasesSchema = new mongoose.Schema({ 'Unspecified', ], }, - parsed_release_elems: [parsedReleaseElementSchema], + parsed_release_elems: mongoose.Schema.Types.Mixed, sdk_upload_metadata: sdkUploadMetadataSchema, - created_at: { type: Date, required: true }, + last_parsed: { type: Date, required: true }, parse_errors: [String], publish_errors: [String], failure_count: { type: Number, default: 0 }, @@ -338,6 +269,7 @@ const releasesSchema = new mongoose.Schema({ type: String, enum: releaseStatuses, }, + is_update: { type: Boolean, default: false }, // Only set if the release was successfully published entity_id: String, diff --git a/packages/ddex/publisher/src/services/publisherService.ts b/packages/ddex/publisher/src/services/publisherService.ts index 3ad701f7cad..8ffd38b63dc 100644 --- a/packages/ddex/publisher/src/services/publisherService.ts +++ b/packages/ddex/publisher/src/services/publisherService.ts @@ -7,7 +7,11 @@ import type { import type { AudiusSdk as AudiusSdkType, UploadTrackRequest, + UpdateTrackRequest, + DeleteTrackRequest, UploadAlbumRequest, + UpdateAlbumRequest, + DeleteAlbumRequest, Genre, Mood, } from '@audius/sdk' @@ -110,6 +114,56 @@ const uploadTrack = async ( return result } +const updateTrack = async ( + audiusSdk: AudiusSdkType, + trackId: string, + pendingTrack: SDKUploadMetadataSchema, + s3Service: ReturnType +) => { + const userId = pendingTrack.artist_id + const metadata = formatTrackMetadata(pendingTrack) + + const coverArtDownload = await s3Service.downloadFromS3Crawled( + pendingTrack.cover_art_url + ) + // TODO: We can hash and verify against the metadata here + const coverArtFile = { + buffer: coverArtDownload!, + originalname: pendingTrack.cover_art_url.split('/').pop(), + } + + const updateTrackRequest: UpdateTrackRequest = { + trackId, + userId, + coverArtFile, + metadata, + onProgress: (progress: any) => console.log('Progress:', progress), + } + console.log( + `Updating ${pendingTrack.title} by ${pendingTrack.artist_id} (track id ${trackId})...` + ) + const result = await audiusSdk.tracks.updateTrack(updateTrackRequest) + console.log(result) + return result +} + +const deleteTrack = async ( + audiusSdk: AudiusSdkType, + trackId: string, + trackMetadata: SDKUploadMetadataSchema +) => { + const deleteTrackRequest: DeleteTrackRequest = { + trackId, + userId: trackMetadata.artist_id, + } + console.log( + `Deleting ${trackMetadata.title} by ${trackMetadata.artist_id} (track id ${trackId})...` + ) + const result = await audiusSdk.tracks.deleteTrack(deleteTrackRequest) + console.log(result) + return result +} + const uploadAlbum = async ( audiusSdk: AudiusSdkType, pendingAlbum: SDKUploadMetadataSchema, @@ -158,6 +212,55 @@ const uploadAlbum = async ( return result } +const updateAlbum = async ( + audiusSdk: AudiusSdkType, + albumId: string, + pendingAlbum: SDKUploadMetadataSchema, + s3Service: ReturnType +) => { + // Fetch cover art from S3 + const coverArtDownload = await s3Service.downloadFromS3Crawled( + pendingAlbum.cover_art_url + ) + // TODO: We can hash and verify against the metadata here + const coverArtFile = { + buffer: coverArtDownload!, + originalname: pendingAlbum.cover_art_url.split('/').pop(), + } + + // TODO we should add support for updating the album tracks' metadatas in the SDK as part of this update + const updateAlbumRequest: UpdateAlbumRequest = { + albumId, + userId: pendingAlbum.playlist_owner_id, + coverArtFile, + metadata: formatAlbumMetadata(pendingAlbum), + onProgress: (progress: any) => console.log('Progress:', progress), + } + console.log( + `Updating ${pendingAlbum.playlist_name} by ${pendingAlbum.playlist_owner_id} (album id ${albumId})...` + ) + const result = await audiusSdk.albums.uploadAlbum(updateAlbumRequest) + console.log(result) + return result +} + +const deleteAlbum = async ( + audiusSdk: AudiusSdkType, + albumId: string, + albumMetadata: SDKUploadMetadataSchema +) => { + const deleteAlbumRequest: DeleteAlbumRequest = { + albumId, + userId: albumMetadata.playlist_owner_id, + } + console.log( + `Deleting ${albumMetadata.playlist_name} by ${albumMetadata.playlist_owner_id} (album id ${albumId})...` + ) + const result = await audiusSdk.albums.deleteAlbum(deleteAlbumRequest) + console.log(result) + return result +} + async function recordPendingReleaseErr( doc: Release, error: any, @@ -187,94 +290,123 @@ async function recordPendingReleaseErr( } } -export const publishReleases = async ( +const publishReleases = async ( audiusSdk: AudiusSdkType, s3: ReturnType ) => { - // eslint-disable-next-line no-constant-condition - while (true) { - let documents - try { - const currentDate = new Date() - documents = await Releases.aggregate([ - { - $match: { - $and: [ - { - $expr: { - $lte: [ - { - $max: [ - '$sdk_upload_metadata.release_date', - '$sdk_upload_metadata.validity_start_date', - ], - }, - currentDate, - ], - }, + let documents + try { + const currentDate = new Date() + documents = await Releases.aggregate([ + { + $match: { + $and: [ + { + $expr: { + $lte: [ + { + $max: [ + '$sdk_upload_metadata.release_date', + '$sdk_upload_metadata.validity_start_date', + ], + }, + currentDate, + ], }, - { release_status: 'awaiting_publish' }, - ], - }, + }, + { release_status: 'awaiting_publish' }, + ], }, - ]) - } catch (error) { - console.error('Failed to fetch pending releases:', error) - await new Promise((resolve) => setTimeout(resolve, 10_000)) - continue - } + }, + ]) + } catch (error) { + console.error('Failed to fetch pending releases:', error) + return + } - for (const doc of documents) { - const releaseId = doc._id + for (const doc of documents) { + const releaseId = doc._id - if (doc.failed_after_upload) { - console.error( - `releases doc with ID ${releaseId} requires manual intervention because it's already uploaded to Audius but failed to update in Mongo.` - ) - continue - } else if (doc.failure_count > 3) { - console.error( - `pending_releases doc with ID ${releaseId} requires manual intervention because we've already retried it 3 times.` - ) - continue - } + if (doc.release_status == 'failed_after_upload') { + console.error( + `releases doc with ID ${releaseId} requires manual intervention because it's already uploaded to Audius but failed to update in Mongo.` + ) + continue + } else if (doc.failure_count > 3) { + console.error( + `Tried to publish release with ID ${releaseId} 3 times. Requires manual intervention` + ) + continue + } - let uploadResult: { - trackId?: string | null - albumId?: string | null - blockHash: string - blockNumber: number + let uploadResult: { + trackId?: string | null + albumId?: string | null + blockHash: string + blockNumber: number + } + let updateResult: { + blockHash: string + blockNumber: number + } + try { + if (doc.is_update && !doc.entity_id) { + throw new Error('Missing entity id in pending update release') } - try { - if (doc?.sdk_upload_metadata?.title) { + if (doc.sdk_upload_metadata?.title) { + if (doc.is_update) { + updateResult = await updateTrack( + audiusSdk, + doc.entity_id, + doc.sdk_upload_metadata, + s3 + ) + } else { uploadResult = await uploadTrack( audiusSdk, doc.sdk_upload_metadata, s3 ) - } else if (doc?.sdk_upload_metadata?.playlist_name) { - uploadResult = await uploadAlbum( + } + } else if (doc.sdk_upload_metadata?.playlist_name) { + if (doc.is_update) { + updateResult = await updateAlbum( audiusSdk, + doc.entity_id, doc.sdk_upload_metadata, s3 ) } else { - recordPendingReleaseErr( - doc, - 'Missing track or album in pending release', - 'failed_during_upload' + uploadResult = await uploadAlbum( + audiusSdk, + doc.sdk_upload_metadata, + s3 ) - continue } - } catch (error) { - recordPendingReleaseErr(doc, error, 'failed_during_upload') - continue + } else { + throw new Error('Missing track or album in pending release') } + } catch (error) { + recordPendingReleaseErr(doc, error, 'failed_during_upload') + continue + } - console.log('Published release:', releaseId) + console.log('Published release:', releaseId) - // Mark release as published in Mongo - try { + // Mark release as published in Mongo + try { + if (updateResult) { + await Releases.updateOne( + { _id: doc._id }, + { + $set: { + blockhash: updateResult.blockHash, + blocknumber: updateResult.blockNumber, + release_status: 'published', + }, + } + ) + } else { await Releases.updateOne( { _id: doc._id }, { @@ -288,16 +420,103 @@ export const publishReleases = async ( }, } ) - } catch (updateError) { - console.error( - 'Failed to update release doc with published status:', - updateError + } + } catch (updateError) { + console.error( + 'Failed to update release doc with published status:', + updateError + ) + recordPendingReleaseErr(doc, updateError, 'failed_after_upload') + } + } +} + +const takedownReleases = async (audiusSdk: AudiusSdkType) => { + let documents + try { + documents = await Releases.find({ release_status: 'awaiting_delete' }) + } catch (error) { + console.error('Failed to fetch pending takedowns:', error) + return + } + + for (const doc of documents) { + const releaseId = doc._id + + if (doc.release_status == 'failed_after_delete') { + console.error( + `releases doc with ID ${releaseId} requires manual intervention. Release was successfully removed from Audius but failed to update in Mongo.` + ) + continue + } else if (doc.failure_count > 3) { + console.error( + `Tried to takedown release with ID ${releaseId} 3 times. Requires manual intervention` + ) + continue + } + + let deleteResult: { + blockHash: string + blockNumber: number + } + try { + if (!doc.entity_id) { + throw new Error('Missing entity id in pending takedown release') + } + + if (doc?.sdk_upload_metadata?.title) { + deleteResult = await deleteTrack( + audiusSdk, + doc.entity_id, + doc.sdk_upload_metadata + ) + } else if (doc?.sdk_upload_metadata?.playlist_name) { + deleteResult = await deleteAlbum( + audiusSdk, + doc.entity_id, + doc.sdk_upload_metadata ) - recordPendingReleaseErr(doc, updateError, 'failed_after_upload') + } else { + throw new Error('Missing track or album in pending takedown release') } + } catch (error) { + recordPendingReleaseErr(doc, error, 'failed_during_delete') + continue + } + + console.log('Took down release:', releaseId) + + // Mark release as deleted in Mongo + try { + await Releases.updateOne( + { _id: doc._id }, + { + $set: { + blockhash: deleteResult.blockHash, + blocknumber: deleteResult.blockNumber, + release_status: 'deleted', + }, + } + ) + } catch (updateError) { + console.error( + 'Failed to update release doc with deleted status:', + updateError + ) + recordPendingReleaseErr(doc, updateError, 'failed_after_delete') } + } +} - // Wait 10 seconds before checking for new pending releases +export const processReleases = async ( + audiusSdk: AudiusSdkType, + s3: ReturnType +) => { + // eslint-disable-next-line no-constant-condition + while (true) { + await publishReleases(audiusSdk, s3) + await takedownReleases(audiusSdk) + // Wait 10 seconds before checking for new pending releases/takedowns await new Promise((resolve) => setTimeout(resolve, 10_000)) } } From 8caeaf43530efbb388d59101a4e54e3c71d717d4 Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Fri, 26 Apr 2024 14:39:41 -0700 Subject: [PATCH 2/5] fix some typos and remove all unnecessary types from the publisher. leave validation to the parser + the sdk --- packages/ddex/publisher/src/index.ts | 4 +- .../ddex/publisher/src/models/releases.ts | 234 +----------------- .../src/services/publisherService.ts | 60 +++-- 3 files changed, 31 insertions(+), 267 deletions(-) diff --git a/packages/ddex/publisher/src/index.ts b/packages/ddex/publisher/src/index.ts index 738e21c75f1..c95c130d2d1 100644 --- a/packages/ddex/publisher/src/index.ts +++ b/packages/ddex/publisher/src/index.ts @@ -18,7 +18,7 @@ switch (process.env.NETWORK) { } import { dialDb } from './services/dbService' -import { publishReleases } from './services/publisherService' +import { processReleases } from './services/publisherService' import createS3 from './services/s3Service' ;(async () => { try { @@ -29,7 +29,7 @@ import createS3 from './services/s3Service' const sdkService = await createSdkService() const s3 = createS3() - publishReleases(sdkService.getSdk(), s3) + processReleases(sdkService.getSdk(), s3) } catch (error) { console.error('Failed to initialize:', error) process.exit(1) diff --git a/packages/ddex/publisher/src/models/releases.ts b/packages/ddex/publisher/src/models/releases.ts index 47977b06b76..083645be4b4 100644 --- a/packages/ddex/publisher/src/models/releases.ts +++ b/packages/ddex/publisher/src/models/releases.ts @@ -1,85 +1,5 @@ import mongoose from 'mongoose' -const genres = [ - 'All Genres', - 'Electronic', - 'Rock', - 'Metal', - 'Alternative', - 'Hip-Hop/Rap', - 'Experimental', - 'Punk', - 'Folk', - 'Pop', - 'Ambient', - 'Soundtrack', - 'World', - 'Jazz', - 'Acoustic', - 'Funk', - 'R&B/Soul', - 'Devotional', - 'Classical', - 'Reggae', - 'Podcasts', - 'Country', - 'Spoken Word', - 'Comedy', - 'Blues', - 'Kids', - 'Audiobooks', - 'Latin', - 'Lo-Fi', - 'Hyperpop', - 'Techno', - 'Trap', - 'House', - 'Tech House', - 'Deep House', - 'Disco', - 'Electro', - 'Jungle', - 'Progressive House', - 'Hardstyle', - 'Glitch Hop', - 'Trance', - 'Future Bass', - 'Future House', - 'Tropical House', - 'Downtempo', - 'Drum & Bass', - 'Dubstep', - 'Jersey Club', - 'Vaporwave', - 'Moombahton', - 'Dancehall', -] -const moods = [ - 'Peaceful', - 'Romantic', - 'Sentimental', - 'Tender', - 'Easygoing', - 'Yearning', - 'Sophisticated', - 'Sensual', - 'Cool', - 'Gritty', - 'Melancholy', - 'Serious', - 'Brooding', - 'Fiery', - 'Defiant', - 'Aggressive', - 'Rowdy', - 'Excited', - 'Energizing', - 'Empowering', - 'Stirring', - 'Upbeat', - 'Other', -] -// TODO michelle simplify this between the go code and here export const releaseStatuses = [ 'awaiting_parse', 'awaiting_publish', @@ -95,158 +15,6 @@ export const releaseStatuses = [ 'failed_after_delete', ] as const -interface ResourceContributor { - name: string - roles: [string] - sequence_number: number -} -const resourceContributorSchema = new mongoose.Schema({ - name: String, - roles: [String], - sequence_number: Number, -}) - -interface RightsController { - name: string - roles: [string] - rights_share_unknown?: string -} - -const rightsControllerSchema = new mongoose.Schema({ - name: String, - roles: [String], - rights_share_unknown: String, -}) - -interface Copyright { - year: string - text: string -} - -const copyrightSchema = new mongoose.Schema({ - year: String, - text: String, -}) - -const trackMetadataSchema = new mongoose.Schema({ - title: { type: String, required: true }, - release_date: { type: Date, required: true }, - ddex_release_ids: mongoose.Schema.Types.Mixed, - genre: { type: String, enum: genres }, - duration: Number, - preview_start_seconds: Number, - isrc: String, - license: String, - description: String, - mood: { type: String, enum: moods }, - tags: String, - preview_audio_file_url: String, - preview_audio_file_url_hash: String, - preview_audio_file_url_hash_algo: String, - audio_file_url: String, - audio_file_url_hash: String, - audio_file_url_hash_algo: String, - - // Required if it's a standalone track. Uses playlist_owner_id and playlist's cover_art_url if it's part of an album - artist_id: String, - artist_name: String, - artists: { type: [resourceContributorSchema], default: null }, - resource_contributors: { type: [resourceContributorSchema], default: null }, - indirect_resource_contributors: { - type: [resourceContributorSchema], - default: null, - }, - rights_controller: { type: rightsControllerSchema, default: null }, - copyright_line: { type: copyrightSchema, default: null }, - producer_copyright_line: { type: copyrightSchema, default: null }, - parental_warning_type: { type: String, default: null }, - cover_art_url: String, - cover_art_url_hash: String, - cover_art_url_hash_algo: String, - is_stream_gated: { type: Boolean, default: false }, - stream_conditions: { type: mongoose.Schema.Types.Mixed, default: null }, - is_download_gated: { type: Boolean, default: false }, - download_conditions: { type: mongoose.Schema.Types.Mixed, default: null }, - is_stream_follow_gated: { type: Boolean, default: false }, - is_stream_tip_gated: { type: Boolean, default: false }, - is_download_follow_gated: { type: Boolean, default: false }, -}) - -export type TrackMetadata = mongoose.InferSchemaType - -const nullableString = { - type: String, - default: null, -} - -const nullableBool = { - type: Boolean, - default: null, -} - -const nullableInt = { - type: Number, - default: null, -} - -export const sdkUploadMetadataSchema = new mongoose.Schema( - { - // Used by both tracks and albums - release_date: { type: Date, required: true }, - genre: { type: String, required: true, enum: genres }, - artists: { type: [resourceContributorSchema], default: null }, - description: nullableString, - ddex_release_ids: mongoose.Schema.Types.Mixed, - mood: { type: String, enum: moods, default: null }, - tags: nullableString, - copyright_line: copyrightSchema, - producer_copyright_line: copyrightSchema, - parental_warning_type: nullableString, - license: nullableString, - cover_art_url: { type: String, required: true }, - cover_art_url_hash: nullableString, - cover_art_url_hash_algo: nullableString, - has_deal: { type: Boolean, default: false }, - - // Only for tracks - title: nullableString, - artist_id: nullableString, - duration: Number, - preview_start_seconds: nullableInt, - isrc: nullableString, - resource_contributors: [resourceContributorSchema], - indirect_resource_contributors: [resourceContributorSchema], - rights_controller: rightsControllerSchema, - preview_audio_file_url: nullableString, - preview_audio_file_url_hash: nullableString, - preview_audio_file_url_hash_algo: nullableString, - audio_file_url: nullableString, - audio_file_url_hash: nullableString, - audio_file_url_hash_algo: nullableString, - is_stream_gated: { type: Boolean, default: false }, - stream_conditions: { type: mongoose.Schema.Types.Mixed, default: null }, - is_download_gated: { type: Boolean, default: false }, - download_conditions: { type: mongoose.Schema.Types.Mixed, default: null }, - is_stream_follow_gated: { type: Boolean, default: false }, - is_stream_tip_gated: { type: Boolean, default: false }, - is_download_follow_gated: { type: Boolean, default: false }, - - // Only for albums - tracks: [trackMetadataSchema], - playlist_name: nullableString, - playlist_owner_id: nullableString, - playlist_owner_name: nullableString, - is_album: nullableBool, - is_private: nullableBool, - upc: nullableString, - }, - { _id: false } -) // _id is set to false because this schema is used as a sub-document - -export type SDKUploadMetadataSchema = mongoose.InferSchemaType< - typeof sdkUploadMetadataSchema -> - const releasesSchema = new mongoose.Schema({ _id: { type: String, required: true }, delivery_remote_path: { type: String, required: true }, @@ -260,7 +28,7 @@ const releasesSchema = new mongoose.Schema({ ], }, parsed_release_elems: mongoose.Schema.Types.Mixed, - sdk_upload_metadata: sdkUploadMetadataSchema, + sdk_upload_metadata: mongoose.Schema.Types.Mixed, last_parsed: { type: Date, required: true }, parse_errors: [String], publish_errors: [String], diff --git a/packages/ddex/publisher/src/services/publisherService.ts b/packages/ddex/publisher/src/services/publisherService.ts index 8ffd38b63dc..d13f015ee46 100644 --- a/packages/ddex/publisher/src/services/publisherService.ts +++ b/packages/ddex/publisher/src/services/publisherService.ts @@ -1,9 +1,5 @@ import Releases, { releaseStatuses } from '../models/releases' -import type { - TrackMetadata, - SDKUploadMetadataSchema, - Release, -} from '../models/releases' +import type { Release } from '../models/releases' import type { AudiusSdk as AudiusSdkType, UploadTrackRequest, @@ -17,9 +13,7 @@ import type { } from '@audius/sdk' import createS3 from './s3Service' -const formatTrackMetadata = ( - metadata: SDKUploadMetadataSchema | TrackMetadata -): UploadTrackRequest['metadata'] => { +const formatTrackMetadata = (metadata: any): UploadTrackRequest['metadata'] => { return { title: metadata.title, description: metadata.description || '', @@ -55,9 +49,7 @@ const formatTrackMetadata = ( } } -const formatAlbumMetadata = ( - metadata: SDKUploadMetadataSchema -): UploadAlbumRequest['metadata'] => { +const formatAlbumMetadata = (metadata: any): UploadAlbumRequest['metadata'] => { return { genre: metadata.genre as Genre, albumName: metadata.playlist_name, @@ -77,7 +69,7 @@ const formatAlbumMetadata = ( const uploadTrack = async ( audiusSdk: AudiusSdkType, - pendingTrack: SDKUploadMetadataSchema, + pendingTrack: any, s3Service: ReturnType ) => { const userId = pendingTrack.artist_id @@ -117,7 +109,7 @@ const uploadTrack = async ( const updateTrack = async ( audiusSdk: AudiusSdkType, trackId: string, - pendingTrack: SDKUploadMetadataSchema, + pendingTrack: any, s3Service: ReturnType ) => { const userId = pendingTrack.artist_id @@ -150,7 +142,7 @@ const updateTrack = async ( const deleteTrack = async ( audiusSdk: AudiusSdkType, trackId: string, - trackMetadata: SDKUploadMetadataSchema + trackMetadata: any ) => { const deleteTrackRequest: DeleteTrackRequest = { trackId, @@ -166,7 +158,7 @@ const deleteTrack = async ( const uploadAlbum = async ( audiusSdk: AudiusSdkType, - pendingAlbum: SDKUploadMetadataSchema, + pendingAlbum: any, s3Service: ReturnType ) => { // Fetch cover art from S3 @@ -180,7 +172,7 @@ const uploadAlbum = async ( } // Fetch track audio files from S3 - const trackFilesPromises = pendingAlbum.tracks.map(async (track) => { + const trackFilesPromises = pendingAlbum.tracks.map(async (track: any) => { const trackDownload = await s3Service.downloadFromS3Crawled( track.audio_file_url! ) @@ -192,8 +184,8 @@ const uploadAlbum = async ( }) const trackFiles = await Promise.all(trackFilesPromises) - const trackMetadatas = pendingAlbum.tracks.map( - (trackMetadata: TrackMetadata) => formatTrackMetadata(trackMetadata) + const trackMetadatas = pendingAlbum.tracks.map((trackMetadata: any) => + formatTrackMetadata(trackMetadata) ) const uploadAlbumRequest: UploadAlbumRequest = { @@ -215,7 +207,7 @@ const uploadAlbum = async ( const updateAlbum = async ( audiusSdk: AudiusSdkType, albumId: string, - pendingAlbum: SDKUploadMetadataSchema, + pendingAlbum: any, s3Service: ReturnType ) => { // Fetch cover art from S3 @@ -239,7 +231,7 @@ const updateAlbum = async ( console.log( `Updating ${pendingAlbum.playlist_name} by ${pendingAlbum.playlist_owner_id} (album id ${albumId})...` ) - const result = await audiusSdk.albums.uploadAlbum(updateAlbumRequest) + const result = await audiusSdk.albums.updateAlbum(updateAlbumRequest) console.log(result) return result } @@ -247,7 +239,7 @@ const updateAlbum = async ( const deleteAlbum = async ( audiusSdk: AudiusSdkType, albumId: string, - albumMetadata: SDKUploadMetadataSchema + albumMetadata: any ) => { const deleteAlbumRequest: DeleteAlbumRequest = { albumId, @@ -339,16 +331,20 @@ const publishReleases = async ( continue } - let uploadResult: { - trackId?: string | null - albumId?: string | null - blockHash: string - blockNumber: number - } - let updateResult: { - blockHash: string - blockNumber: number - } + let uploadResult: + | { + trackId?: string | null + albumId?: string | null + blockHash: string + blockNumber: number + } + | undefined + let updateResult: + | { + blockHash: string + blockNumber: number + } + | undefined try { if (doc.is_update && !doc.entity_id) { throw new Error('Missing entity id in pending update release') @@ -406,7 +402,7 @@ const publishReleases = async ( }, } ) - } else { + } else if (uploadResult) { await Releases.updateOne( { _id: doc._id }, { From 593603eec2d5339bef7666a744803c4c902d01d4 Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Fri, 26 Apr 2024 14:45:05 -0700 Subject: [PATCH 3/5] cleanup --- packages/ddex/ingester/parser/parser.go | 4 ++-- packages/ddex/publisher/src/services/publisherService.ts | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/ddex/ingester/parser/parser.go b/packages/ddex/ingester/parser/parser.go index 593afb0fde5..af370b9150d 100644 --- a/packages/ddex/ingester/parser/parser.go +++ b/packages/ddex/ingester/parser/parser.go @@ -97,7 +97,7 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { // Use local-name() to ignore namespace because sometimes it's "ern" and sometimes it's "ernm" msgVersionElem := xmlquery.FindOne(doc, "//*[local-name()='NewReleaseMessage']") if msgVersionElem == nil { - logParsingErr(fmt.Errorf("missing or element")) + logParsingErr(fmt.Errorf("missing element")) return } @@ -146,7 +146,7 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { } if release.ReleaseStatus == constants.ReleaseStatusDeleted || release.ReleaseStatus == constants.ReleaseStatusAwaitingDelete { - p.Logger.Info("Parsed takedown request", "id", release.ReleaseID) + p.Logger.Info("Parsed takedown release", "id", release.ReleaseID) return } else { p.Logger.Info("Parsed release", "id", release.ReleaseID) diff --git a/packages/ddex/publisher/src/services/publisherService.ts b/packages/ddex/publisher/src/services/publisherService.ts index d13f015ee46..5a5cc2bb554 100644 --- a/packages/ddex/publisher/src/services/publisherService.ts +++ b/packages/ddex/publisher/src/services/publisherService.ts @@ -416,6 +416,8 @@ const publishReleases = async ( }, } ) + } else { + throw new Error('No update or upload result returned') } } catch (updateError) { console.error( From 396ed2d5b3e7f5bcb44bad76208b957375b2baeb Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Mon, 29 Apr 2024 17:24:39 -0700 Subject: [PATCH 4/5] parse PurgeReleaseMessage and parse all known release IDs when included in XML. NOTE: not sure what the filename looks like for PurgeReleaseMessages so I'm not sure if the crawler will correctly handle these types of messages. Need real examples to test with --- packages/ddex/ingester/crawler/crawler.go | 2 +- packages/ddex/ingester/parser/ern38x.go | 112 ++++++---- packages/ddex/ingester/parser/parser.go | 250 +++++++++++++--------- 3 files changed, 225 insertions(+), 139 deletions(-) diff --git a/packages/ddex/ingester/crawler/crawler.go b/packages/ddex/ingester/crawler/crawler.go index 0aaf24879f7..de2ff1cc68d 100644 --- a/packages/ddex/ingester/crawler/crawler.go +++ b/packages/ddex/ingester/crawler/crawler.go @@ -230,7 +230,7 @@ func (c *Crawler) upsertXML(key string, lastModified time.Time) (err error) { ReleaseID: strings.TrimSuffix(filepath.Base(key), ".xml"), XMLRemotePath: remotePath, RawXML: primitive.Binary{Data: xmlBytes, Subtype: 0x00}, - CreatedAt: lastModified, + LastParsed: lastModified, ParseErrors: []string{}, PublishErrors: []string{}, FailureCount: 0, diff --git a/packages/ddex/ingester/parser/ern38x.go b/packages/ddex/ingester/parser/ern38x.go index e63eed4cbad..2a82de7eeb4 100644 --- a/packages/ddex/ingester/parser/ern38x.go +++ b/packages/ddex/ingester/parser/ern38x.go @@ -5,6 +5,7 @@ import ( "fmt" "ingester/common" "ingester/constants" + "reflect" "regexp" "slices" "sort" @@ -107,6 +108,42 @@ type ResourceGroupContentItem struct { Image *Image } +// purgeERN38x parses the given XML data and marks a release to be taken down from Audius. +// NOTE: This expects the ERN 3 format. See https://kb.ddex.net/implementing-each-standard/electronic-release-notification-message-suite-(ern)/ern-3-explained/ +func purgeERN38x(doc *xmlquery.Node, release *common.Release, releasesColl *mongo.Collection) error { + // Parse s from + releaseNodes := xmlquery.Find(doc, "//ReleaseList/Release") + if len(releaseNodes) == 0 { + return fmt.Errorf("no found") + } + for _, rNode := range releaseNodes { + releaseIDNode := rNode.SelectElement("ReleaseId") + if releaseIDNode == nil { + return fmt.Errorf("no found") + } + releaseIDs := getReleaseIDs(releaseIDNode) + releaseIDsVal := reflect.ValueOf(releaseIDs) + for i := 0; i < releaseIDsVal.NumField(); i++ { + releaseID := releaseIDsVal.Field(i).String() + if releaseID == "" { + continue + } + + // Take down the release with this ID, if any + existingRelease, err := getExistingRelease(releaseID, releasesColl) + if err == mongo.ErrNoDocuments { + continue + } else if err != nil { + return err + } else { + takedownRelease(existingRelease, release) + break + } + } + } + return nil +} + // parseERN38x parses the given XML data and returns a release ready to be uploaded to Audius. // NOTE: This expects the ERN 3 format. See https://kb.ddex.net/implementing-each-standard/electronic-release-notification-message-suite-(ern)/ern-3-explained/ func parseERN38x(doc *xmlquery.Node, crawledBucket string, release *common.Release, releasesColl *mongo.Collection) (errs []error) { @@ -178,28 +215,17 @@ func parseERN38x(doc *xmlquery.Node, crawledBucket string, release *common.Relea dealNodes := xmlquery.Find(doc, "//DealList/ReleaseDeal") if len(dealNodes) == 0 { // Check for an existing release to determine whether this is a takedown request or an invalid NewReleaseMessage - var existingRelease common.Release - filter := bson.M{"_id": release.ReleaseID} - err = releasesColl.FindOne(context.Background(), filter).Decode(&existingRelease) - if err == nil { - // This is a takedown request. Mark the existing release for deletion - switch existingRelease.ReleaseStatus { - case constants.ReleaseStatusPublished, constants.ReleaseStatusFailedAfterUpload, constants.ReleaseStatusFailedDuringDelete: - // Has been published to Audius. Mark for deletion by the publisher - release.ReleaseStatus = constants.ReleaseStatusAwaitingDelete - release.EntityID = existingRelease.EntityID - release.SDKUploadMetadata = existingRelease.SDKUploadMetadata - default: - // Has not yet been published to Audius. Mark as deleted - release.ReleaseStatus = constants.ReleaseStatusDeleted - } - } else if err != nil && err != mongo.ErrNoDocuments { - errs = append(errs, err) - return - } else { + existingRelease, err := getExistingRelease(release.ReleaseID, releasesColl) + if err == mongo.ErrNoDocuments { // This is a NewReleaseMessage that should have a deal errs = append(errs, fmt.Errorf("no found")) return + } else if err != nil { + errs = append(errs, err) + return + } else { + // This is a takedown request. Mark the release for deletion + takedownRelease(existingRelease, release) } } for _, dNode := range dealNodes { @@ -639,28 +665,12 @@ func processReleaseNode(rNode *xmlquery.Node, soundRecordings *[]SoundRecording, } r = &common.ParsedReleaseElement{ - IsMainRelease: rNode.SelectAttr("IsMainRelease") == "true", - ReleaseRef: releaseRef, - ReleaseDate: releaseDate, - Resources: resources, - ReleaseType: releaseType, - ReleaseIDs: common.ReleaseIDs{ - PartyID: safeInnerText(rNode.SelectElement("ReleaseId/PartyId")), - CatalogNumber: safeInnerText(rNode.SelectElement("ReleaseId/CatalogNumber")), - ICPN: safeInnerText(rNode.SelectElement("ReleaseId/ICPN")), - GRid: safeInnerText(rNode.SelectElement("ReleaseId/GRid")), - ISAN: safeInnerText(rNode.SelectElement("ReleaseId/ISAN")), - ISBN: safeInnerText(rNode.SelectElement("ReleaseId/ISBN")), - ISMN: safeInnerText(rNode.SelectElement("ReleaseId/ISMN")), - ISRC: isrc, - ISSN: safeInnerText(rNode.SelectElement("ReleaseId/ISSN")), - ISTC: safeInnerText(rNode.SelectElement("ReleaseId/ISTC")), - ISWC: safeInnerText(rNode.SelectElement("ReleaseId/ISWC")), - MWLI: safeInnerText(rNode.SelectElement("ReleaseId/MWLI")), - SICI: safeInnerText(rNode.SelectElement("ReleaseId/SICI")), - ProprietaryID: safeInnerText(rNode.SelectElement("ReleaseId/ProprietaryId")), - }, - + IsMainRelease: rNode.SelectAttr("IsMainRelease") == "true", + ReleaseRef: releaseRef, + ReleaseDate: releaseDate, + Resources: resources, + ReleaseType: releaseType, + ReleaseIDs: getReleaseIDs(rNode.SelectElement("ReleaseId")), DisplayTitle: safeInnerText(releaseDetails.SelectElement("Title[@TitleType='DisplayTitle']/TitleText")), // TODO: This assumes there aren't multiple titles in different languages (ie, different `LanguageAndScriptCode` attributes) DisplaySubtitle: stringPtr(safeInnerText(releaseDetails.SelectElement("Title[@TitleType='DisplayTitle']/SubTitle"))), FormalTitle: stringPtr(safeInnerText(releaseDetails.SelectElement("Title[@TitleType='FormalTitle']/TitleText"))), @@ -1266,6 +1276,26 @@ func processResourceGroup(node *xmlquery.Node, parentSequence int, contentItems } } +func getExistingRelease(releaseID string, releasesColl *mongo.Collection) (common.Release, error) { + var existingRelease common.Release + filter := bson.M{"_id": releaseID} + err := releasesColl.FindOne(context.Background(), filter).Decode(&existingRelease) + return existingRelease, err +} + +func takedownRelease(existingRelease common.Release, releaseToUpsert *common.Release) { + switch existingRelease.ReleaseStatus { + case constants.ReleaseStatusPublished, constants.ReleaseStatusFailedAfterUpload, constants.ReleaseStatusFailedDuringDelete: + // Has been published to Audius. Mark for deletion by the publisher + releaseToUpsert.ReleaseStatus = constants.ReleaseStatusAwaitingDelete + releaseToUpsert.EntityID = existingRelease.EntityID + releaseToUpsert.SDKUploadMetadata = existingRelease.SDKUploadMetadata + default: + // Has not yet been published to Audius. Mark as deleted + releaseToUpsert.ReleaseStatus = constants.ReleaseStatusDeleted + } +} + func safeInnerText(node *xmlquery.Node) string { if node != nil { return node.InnerText() diff --git a/packages/ddex/ingester/parser/parser.go b/packages/ddex/ingester/parser/parser.go index af370b9150d..a2d18a5f877 100644 --- a/packages/ddex/ingester/parser/parser.go +++ b/packages/ddex/ingester/parser/parser.go @@ -6,12 +6,12 @@ import ( "ingester/artistutils" "ingester/common" "ingester/constants" + "reflect" "strconv" "strings" "time" "github.com/antchfx/xmlquery" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -38,11 +38,8 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { // Upsert the release after parsing regardless of success or failure defer func() { - filter := bson.M{"_id": release.ReleaseID} - var existingRelease common.Release - - // Find the existing document, if any - err := p.ReleasesColl.FindOne(p.Ctx, filter).Decode(&existingRelease) + // Find the existing release, if any + existingRelease, err := getExistingRelease(release.ReleaseID, p.ReleasesColl) if err == nil { if existingRelease.ReleaseStatus == constants.ReleaseStatusPublished || existingRelease.ReleaseStatus == constants.ReleaseStatusFailedAfterUpload { // Set is_update to true if the existing release has already been published to Audius @@ -95,12 +92,16 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { } // Use local-name() to ignore namespace because sometimes it's "ern" and sometimes it's "ernm" - msgVersionElem := xmlquery.FindOne(doc, "//*[local-name()='NewReleaseMessage']") + msgType := "NewReleaseMessage" + msgVersionElem := xmlquery.FindOne(doc, fmt.Sprintf("//*[local-name()='%s']", msgType)) + if msgVersionElem == nil { + msgType := "PurgeReleaseMessage" + msgVersionElem = xmlquery.FindOne(doc, fmt.Sprintf("//*[local-name()='%s']", msgType)) + } if msgVersionElem == nil { - logParsingErr(fmt.Errorf("missing element")) + logParsingErr(fmt.Errorf("missing or element")) return } - // Extract the ERN Version in the form of 'ern/xxx' or '/ern/xxx' msgSchemaVersionId := msgVersionElem.SelectAttr("MessageSchemaVersionId") ernVersion := strings.TrimPrefix(msgSchemaVersionId, "/") @@ -112,10 +113,71 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { return } + var errs []error + if msgType == "NewReleaseMessage" { + releaseProfileVersionID := msgVersionElem.SelectAttr("ReleaseProfileVersionId") + errs = p.parseNewMessageRelease(doc, release, ernVersion, releaseProfileVersionID, msgSchemaVersionId) + } else if msgType == "PurgeReleaseMessage" { + err = p.parsePurgeReleaseMessage(doc, release, ernVersion, msgSchemaVersionId) + errs = append(errs, err) + } + + if len(errs) != 0 { + for _, err := range errs { + logParsingErr(err) + } + } + return +} + +// parseBatch (idempotent) (re-)populates the batch's fields from the raw XML data +func (p *Parser) parseBatch(batch *common.Batch) (*xmlquery.Node, error) { + // Reset fields that we're about to parse in case this is a retry + batch.DDEXSchema = "" + batch.NumMessages = 0 + + // Upsert the batch after parsing regardless of success or failure + defer func() { + if _, err := p.UpsertBatch(batch); err != nil { + p.Logger.Error("failed to upsert batch", "err", err) + } + }() + + xmlData := batch.BatchXML.Data + doc, err := xmlquery.Parse(bytes.NewReader(xmlData)) + if err != nil { + return nil, fmt.Errorf("failed to read XML bytes: %v", err) + } + + // Parse the batch's DDEX schema version. Some test deliveries use xmlns:ernm, while others uses xmlns:ern-c + ernmAttr := xmlquery.FindOne(doc, "//@xmlns:ernm") + erncAttr := xmlquery.FindOne(doc, "//@xmlns:ern-c") + if ernmAttr != nil && strings.HasPrefix(ernmAttr.InnerText(), "http://ddex.net/xml/ern/") { + batch.DDEXSchema = strings.Split(ernmAttr.InnerText(), "http://ddex.net/xml/ern/")[1] + } else if erncAttr != nil && erncAttr.InnerText() == "http://ddex.net/xml/ern-c/15" { + batch.DDEXSchema = "ern/382" + } else { + return nil, fmt.Errorf("missing or unexpected xmlns:ernm and xmlns:ern-c") + } + + // Parse NumberOfMessages + numMessagesNode := xmlquery.FindOne(doc, "//NumberOfMessages") + if numMessagesNode == nil { + return nil, fmt.Errorf("NumberOfMessages element not found") + } + numMessages, err := strconv.Atoi(numMessagesNode.InnerText()) + if err != nil { + return nil, fmt.Errorf("failed to parse NumberOfMessages value: %v", err) + } + batch.NumMessages = numMessages + + return doc, nil +} + +func (p *Parser) parseNewMessageRelease(doc *xmlquery.Node, release *common.Release, ernVersion string, releaseProfileVersionID string, msgSchemaVersionId string) (errs []error) { // Extract the release profile. See https://kb.ddex.net/implementing-each-standard/electronic-release-notification-message-suite-(ern)/ern-3-explained/ern-3-profiles/release-profiles-in-ern-3/ - releaseProfileVersionIDStr := msgVersionElem.SelectAttr("ReleaseProfileVersionId") var releaseProfile common.ReleaseProfile - switch releaseProfileVersionIDStr { + switch releaseProfileVersionID { case string(common.Common13AudioSingle): releaseProfile = common.Common13AudioSingle case string(common.Common14AudioAlbumMusicOnly): @@ -126,26 +188,25 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { release.ReleaseProfile = releaseProfile release.ParsedReleaseElems = []common.ParsedReleaseElement{} - var errs []error + var parseErrs []error switch ernVersion { // Not sure what the difference is between 3.81 and 3.82 because DDEX only provides the most recent version and 1 version behind unless you contact them case "381": - errs = parseERN38x(doc, p.Bucket, release, p.ReleasesColl) + parseErrs = parseERN38x(doc, p.Bucket, release, p.ReleasesColl) case "382": - errs = parseERN38x(doc, p.Bucket, release, p.ReleasesColl) + parseErrs = parseERN38x(doc, p.Bucket, release, p.ReleasesColl) default: - logParsingErr(fmt.Errorf("unsupported schema: '%s'. Expected ern/381 or ern/382", msgSchemaVersionId)) + errs = append(errs, fmt.Errorf("unsupported schema: '%s'. Expected ern/381 or ern/382", msgSchemaVersionId)) return } + errs = append(errs, parseErrs...) if len(errs) != 0 { - for _, err := range errs { - logParsingErr(err) - } return } if release.ReleaseStatus == constants.ReleaseStatusDeleted || release.ReleaseStatus == constants.ReleaseStatusAwaitingDelete { + // No further parsing needed at this point for takedowns via NewReleaseMessages p.Logger.Info("Parsed takedown release", "id", release.ReleaseID) return } else { @@ -159,8 +220,8 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { p.Logger.Info("Warnings while finding an artist ID for release", "display title", parsedRelease.DisplayTitle, "display artists", parsedRelease.Artists, "warnings", fmt.Sprintf("%+v", warnings)) } if err != nil { - logParsingErr(fmt.Errorf("failed to find an artist ID from display artists %+v: %v", parsedRelease.Artists, err)) release.ReleaseStatus = constants.ReleaseStatusErrorUserMatch + errs = append(errs, fmt.Errorf("failed to find an artist ID from display artists %+v: %v", parsedRelease.Artists, err)) return } p.Logger.Info("Found artist ID for release", "artistID", artistID, "artistName", artistName, "display title", parsedRelease.DisplayTitle, "display artists", parsedRelease.Artists) @@ -184,7 +245,7 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { // Verify release element has a corresponding deal (only required for tracks for now) if parsedRelease.ReleaseType == common.TrackReleaseType { if !parsedRelease.HasDeal { - logParsingErr(fmt.Errorf("missing deal for release ref '%s' does not have a corresponding deal", parsedRelease.ReleaseRef)) + errs = append(errs, fmt.Errorf("missing deal for release ref '%s' does not have a corresponding deal", parsedRelease.ReleaseRef)) return } } @@ -205,7 +266,7 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { // Verify release has a nonzero ValidityStartDate if parsedRelease.ValidityStartDate.IsZero() { - logParsingErr(fmt.Errorf("invalid validity start date for release ref '%s'", parsedRelease.ReleaseRef)) + errs = append(errs, fmt.Errorf("invalid validity start date for release ref '%s'", parsedRelease.ReleaseRef)) return } @@ -218,10 +279,8 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { } sdkErrs := buildSDKMetadataERN38x(release) - if len(sdkErrs) != 0 { - for _, err := range sdkErrs { - logParsingErr(err) - } + errs = append(errs, sdkErrs...) + if len(errs) != 0 { return } @@ -229,48 +288,21 @@ func (p *Parser) ParseRelease(release *common.Release) (ok bool) { return } -// parseBatch (idempotent) (re-)populates the batch's fields from the raw XML data -func (p *Parser) parseBatch(batch *common.Batch) (*xmlquery.Node, error) { - // Reset fields that we're about to parse in case this is a retry - batch.DDEXSchema = "" - batch.NumMessages = 0 - - // Upsert the batch after parsing regardless of success or failure - defer func() { - if _, err := p.UpsertBatch(batch); err != nil { - p.Logger.Error("failed to upsert batch", "err", err) - } - }() - - xmlData := batch.BatchXML.Data - doc, err := xmlquery.Parse(bytes.NewReader(xmlData)) - if err != nil { - return nil, fmt.Errorf("failed to read XML bytes: %v", err) - } - - // Parse the batch's DDEX schema version. Some test deliveries use xmlns:ernm, while others uses xmlns:ern-c - ernmAttr := xmlquery.FindOne(doc, "//@xmlns:ernm") - erncAttr := xmlquery.FindOne(doc, "//@xmlns:ern-c") - if ernmAttr != nil && strings.HasPrefix(ernmAttr.InnerText(), "http://ddex.net/xml/ern/") { - batch.DDEXSchema = strings.Split(ernmAttr.InnerText(), "http://ddex.net/xml/ern/")[1] - } else if erncAttr != nil && erncAttr.InnerText() == "http://ddex.net/xml/ern-c/15" { - batch.DDEXSchema = "ern/382" - } else { - return nil, fmt.Errorf("missing or unexpected xmlns:ernm and xmlns:ern-c") +func (p *Parser) parsePurgeReleaseMessage(doc *xmlquery.Node, release *common.Release, ernVersion string, msgSchemaVersionId string) (err error) { + switch ernVersion { + case "381": + err = purgeERN38x(doc, release, p.ReleasesColl) + case "382": + err = purgeERN38x(doc, release, p.ReleasesColl) + default: + err = fmt.Errorf("unsupported schema: '%s'. Expected ern/381 or ern/382", msgSchemaVersionId) } - // Parse NumberOfMessages - numMessagesNode := xmlquery.FindOne(doc, "//NumberOfMessages") - if numMessagesNode == nil { - return nil, fmt.Errorf("NumberOfMessages element not found") - } - numMessages, err := strconv.Atoi(numMessagesNode.InnerText()) - if err != nil { - return nil, fmt.Errorf("failed to parse NumberOfMessages value: %v", err) + if err == nil { + p.Logger.Info("Parsed takedown release", "id", release.ReleaseID) } - batch.NumMessages = numMessages - return doc, nil + return } func (p *Parser) validateReleaseIsInBatch(release *common.Release, batch *common.Batch, doc *xmlquery.Node) error { @@ -281,52 +313,76 @@ func (p *Parser) validateReleaseIsInBatch(release *common.Release, batch *common continue } - // TODO: Support more ID types (GRid is preferred) as we get more examples - var releaseID string - releaseICPN := safeInnerText(messageInBatch.SelectElement("IncludedReleaseId/ICPN")) - releaseGRid := safeInnerText(messageInBatch.SelectElement("IncludedReleaseId/GRid")) - if releaseICPN != "" { - releaseID = releaseICPN - } else if releaseGRid != "" { - releaseID = releaseGRid - } else { + includedReleaseIdNode := messageInBatch.SelectElement("IncludedReleaseId") + if IncludedReleaseId == nil { p.Logger.Warn(fmt.Sprintf("no valid IncludedReleaseId found for message %d in batch '%s'", i, batch.BatchID)) continue } - // We found the release in the batch, so do some validation on it - if releaseID == release.ReleaseID { - // TODO: Handle updates and deletes - deliveryType := safeInnerText(messageInBatch.SelectElement("DeliveryType")) - if deliveryType != "NewReleaseDelivery" { - return fmt.Errorf("DeliveryType %s not supported", deliveryType) + releaseIDs := getReleaseIDs(includedReleaseIdNode) + releaseIDsVal := reflect.ValueOf(releaseIDs) + for i := 0; i < releaseIDsVal.NumField(); i++ { + releaseID := releaseIDsVal.Field(i).String() + if releaseID == "" { + continue } - productType := safeInnerText(messageInBatch.SelectElement("ProductType")) - if productType != "AudioProduct" { - return fmt.Errorf("ProductType %s not supported", productType) - } + // We found the release in the batch, so do some validation on it + if releaseID == release.ReleaseID { + // TODO: Handle updates and deletes + deliveryType := safeInnerText(messageInBatch.SelectElement("DeliveryType")) + if deliveryType != "NewReleaseDelivery" { + return fmt.Errorf("DeliveryType %s not supported", deliveryType) + } - // Validate the URL without the prefix "/" - releaseURL := strings.TrimPrefix(safeInnerText(messageInBatch.SelectElement("URL")), "/") + productType := safeInnerText(messageInBatch.SelectElement("ProductType")) + if productType != "AudioProduct" { + return fmt.Errorf("ProductType %s not supported", productType) + } - // Special case for Fuga deliveries with a different URL format - if strings.Contains(releaseURL, "ddex-prod-fuga-raw//") { - releaseURL = strings.SplitAfter(releaseURL, "ddex-prod-fuga-raw//")[1] - releaseURL = fmt.Sprintf("%s/%s", strings.Split(release.XMLRemotePath, "/")[3], releaseURL) - } - if !strings.HasPrefix(releaseURL, "s3://") { - releaseURL = fmt.Sprintf("s3://%s/%s", p.Bucket, releaseURL) - } + // Validate the URL without the prefix "/" + releaseURL := strings.TrimPrefix(safeInnerText(messageInBatch.SelectElement("URL")), "/") - if releaseURL != release.XMLRemotePath { - return fmt.Errorf("URL '%s' does not match expected value: '%s'", releaseURL, release.XMLRemotePath) - } + // Special case for Fuga deliveries with a different URL format + if strings.Contains(releaseURL, "ddex-prod-fuga-raw//") { + releaseURL = strings.SplitAfter(releaseURL, "ddex-prod-fuga-raw//")[1] + releaseURL = fmt.Sprintf("%s/%s", strings.Split(release.XMLRemotePath, "/")[3], releaseURL) + } + if !strings.HasPrefix(releaseURL, "s3://") { + releaseURL = fmt.Sprintf("s3://%s/%s", p.Bucket, releaseURL) + } - // Validation passed! - return nil + if releaseURL != release.XMLRemotePath { + return fmt.Errorf("URL '%s' does not match expected value: '%s'", releaseURL, release.XMLRemotePath) + } + + // Validation passed! + return nil + } } } return fmt.Errorf("no matching release found in batch") } + +func getReleaseIDs(node *xmlquery.Node) common.ReleaseIDs { + if node == nil { + return common.ReleaseIDs{} + } + return common.ReleaseIDs{ + PartyID: safeInnerText(node.SelectElement("PartyId")), + CatalogNumber: safeInnerText(node.SelectElement("CatalogNumber")), + ICPN: safeInnerText(node.SelectElement("ICPN")), + GRid: safeInnerText(node.SelectElement("GRid")), + ISAN: safeInnerText(node.SelectElement("ISAN")), + ISBN: safeInnerText(node.SelectElement("ISBN")), + ISMN: safeInnerText(node.SelectElement("ISMN")), + ISRC: safeInnerText(node.SelectElement("ISRC")), + ISSN: safeInnerText(node.SelectElement("ISSN")), + ISTC: safeInnerText(node.SelectElement("ISTC")), + ISWC: safeInnerText(node.SelectElement("ISWC")), + MWLI: safeInnerText(node.SelectElement("MWLI")), + SICI: safeInnerText(node.SelectElement("SICI")), + ProprietaryID: safeInnerText(node.SelectElement("ProprietaryId")), + } +} From 721d9a8107c21bccab490de2624730d98d31b242 Mon Sep 17 00:00:00 2001 From: Michelle Brier Date: Tue, 30 Apr 2024 16:48:42 -0700 Subject: [PATCH 5/5] fix tests --- packages/ddex/ingester/parser/ern38x.go | 1 - packages/ddex/ingester/parser/parser.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/ddex/ingester/parser/ern38x.go b/packages/ddex/ingester/parser/ern38x.go index 2a82de7eeb4..f6358136dd9 100644 --- a/packages/ddex/ingester/parser/ern38x.go +++ b/packages/ddex/ingester/parser/ern38x.go @@ -496,7 +496,6 @@ func processReleaseNode(rNode *xmlquery.Node, soundRecordings *[]SoundRecording, releaseRef := safeInnerText(rNode.SelectElement("ReleaseReference")) globalOriginalReleaseDateStr := safeInnerText(rNode.SelectElement("GlobalOriginalReleaseDate")) // Some suppliers (not Fuga) use this durationISOStr := safeInnerText(rNode.SelectElement("Duration")) // Only the Sony example uses this. Other suppliers use it in the SoundRecording - isrc := safeInnerText(rNode.SelectElement("ReleaseId/ISRC")) copyrightYear := safeInnerText(rNode.SelectElement("CLine/Year")) copyrightText := safeInnerText(rNode.SelectElement("CLine/CLineText")) producerCopyrightYear := safeInnerText(rNode.SelectElement("PLine/Year")) diff --git a/packages/ddex/ingester/parser/parser.go b/packages/ddex/ingester/parser/parser.go index a2d18a5f877..8cd60acf967 100644 --- a/packages/ddex/ingester/parser/parser.go +++ b/packages/ddex/ingester/parser/parser.go @@ -314,7 +314,7 @@ func (p *Parser) validateReleaseIsInBatch(release *common.Release, batch *common } includedReleaseIdNode := messageInBatch.SelectElement("IncludedReleaseId") - if IncludedReleaseId == nil { + if includedReleaseIdNode == nil { p.Logger.Warn(fmt.Sprintf("no valid IncludedReleaseId found for message %d in batch '%s'", i, batch.BatchID)) continue }