Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/grpc/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { encodeTables } from '../schema/table.js';
export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {}
export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {}
export class ReadResponse extends pluginV3.cloudquery.plugin.v3.Read.Response {}
export class WriteRequest extends pluginV3.cloudquery.plugin.v3.Write.Request {}
export class WriteResponse extends pluginV3.cloudquery.plugin.v3.Write.Response {}

export type SyncStream = grpc.ServerWritableStream<
Expand Down
125 changes: 123 additions & 2 deletions src/memdb/memdb.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import { StructRowProxy } from '@apache-arrow/esnext-esm';
import { pluginV3 } from '@cloudquery/plugin-pb-javascript';

import { WriteRequest, WriteStream } from '../grpc/plugin.js';
import {
Plugin,
newUnimplementedDestination,
Expand All @@ -7,20 +11,87 @@ import {
NewClientOptions,
} from '../plugin/plugin.js';
import { sync } from '../scheduler/scheduler.js';
import { Table, createTable, filterTables } from '../schema/table.js';
import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js';

export const createMemDBClient = () => {
return { id: () => 'memdb' };
//eslint-disable-next-line @typescript-eslint/no-explicit-any
const memoryDB: Record<string, any[]> = {};
const tables: Record<string, Table> = {};
return {
id: () => 'memdb',
memoryDB,
tables,
};
};

export const newMemDBPlugin = (): Plugin => {
const memdbClient = createMemDBClient();
const memoryDB = memdbClient.memoryDB;
const tables = memdbClient.tables;

const allTables: Table[] = [
createTable({ name: 'table1', title: 'Table 1', description: 'Table 1 description' }),
createTable({ name: 'table2', title: 'Table 2', description: 'Table 2 description' }),
];

const memdb: { inserts: unknown[]; [key: string]: unknown } = {
inserts: [],
...memoryDB,
};

//eslint-disable-next-line @typescript-eslint/no-explicit-any
const overwrite = (table: Table, primaryKeys: string[], record: StructRowProxy<any>) => {
const tableData = memoryDB[table.name] || [];

if (primaryKeys.length === 0) {
// If there are no primary keys, simply append the data
tableData.push(record);
memoryDB[table.name] = tableData;
return;
}

// Otherwise, perform an upsert based on the primary keys
const recordIndex = tableData.findIndex((existingRecord) => {
return primaryKeys.every((key) => existingRecord[key] === record[key]);
});

if (recordIndex > -1) {
// If record exists, update (overwrite) it
tableData[recordIndex] = record;
} else {
// If record doesn't exist, insert it
tableData.push(record);
}

memoryDB[table.name] = tableData; // Update the memoryDB with the modified table data
};

const deleteStale = (message: pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale): void => {
const tableName = message.table_name;

// Filter the table based on the provided criteria
const filteredTable = memoryDB[tableName].filter((row) => {
const sc = row.Schema();

const sourceColIndex = sc.FieldIndices('source_name_column');
const syncColIndex = sc.FieldIndices('sync_time_column');

// Ensure both columns are present
if (sourceColIndex === undefined || syncColIndex === undefined) {
return true; // Keep the record if either column is missing
}

const rowSourceName = row.Column(sourceColIndex).Value(0);
const rowSyncTime = row.Column(syncColIndex).Value(0); // Assuming it returns a Date object

// If source names match and the record's sync time is not before the given sync time, keep the record
return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time);
});

// Update the memory database with the filtered table
memoryDB[tableName] = filteredTable;
};

const pluginClient = {
...newUnimplementedDestination(),
init: (spec: string, options: NewClientOptions) => Promise.resolve(),
Expand All @@ -35,6 +106,56 @@ export const newMemDBPlugin = (): Plugin => {
const filtered = filterTables(allTables, tables, skipTables, skipDependentTables);
return await sync(memdbClient, filtered, stream, { deterministicCQId });
},
write(stream: WriteStream): Promise<void> {
return new Promise((resolve, reject) => {
stream.on('data', (request: WriteRequest) => {
switch (request.message) {
case 'migrate_table': {
// Update table schema in the `tables` map
const table = decodeTable(request.migrate_table.table);
tables[table.name] = table;
break;
}

case 'insert': {
const [tableName, batches] = decodeRecord(request.insert.record);

if (!memoryDB[tableName]) {
memoryDB[tableName] = [];
}

const tableSchema = tables[tableName];
const pks = getPrimaryKeys(tableSchema);

for (const batch of batches) {
//eslint-disable-next-line unicorn/no-array-for-each
for (const record of batch) {
overwrite(tableSchema, pks, record);
}
}
break;
}

case 'delete': {
deleteStale(request.delete);
break;
}

default: {
throw new Error(`Unknown request message type: ${request.message}`);
}
}
});

stream.on('finish', () => {
resolve();
});

stream.on('error', (error) => {
reject(error);
});
});
},
};

return newPlugin('memdb', '0.0.1', () => Promise.resolve(pluginClient));
Expand Down
36 changes: 31 additions & 5 deletions src/schema/table.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Writable } from 'node:stream';

