diff --git a/src/grpc/plugin.ts b/src/grpc/plugin.ts index 5340141..cc099c4 100644 --- a/src/grpc/plugin.ts +++ b/src/grpc/plugin.ts @@ -2,7 +2,7 @@ import { pluginV3 } from '@cloudquery/plugin-pb-javascript'; import grpc = require('@grpc/grpc-js'); import { Plugin } from '../plugin/plugin.js'; -import { encode as encodeTables } from '../schema/table.js'; +import { encodeTables } from '../schema/table.js'; export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {} export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {} diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts index 345795c..bf88476 100644 --- a/src/scheduler/scheduler.ts +++ b/src/scheduler/scheduler.ts @@ -1,17 +1,17 @@ import { SyncStream, SyncResponse, MigrateTable } from '../grpc/plugin.js'; import { ClientMeta } from '../schema/meta.js'; -import { Table } from '../schema/table.js'; +import { Table, encodeTable } from '../schema/table.js'; export type Options = { deterministicCQId: boolean; }; export const sync = async (client: ClientMeta, tables: Table[], stream: SyncStream, options: Options) => { - for (const { name } of tables) { - const table = new TextEncoder().encode(name); + for (const table of tables) { // eslint-disable-next-line @typescript-eslint/naming-convention - stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table }) })); + stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) })); } + stream.end(); return await Promise.resolve(); }; diff --git a/src/schema/table.ts b/src/schema/table.ts index 7128397..d8e2da4 100644 --- a/src/schema/table.ts +++ b/src/schema/table.ts @@ -142,9 +142,13 @@ export const toArrowSchema = (table: Table) => { return new Schema(fields, metadata); }; -export const encode = (tables: Table[]): Uint8Array[] => { - const schemas = tables.map((table) => toArrowSchema(table)); - const arrowTables = schemas.map((schema) => new ArrowTable(schema)); - const bytes = arrowTables.map((table) => tableToIPC(table)); +export const encodeTable = (table: Table): Uint8Array => { + const schema = toArrowSchema(table); + const arrowTable = new ArrowTable(schema); + const bytes = tableToIPC(arrowTable); return bytes; }; + +export const encodeTables = (tables: Table[]): Uint8Array[] => { + return tables.map((table) => encodeTable(table)); +};