Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chatty-cameras-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@audius/sdk': patch
---

Upgrade chat blasts to real chats internally
6 changes: 5 additions & 1 deletion comms/discovery/rpcz/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,9 @@ func websocketNotify(rpcJson json.RawMessage, userId int32, timestamp time.Time)
Metadata schema.Metadata `json:"metadata"`
}{
rpcJson,
schema.Metadata{Timestamp: timestamp.Format(time.RFC3339Nano), UserID: encodedUserId},
schema.Metadata{Timestamp: timestamp.Format(time.RFC3339Nano),
// Note this is the userId of the user sending the message
UserID: encodedUserId},
}

j, err := json.Marshal(data)
Expand All @@ -462,6 +464,8 @@ func websocketNotify(rpcJson json.RawMessage, userId int32, timestamp time.Time)
websocketPush(subscribedUserId, j)
}

} else if gjson.GetBytes(rpcJson, "method").String() == "chat.blast" {
websocketPushAll(rpcJson, timestamp)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it should be identical to the above except the userIds list comes from the current list of websockets not from the db? Why not use the same function in both places w/ a userIds param, or have websocketPushAll use websocketPush? There seems like there should be some reuse here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess websocketPush has the side effect of potentially sending the blast upgrade message multiple times within 10 seconds?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the thing i was trying to avoid was having this code reach into the websocket and grab the userId, felt like bad coupling. but i didn't think of websocketPushAll just calling websocketPush, that could work. we'd still have to form the metadata inside websocketPushAll but that's probably ok.

not sure what you're talking about with the 10 seconds thing?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code in websocketPush caches the recent messages for up to 10 seconds and resends them on each new message it looks like

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see it. i think that's still fine tho? that only happens if the websocket gets disconnected/reconnected right? and if multiple blast rpcs come in for the same blast message, the worst that'll happen is it'll trigger multiple calls to getBlasts() but the data should all be fine since we upsert/don't insert dups etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm one downside to websocketPushAll calling websocketPush is that we'll iterate through the websocket list n^2 times... probably not the end of the world but not great.. dang should i favor perf or code re-use??

}
}

Expand Down
36 changes: 36 additions & 0 deletions comms/discovery/rpcz/websocket.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package rpcz

import (
"encoding/json"
"net"
"sync"
"time"

"comms.audius.co/discovery/misc"
"comms.audius.co/discovery/schema"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"golang.org/x/exp/slog"
Expand Down Expand Up @@ -96,3 +99,36 @@ func websocketPush(userId int32, payload []byte) {
}

}