import { Table as ArrowTable, tableToIPC, Schema } from '@apache-arrow/esnext-esm';
import { Table as ArrowTable, tableFromIPC, tableToIPC, Schema, RecordBatch } from '@apache-arrow/esnext-esm';
import { isMatch } from 'matcher';

import * as arrow from './arrow.js';
import { Column, toArrowField } from './column.js';
import { Column, fromArrowField, toArrowField } from './column.js';
import { ClientMeta } from './meta.js';
import { Resource } from './resource.js';
import { Nullable } from './types.js';
Expand Down Expand Up @@ -80,6 +80,10 @@ export const getTableByName = (tables: Table[], name: string): Table | undefined
}
};

export const getPrimaryKeys = (table: Table): string[] => {
return table.columns.filter((column) => column.primaryKey).map((column) => column.name);
};

export const flattenTables = (tables: Table[]): Table[] => {
return tables.flatMap((table) => [table, ...flattenTables(table.relations.map((c) => ({ ...c, parent: table })))]);
};
Expand Down Expand Up @@ -126,7 +130,7 @@ export const filterTables = (
return withSkipDependant;
};

export const toArrowSchema = (table: Table) => {
export const toArrowSchema = (table: Table): Schema => {
const metadata = new Map<string, string>();
metadata.set(arrow.METADATA_TABLE_NAME, table.name);
metadata.set(arrow.METADATA_TABLE_DESCRIPTION, table.description);
Expand All @@ -142,13 +146,35 @@ export const toArrowSchema = (table: Table) => {
return new Schema(fields, metadata);
};

export const fromArrowSchema = (schema: Schema): Table => {
return createTable({
name: schema.metadata.get(arrow.METADATA_TABLE_NAME) || '',
title: schema.metadata.get(arrow.METADATA_TABLE_TITLE) || '',
description: schema.metadata.get(arrow.METADATA_TABLE_DESCRIPTION) || '',
pkConstraintName: schema.metadata.get(arrow.METADATA_CONSTRAINT_NAME) || '',
isIncremental: schema.metadata.get(arrow.METADATA_INCREMENTAL) === arrow.METADATA_TRUE,
// dependencies: schema.metadata.get(arrow.METADATA_TABLE_DEPENDS_ON) || '',
columns: schema.fields.map((f) => fromArrowField(f)),
});
};

export const encodeTable = (table: Table): Uint8Array => {
const schema = toArrowSchema(table);
const arrowTable = new ArrowTable(schema);
const bytes = tableToIPC(arrowTable);
return bytes;
return tableToIPC(arrowTable);
};

export const encodeTables = (tables: Table[]): Uint8Array[] => {
return tables.map((table) => encodeTable(table));
};

export const decodeTable = (bytes: Uint8Array): Table => {
const arrowTable = tableFromIPC(bytes);
return fromArrowSchema(arrowTable.schema);
};

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const decodeRecord = (bytes: Uint8Array): [string, RecordBatch<any>[]] => {
const arrowTable = tableFromIPC(bytes);
return [(arrowTable.schema.metadata.get(arrow.METADATA_TABLE_NAME) || '')!, arrowTable.batches];
};