func websocketPushAll(rpcJson json.RawMessage, timestamp time.Time) {
mu.Lock()
Comment thread
dharit-tan marked this conversation as resolved.
defer mu.Unlock()

for _, s := range websockets {
encodedUserId, _ := misc.EncodeHashId(int(s.userId))

data := struct {
RPC json.RawMessage `json:"rpc"`

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this has the side effect of revealing that a blast is not a real DM for sleuthing eyes. I think this is fine given our strategy though.

Metadata schema.Metadata `json:"metadata"`
}{
rpcJson,
schema.Metadata{Timestamp: timestamp.Format(time.RFC3339Nano),
// Note this is the userId of the user receiving the message
UserID: encodedUserId},
}

payload, err := json.Marshal(data)
if err != nil {
logger.Warn("invalid websocket json " + err.Error())
return
}

err = wsutil.WriteServerMessage(s.conn, ws.OpText, payload)
if err != nil {
logger.Info("websocket push failed: " + err.Error())
removeWebsocket(s)
} else {
logger.Debug("websocket push all", "payload", string(payload))
}
}
}
10 changes: 10 additions & 0 deletions packages/common/src/store/pages/chat/sagas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,13 @@ function* doFetchLatestChats() {
let hasMoreChats = true
let data: UserChat[] = []
let firstResponse: TypedCommsResponse<UserChat[]> | undefined
const currentUserId = yield* select(getUserId)
if (!currentUserId) {
throw new Error('User not found')
}
while (hasMoreChats) {
const response = yield* call([sdk.chats, sdk.chats.getAll], {
userId: encodeHashId(currentUserId)!,
before,
after: summary?.next_cursor,
limit: CHAT_PAGE_SIZE
Expand Down Expand Up @@ -175,7 +180,12 @@ function* doFetchMoreChats() {
const sdk = yield* call(audiusSdk)
const summary = yield* select(getChatsSummary)
const before = summary?.prev_cursor
const currentUserId = yield* select(getUserId)
if (!currentUserId) {
throw new Error('User not found')
}
const response = yield* call([sdk.chats, sdk.chats.getAll], {
userId: encodeHashId(currentUserId)!,
before,
limit: CHAT_PAGE_SIZE
})
Expand Down
70 changes: 61 additions & 9 deletions packages/libs/src/sdk/api/chats/ChatsApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { AuthService } from '../../services/Auth'
import type { DiscoveryNodeSelectorService } from '../../services/DiscoveryNodeSelector/types'
import type { LoggerService } from '../../services/Logger'
import type { EventEmitterTarget } from '../../utils/EventEmitterTarget'
import { encodeHashId } from '../../utils/hashId'
import { parseParams } from '../../utils/parseParams'
import {
BaseAPI,
Expand Down Expand Up @@ -64,10 +65,12 @@ import type {
ChatMessage,
ChatWebsocketEventData,
RPCPayloadRequest,
ValidatedChatPermissions
ValidatedChatPermissions,
ChatBlast,
ChatCreateRPC
} from './serverTypes'

const GENERIC_MESSAGE_ERROR = 'Error: this message can not be displayed'
const GENERIC_MESSAGE_ERROR = 'Error: this message cannot be displayed'

export class ChatsApi
extends BaseAPI
Expand Down Expand Up @@ -173,14 +176,18 @@ export class ChatsApi
* @param params.limit the max number of chats to get
* @param params.before a timestamp cursor for pagination
* @param params.after a timestamp cursor for pagination
* @param params.currentUserId the user to act on behalf of
* @param params.userId the user to act on behalf of
* @returns the chat list response
*/
public async getAll(params?: ChatGetAllRequest) {
const { currentUserId, limit, before, after } = await parseParams(
const { userId, limit, before, after } = await parseParams(
'getAll',
ChatGetAllRequestSchema
)(params)

// Get new blasts and upgrade them to chats
this.upgradeBlasts(userId)

const path = `/comms/chats`
const query: HTTPQuery = {
timestamp: new Date().getTime()
Expand All @@ -194,8 +201,8 @@ export class ChatsApi
if (after) {
query.after = after
}
if (currentUserId) {
query.current_user_id = currentUserId
if (userId) {
query.current_user_id = userId
}
const response = await this.signAndSendRequest({
method: 'GET',
Expand Down Expand Up @@ -282,6 +289,23 @@ export class ChatsApi
}
}

/**
* Gets a list of chat blasts for which chats haven't been created yet
* @returns the blast messages list response
*/
public async getBlasts(): Promise<TypedCommsResponse<ChatBlast[]>> {
Comment thread
dharit-tan marked this conversation as resolved.
const query: HTTPQuery = {
timestamp: new Date().getTime()
}
const res = await this.signAndSendRequest({
method: 'GET',
path: `/comms/blasts`,
headers: {},
query
})
return (await res.json()) as TypedCommsResponse<ChatBlast[]>
}

/**
* Gets the total unread message count for a user
* @param params.currentUserId the user to act on behalf of
Expand Down Expand Up @@ -418,7 +442,7 @@ export class ChatsApi
* @param params.currentUserId the user to act on behalf of
* @returns the rpc object
*/
public async create(params: ChatCreateRequest) {
public async create(params: ChatCreateRequest): Promise<ChatCreateRPC> {
Comment thread
dharit-tan marked this conversation as resolved.
const { currentUserId, userId, invitedUserIds } = await parseParams(
'create',
ChatCreateRequestSchema
Expand All @@ -428,14 +452,14 @@ export class ChatsApi
const chatSecret = secp.utils.randomPrivateKey()
const invites = await this.createInvites(userId, invitedUserIds, chatSecret)

return await this.sendRpc({
return (await this.sendRpc({
current_user_id: currentUserId,
method: 'chat.create',
params: {
chat_id: chatId,
invites
}
})
})) as ChatCreateRPC
}

/**
Expand Down Expand Up @@ -767,6 +791,31 @@ export class ChatsApi
return base64.decode(json.data)
}

private async upgradeBlasts(userId: string) {
const blasts = await this.getBlasts()
Promise.all(
blasts.data.map(async (blast) => {
const encodedSenderId = encodeHashId(blast.from_user_id)
if (encodedSenderId) {
await this.create({
userId,
invitedUserIds: [encodedSenderId]
})
this.eventEmitter.emit('message', {
chatId: blast.pending_chat_id,
message: {
message_id: blast.pending_chat_id + blast.chat_id,
message: blast.plaintext,
sender_user_id: encodedSenderId,
created_at: blast.created_at,
reactions: []
}
})
}
})
)
}

private async getSignatureHeader(payload: string) {
const [allSignatureBytes, recoveryByte] = await this.auth.sign(payload)
const signatureBytes = new Uint8Array(65)
Expand Down Expand Up @@ -853,6 +902,9 @@ export class ChatsApi
created_at: data.metadata.timestamp
}
})
} else if (data.rpc.method === 'chat.blast') {
const userId = data.metadata.userId
await this.upgradeBlasts(userId)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait can't we use this.listenerUserId here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh man... it was not passed in listen() call in the middleware. i could set that there and then we wouldn't need to append the receiver user id to the rpc metadata. i was wondering why it was still undefined!!

but that means the chats api is more stateful... thoughts? i could go either way.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohhh right right - forgot the websockets were associated via signature recovery.

I think either way is fine! Though I wonder if we should remove this.listenerUserId if it's not being used

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah.... i kinda like the stateless idea. i'll give that a whirl and maybe remove listenUserId in a follow-up PR

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok so turns out my assumption about the user id attached to the metadata is wrong. so i'm going to merge what i have for now since it works and isn't n^2, and we can talk about refactoring later. a good solution maybe would be to add both senderUserId and receiverUserId to the metadata and then we can use whichever one on the client we need.

}
}
handleAsync()
Expand Down
2 changes: 1 addition & 1 deletion packages/libs/src/sdk/api/chats/clientTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const ChatListenRequestSchema = z.optional(
export type ChatListenRequest = z.infer<typeof ChatListenRequestSchema>

export const ChatGetAllRequestSchema = z.object({
currentUserId: z.optional(z.string()),
userId: z.string(),
limit: z.optional(z.number()),
before: z.optional(z.string()),
after: z.optional(z.string())
Expand Down
10 changes: 10 additions & 0 deletions packages/libs/src/sdk/api/chats/serverTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ export type ChatInvite = {
invite_code: string
}

export type ChatBlast = {
chat_id: string
pending_chat_id: string
from_user_id: number
plaintext: string
created_at: string
audience: ChatBlastAudience
audience_track_id?: number
}

export type ValidatedChatPermissions = {
user_id: string
permits: ChatPermission
Expand Down