diff --git a/src/ipc/metadata/message.ts b/src/ipc/metadata/message.ts
index b41ec4a5..e810af3d 100644
--- a/src/ipc/metadata/message.ts
+++ b/src/ipc/metadata/message.ts
@@ -82,7 +82,8 @@ export class Message<T extends MessageHeader = any> {
         const bodyLength: bigint = _message.bodyLength()!;
         const version: MetadataVersion = _message.version();
         const headerType: MessageHeader = _message.headerType();
-        const message = new Message(bodyLength, version, headerType);
+        const metadata = decodeMessageCustomMetadata(_message);
+        const message = new Message(bodyLength, version, headerType, undefined, metadata);
         message._createHeader = decodeMessageHeader(_message, headerType);
         return message;
     }
@@ -98,22 +99,35 @@ export class Message<T extends MessageHeader = any> {
         } else if (message.isDictionaryBatch()) {
             headerOffset = DictionaryBatch.encode(b, message.header() as DictionaryBatch);
         }
+
+        // Encode custom metadata if present (must be done before startMessage)
+        const customMetadataOffset = !(message.metadata && message.metadata.size > 0) ? -1 :
+            _Message.createCustomMetadataVector(b, [...message.metadata].map(([k, v]) => {
+                const key = b.createString(`${k}`);
+                const val = b.createString(`${v}`);
+                _KeyValue.startKeyValue(b);
+                _KeyValue.addKey(b, key);
+                _KeyValue.addValue(b, val);
+                return _KeyValue.endKeyValue(b);
+            }));
+
         _Message.startMessage(b);
         _Message.addVersion(b, MetadataVersion.V5);
         _Message.addHeader(b, headerOffset);
         _Message.addHeaderType(b, message.headerType);
         _Message.addBodyLength(b, BigInt(message.bodyLength));
+        if (customMetadataOffset !== -1) { _Message.addCustomMetadata(b, customMetadataOffset); }
         _Message.finishMessageBuffer(b, _Message.endMessage(b));
         return b.asUint8Array();
     }

     /** @nocollapse */
-    public static from(header: Schema | RecordBatch | DictionaryBatch, bodyLength = 0) {
+    public static from(header: Schema | RecordBatch | DictionaryBatch, bodyLength = 0, metadata?: Map<string, string>) {
         if (header instanceof Schema) {
             return new Message(0, MetadataVersion.V5, MessageHeader.Schema, header);
         }
         if (header instanceof RecordBatch) {
-            return new Message(bodyLength, MetadataVersion.V5, MessageHeader.RecordBatch, header);
+            return new Message(bodyLength, MetadataVersion.V5, MessageHeader.RecordBatch, header, metadata);
         }
         if (header instanceof DictionaryBatch) {
             return new Message(bodyLength, MetadataVersion.V5, MessageHeader.DictionaryBatch, header);
@@ -126,24 +140,27 @@ export class Message<T extends MessageHeader = any> {
     protected _bodyLength: number;
     protected _version: MetadataVersion;
     protected _compression: BodyCompression | null;
+    protected _metadata: Map<string, string>;
     public get type() { return this.headerType; }
     public get version() { return this._version; }
     public get headerType() { return this._headerType; }
     public get compression() { return this._compression; }
     public get bodyLength() { return this._bodyLength; }
+    public get metadata() { return this._metadata; }
     declare protected _createHeader: MessageHeaderDecoder;
     public header() { return this._createHeader<T>(); }
     public isSchema(): this is Message<MessageHeader.Schema> { return this.headerType === MessageHeader.Schema; }
     public isRecordBatch(): this is Message<MessageHeader.RecordBatch> { return this.headerType === MessageHeader.RecordBatch; }
     public isDictionaryBatch(): this is Message<MessageHeader.DictionaryBatch> { return this.headerType === MessageHeader.DictionaryBatch; }

-    constructor(bodyLength: bigint | number, version: MetadataVersion, headerType: T, header?: any) {
+    constructor(bodyLength: bigint | number, version: MetadataVersion, headerType: T, header?: any, metadata?: Map<string, string>) {
         this._version = version;
         this._headerType = headerType;
         this.body = new Uint8Array(0);
         this._compression = header?.compression;
         header && (this._createHeader = () => header);
         this._bodyLength = bigIntToNumber(bodyLength);
+        this._metadata = metadata || new Map();
     }
 }
@@ -468,6 +485,17 @@ function decodeCustomMetadata(parent?: _Schema | _Field | null) {
     return data;
 }

+/** @ignore */
+function decodeMessageCustomMetadata(message: _Message) {
+    const data = new Map<string, string>();
+    for (let entry, key, i = -1, n = Math.trunc(message.customMetadataLength()); ++i < n;) {
+        if ((entry = message.customMetadata(i)) && (key = entry.key()) != null) {
+            data.set(key, entry.value()!);
+        }
+    }
+    return data;
+}
+
 /** @ignore */
 function decodeIndexType(_type: _Int) {
     return new Int(_type.isSigned(), _type.bitWidth() as IntBitWidth);
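The FlatBuffers builder requires nested strings and vectors to exist before `startMessage`, which is why the `KeyValue` vector is built first and only referenced between `startMessage`/`endMessage`. A minimal end-to-end sketch of what this plumbing enables, assuming the reader/writer changes later in this diff (the key names are illustrative):

```ts
import { tableFromArrays, tableFromIPC, RecordBatchStreamWriter } from 'apache-arrow';

// Attach message-level custom metadata when writing a batch...
const table = tableFromArrays({ id: Int32Array.from([1, 2, 3]) });
const writer = new RecordBatchStreamWriter();
writer.write(table.batches[0], new Map([['batch_id', '42']]));
writer.finish();

// ...and Message.decode() surfaces it again on read.
const roundTripped = tableFromIPC(writer.toUint8Array(true));
console.assert(roundTripped.batches[0].metadata.get('batch_id') === '42');
```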
diff --git a/src/ipc/reader.ts b/src/ipc/reader.ts
index af49f372..d0edafd1 100644
--- a/src/ipc/reader.ts
+++ b/src/ipc/reader.ts
@@ -358,7 +358,7 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordB
         return this;
     }

-    protected _loadRecordBatch(header: metadata.RecordBatch, body: Uint8Array): RecordBatch<T> {
+    protected _loadRecordBatch(header: metadata.RecordBatch, body: Uint8Array, messageMetadata?: Map<string, string>): RecordBatch<T> {
         let children: Data[];
         if (header.compression != null) {
             const codec = compressionRegistry.get(header.compression.type);
@@ -379,7 +379,7 @@
         }

         const data = makeData({ type: new Struct(this.schema.fields), length: header.length, children });
-        return new RecordBatch(this.schema, data);
+        return new RecordBatch(this.schema, data, messageMetadata);
     }

     protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: Uint8Array) {
@@ -512,7 +512,7 @@ class RecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchRe
             this._recordBatchIndex++;
             const header = message.header();
             const buffer = reader.readMessageBody(message.bodyLength);
-            const recordBatch = this._loadRecordBatch(header, buffer);
+            const recordBatch = this._loadRecordBatch(header, buffer, message.metadata);
             return { done: false, value: recordBatch };
         } else if (message.isDictionaryBatch()) {
             this._dictionaryIndex++;
@@ -587,7 +587,7 @@ class AsyncRecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBa
             this._recordBatchIndex++;
             const header = message.header();
             const buffer = await reader.readMessageBody(message.bodyLength);
-            const recordBatch = this._loadRecordBatch(header, buffer);
+            const recordBatch = this._loadRecordBatch(header, buffer, message.metadata);
             return { done: false, value: recordBatch };
         } else if (message.isDictionaryBatch()) {
             this._dictionaryIndex++;
@@ -640,7 +640,7 @@ class RecordBatchFileReaderImpl<T extends TypeMap = any> extends RecordBatchStre
         if (message?.isRecordBatch()) {
             const header = message.header();
             const buffer = this._reader.readMessageBody(message.bodyLength);
-            const recordBatch = this._loadRecordBatch(header, buffer);
+            const recordBatch = this._loadRecordBatch(header, buffer, message.metadata);
             return recordBatch;
         }
     }
@@ -714,7 +714,7 @@ class AsyncRecordBatchFileReaderImpl<T extends TypeMap = any> extends AsyncRecor
         if (message?.isRecordBatch()) {
             const header = message.header();
             const buffer = await this._reader.readMessageBody(message.bodyLength);
-            const recordBatch = this._loadRecordBatch(header, buffer);
+            const recordBatch = this._loadRecordBatch(header, buffer, message.metadata);
             return recordBatch;
         }
     }
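Every reader implementation (sync/async, stream/file) now threads `message.metadata` into `_loadRecordBatch`, so the metadata is available during incremental streaming as well as from a fully materialized table. A consumption sketch, assuming `bytes` holds an Arrow IPC stream written with per-message metadata:

```ts
import { RecordBatchReader } from 'apache-arrow';

declare const bytes: Uint8Array; // an Arrow IPC stream produced elsewhere

// batch.metadata is an empty Map when the underlying message carried none.
for (const batch of RecordBatchReader.from(bytes)) {
    console.log(batch.numRows, Object.fromEntries(batch.metadata));
}
```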
diff --git a/src/ipc/writer.ts b/src/ipc/writer.ts
index 0b13fdfc..f82831f5 100644
--- a/src/ipc/writer.ts
+++ b/src/ipc/writer.ts
@@ -185,7 +185,18 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
         return this;
     }

-    public write(payload?: Table<T> | RecordBatch<T> | Iterable<RecordBatch<T>> | null) {
+    /**
+     * Write a RecordBatch to the stream with optional custom metadata.
+     * @param payload The RecordBatch, Table, or iterable of RecordBatches to write
+     * @param customMetadata Optional custom metadata to attach to the message (only used when payload is a single RecordBatch)
+     */
+    public write(payload?: Table<T> | RecordBatch<T> | Iterable<RecordBatch<T>> | null, customMetadata?: Map<string, string>): void;
+    // Overload for UnderlyingSink compatibility (used by DOM streams)
+    public write(chunk: RecordBatch<T>, controller: WritableStreamDefaultController): void;
+    public write(payload?: Table<T> | RecordBatch<T> | Iterable<RecordBatch<T>> | null, customMetadataOrController?: Map<string, string> | WritableStreamDefaultController) {
+        // Determine if second argument is customMetadata (Map) or controller (WritableStreamDefaultController)
+        const customMetadata = customMetadataOrController instanceof Map ? customMetadataOrController : undefined;
+
         let schema: Schema<T> | null = null;

         if (!this._sink) {
@@ -207,7 +218,7 @@
         if (payload instanceof RecordBatch) {
             if (!(payload instanceof _InternalEmptyPlaceholderRecordBatch)) {
-                this._writeRecordBatch(payload);
+                this._writeRecordBatch(payload, customMetadata);
             }
         } else if (payload instanceof Table) {
             this.writeAll(payload.batches);
@@ -273,10 +284,12 @@
         return nBytes > 0 ? this._write(new Uint8Array(nBytes)) : this;
     }

-    protected _writeRecordBatch(batch: RecordBatch<T>) {
+    protected _writeRecordBatch(batch: RecordBatch<T>, customMetadata?: Map<string, string>) {
         const { byteLength, nodes, bufferRegions, buffers, variadicBufferCounts } = this._assembleRecordBatch(batch);
         const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, this._compression, variadicBufferCounts);
-        const message = Message.from(recordBatch, byteLength);
+        // Merge batch.metadata with customMetadata (customMetadata takes precedence)
+        const mergedMetadata = mergeMetadata(batch.metadata, customMetadata);
+        const message = Message.from(recordBatch, byteLength, mergedMetadata);
         return this
             ._writeDictionaries(batch)
             ._writeMessage(message)
@@ -589,3 +602,13 @@ function recordBatchToJSON(records: RecordBatch) {
         'columns': columns
     }, null, 2);
 }
+
+/** @ignore */
+function mergeMetadata(base?: Map<string, string>, override?: Map<string, string>): Map<string, string> | undefined {
+    if (!base?.size && !override?.size) { return undefined; }
+    const merged = new Map(base);
+    if (override) {
+        for (const [k, v] of override) { merged.set(k, v); }
+    }
+    return merged;
+}
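On the writer side, `write()` gains an optional second argument, and `_writeRecordBatch` merges it over any metadata already carried by the batch. A sketch of the intended call shape (key names illustrative):

```ts
import { RecordBatchStreamWriter, tableFromArrays, tableFromIPC } from 'apache-arrow';

const [batch] = tableFromArrays({ v: Int32Array.from([1, 2]) }).batches;

const writer = new RecordBatchStreamWriter();
// For a batch that already carries metadata, any shared key would take the
// per-call value here, per mergeMetadata's "override wins" rule above.
writer.write(batch, new Map([['stage', 'final'], ['writer', 'node']]));
writer.finish();

const out = tableFromIPC(writer.toUint8Array(true)).batches[0];
console.assert(out.metadata.get('stage') === 'final');
```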
diff --git a/src/recordbatch.ts b/src/recordbatch.ts
index 33dbe9e5..62f7ee4c 100644
--- a/src/recordbatch.ts
+++ b/src/recordbatch.ts
@@ -47,9 +47,10 @@ export interface RecordBatch<T extends TypeMap = any> {
 export class RecordBatch<T extends TypeMap = any> {
     constructor(columns: { [P in keyof T]: Data<T[P]> });
-    constructor(schema: Schema<T>, data?: Data<Struct<T>>);
+    constructor(schema: Schema<T>, data?: Data<Struct<T>>, metadata?: Map<string, string>);
     constructor(...args: any[]) {
         switch (args.length) {
+            case 3:
             case 2: {
                 [this.schema] = args;
                 if (!(this.schema instanceof Schema)) {
                     throw new TypeError('RecordBatch constructor expects a [Schema, Data] pair.');
                 }
                 [,
                  this.data = makeData({
                     nullCount: 0,
                     type: new Struct(this.schema.fields),
                     children: this.schema.fields.map((f) => makeData({ type: f.type, nullCount: 0 }))
-                })
+                }),
+                this._metadata = new Map()
                 ] = args;
                 if (!(this.data instanceof Data)) {
                     throw new TypeError('RecordBatch constructor expects a [Schema, Data] pair.');
                 }
                 const schema = new Schema(fields);
                 const data = makeData({ type: new Struct(fields), length, children, nullCount: 0 });
                 [this.schema, this.data] = ensureSameLengthData(schema, data.children as Data[], length);
+                this._metadata = new Map();
                 break;
             }
             default: throw new TypeError('RecordBatch constructor expects an Object mapping names to child Data, or a [Schema, Data] pair.');
         }
     }

     protected _dictionaries?: Map<number, Vector>;
+    protected _metadata: Map<string, string>;

     public readonly schema: Schema<T>;
     public readonly data: Data<Struct<T>>;

+    /**
+     * Custom metadata for this RecordBatch.
+     */
+    public get metadata() { return this._metadata; }
+
     public get dictionaries() {
         return this._dictionaries || (this._dictionaries = collectDictionaries(this.schema.fields, this.data.children));
     }
@@ -188,7 +197,7 @@
      */
     public slice(begin?: number, end?: number): RecordBatch<T> {
         const [slice] = new Vector([this.data]).slice(begin, end).data;
-        return new RecordBatch(this.schema, slice);
+        return new RecordBatch(this.schema, slice, this._metadata);
     }

     /**
@@ -240,7 +249,7 @@
             schema = new Schema(fields, new Map(this.schema.metadata));
             data = makeData({ type: new Struct(fields), children });
         }
-        return new RecordBatch(schema, data);
+        return new RecordBatch(schema, data, this._metadata);
     }

     /**
@@ -259,7 +268,7 @@
                 children[index] = this.data.children[index] as Data;
             }
         }
-        return new RecordBatch(schema, makeData({ type, length: this.numRows, children }));
+        return new RecordBatch(schema, makeData({ type, length: this.numRows, children }), this._metadata);
     }

     /**
@@ -272,7 +281,7 @@
         const schema = this.schema.selectAt(columnIndices);
         const children = columnIndices.map((i) => this.data.children[i]).filter(Boolean);
         const subset = makeData({ type: new Struct(schema.fields), length: this.numRows, children });
-        return new RecordBatch<{ [P in keyof K]: K[P] }>(schema, subset);
+        return new RecordBatch<{ [P in keyof K]: K[P] }>(schema, subset, this._metadata);
     }

     // Initialize this static property via an IIFE so bundlers don't tree-shake
@@ -347,9 +356,9 @@ function collectDictionaries(fields: Field[], children: readonly Data[], diction
  * @private
  */
 export class _InternalEmptyPlaceholderRecordBatch<T extends TypeMap = any> extends RecordBatch<T> {
-    constructor(schema: Schema<T>) {
+    constructor(schema: Schema<T>, metadata?: Map<string, string>) {
         const children = schema.fields.map((f) => makeData({ type: f.type }));
         const data = makeData({ type: new Struct(schema.fields), nullCount: 0, children });
-        super(schema, data);
+        super(schema, data, metadata || new Map());
     }
 }
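The new third constructor argument makes metadata a first-class part of `RecordBatch`, and the slicing/projection methods carry it through to their results. A construction sketch mirroring the tests below:

```ts
import { Field, Int32, makeData, RecordBatch, Schema, Struct } from 'apache-arrow';

const schema = new Schema([new Field('id', new Int32())]);
const data = makeData({
    type: new Struct(schema.fields),
    length: 3,
    nullCount: 0,
    children: [makeData({ type: new Int32(), data: new Int32Array([1, 2, 3]) })]
});
const batch = new RecordBatch(schema, data, new Map([['origin', 'sensor-a']]));

// slice/select/selectAt all pass this._metadata through to the new batch.
console.assert(batch.slice(0, 2).metadata.get('origin') === 'sensor-a');
console.assert(batch.selectAt([0]).metadata.get('origin') === 'sensor-a');
```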
diff --git a/test/data/test_message_metadata.arrow b/test/data/test_message_metadata.arrow
new file mode 100644
index 00000000..2dd9e825
Binary files /dev/null and b/test/data/test_message_metadata.arrow differ
diff --git a/test/unit/ipc/reader/message-metadata-tests.ts b/test/unit/ipc/reader/message-metadata-tests.ts
new file mode 100644
index 00000000..ecfc0a10
--- /dev/null
+++ b/test/unit/ipc/reader/message-metadata-tests.ts
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { readFileSync } from 'node:fs';
+import path from 'node:path';
+import { tableFromIPC, RecordBatch } from 'apache-arrow';
+
+// Path to the test file with message-level metadata
+// Use process.cwd() since tests are run from project root
+const testFilePath = path.resolve(process.cwd(), 'test/data/test_message_metadata.arrow');
+
+describe('RecordBatch message metadata', () => {
+    const buffer = readFileSync(testFilePath);
+    const table = tableFromIPC(buffer);
+
+    test('should read RecordBatch metadata from IPC file', () => {
+        expect(table.batches).toHaveLength(3);
+
+        for (let i = 0; i < table.batches.length; i++) {
+            const batch = table.batches[i];
+            expect(batch).toBeInstanceOf(RecordBatch);
+            expect(batch.metadata).toBeInstanceOf(Map);
+            expect(batch.metadata.size).toBeGreaterThan(0);
+
+            // Verify specific metadata keys exist
+            expect(batch.metadata.has('batch_index')).toBe(true);
+            expect(batch.metadata.has('batch_id')).toBe(true);
+            expect(batch.metadata.has('producer')).toBe(true);
+
+            // Verify batch_index matches the batch position
+            expect(batch.metadata.get('batch_index')).toBe(String(i));
+            expect(batch.metadata.get('batch_id')).toBe(`batch_${String(i).padStart(4, '0')}`);
+        }
+    });
+
+    test('should read unicode metadata values', () => {
+        const batch = table.batches[0];
+        expect(batch.metadata.has('unicode_test')).toBe(true);
+        expect(batch.metadata.get('unicode_test')).toBe('Hello 世界 🌍 مرحبا');
+    });
+
+    test('should handle empty metadata values', () => {
+        const batch = table.batches[0];
+        expect(batch.metadata.has('optional_field')).toBe(true);
+        expect(batch.metadata.get('optional_field')).toBe('');
+    });
+
+    test('should read JSON metadata values', () => {
+        const batch = table.batches[0];
+        expect(batch.metadata.has('batch_info_json')).toBe(true);
+        const jsonStr = batch.metadata.get('batch_info_json')!;
+        const parsed = JSON.parse(jsonStr);
+        expect(parsed.batch_number).toBe(0);
+        expect(parsed.processing_stage).toBe('final');
+        expect(parsed.tags).toEqual(['validated', 'complete']);
+    });
+
+    describe('metadata preservation', () => {
+        test('should preserve metadata through slice()', () => {
+            const batch = table.batches[0];
+            const sliced = batch.slice(0, 2);
+            expect(sliced.metadata).toBeInstanceOf(Map);
+            expect(sliced.metadata.size).toBe(batch.metadata.size);
+            expect(sliced.metadata.get('batch_index')).toBe(batch.metadata.get('batch_index'));
+        });
+
+        test('should preserve metadata through select()', () => {
+            const batch = table.batches[0];
+            const selected = batch.select(['id', 'name']);
+            expect(selected.metadata).toBeInstanceOf(Map);
+            expect(selected.metadata.size).toBe(batch.metadata.size);
+            expect(selected.metadata.get('batch_index')).toBe(batch.metadata.get('batch_index'));
+        });
+
+        test('should preserve metadata through selectAt()', () => {
+            const batch = table.batches[0];
+            const selectedAt = batch.selectAt([0, 1]);
+            expect(selectedAt.metadata).toBeInstanceOf(Map);
+            expect(selectedAt.metadata.size).toBe(batch.metadata.size);
+            expect(selectedAt.metadata.get('batch_index')).toBe(batch.metadata.get('batch_index'));
+        });
+    });
+});
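The fixture exercised by the reader tests above is checked in as a binary file, so the diff cannot show how it was produced. A hypothetical generator using the patched file writer would look roughly like this; the single-column layout and the exact key set are approximations of what the tests expect, not the actual producer:

```ts
import { writeFileSync } from 'node:fs';
import { RecordBatchFileWriter, tableFromArrays } from 'apache-arrow';

const writer = new RecordBatchFileWriter();
for (let i = 0; i < 3; i++) {
    const [batch] = tableFromArrays({ id: Int32Array.from([i]) }).batches;
    writer.write(batch, new Map([
        ['batch_index', String(i)],
        ['batch_id', `batch_${String(i).padStart(4, '0')}`],
        ['producer', 'example-producer'],        // hypothetical value
        ['unicode_test', 'Hello 世界 🌍 مرحبا'],
        ['optional_field', '']
    ]));
}
writer.finish();
writeFileSync('test/data/test_message_metadata.arrow', writer.toUint8Array(true));
```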
diff --git a/test/unit/ipc/writer/message-metadata-tests.ts b/test/unit/ipc/writer/message-metadata-tests.ts
new file mode 100644
index 00000000..9b71fb07
--- /dev/null
+++ b/test/unit/ipc/writer/message-metadata-tests.ts
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+    Field,
+    Int32,
+    makeData,
+    RecordBatch,
+    RecordBatchFileWriter,
+    RecordBatchStreamWriter,
+    Schema,
+    Struct,
+    tableFromIPC,
+    Utf8
+} from 'apache-arrow';
+
+describe('RecordBatch message metadata writing', () => {
+
+    // Helper to create a simple RecordBatch for testing
+    function createTestBatch(): RecordBatch {
+        const schema = new Schema([
+            new Field('id', new Int32()),
+            new Field('name', new Utf8())
+        ]);
+        const idData = makeData({ type: new Int32(), data: new Int32Array([1, 2, 3]) });
+        const nameData = makeData({ type: new Utf8(), data: Buffer.from('foobarbaz'), valueOffsets: new Int32Array([0, 3, 6, 9]) });
+        const structData = makeData({
+            type: new Struct(schema.fields),
+            length: 3,
+            nullCount: 0,
+            children: [idData, nameData]
+        });
+        return new RecordBatch(schema, structData);
+    }
+
+    describe('Stream format round-trip', () => {
+        test('should write and read metadata via RecordBatchStreamWriter', () => {
+            const batch = createTestBatch();
+            const metadata = new Map([['batch_id', '123'], ['source', 'test']]);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+            const buffer = writer.toUint8Array(true);
+
+            const table = tableFromIPC(buffer);
+            expect(table.batches).toHaveLength(1);
+            expect(table.batches[0].metadata).toBeInstanceOf(Map);
+            expect(table.batches[0].metadata.get('batch_id')).toBe('123');
+            expect(table.batches[0].metadata.get('source')).toBe('test');
+        });
+
+        test('should write batch without metadata when none provided', () => {
+            const batch = createTestBatch();
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+            const buffer = writer.toUint8Array(true);
+
+            const table = tableFromIPC(buffer);
+            expect(table.batches).toHaveLength(1);
+            expect(table.batches[0].metadata).toBeInstanceOf(Map);
+            expect(table.batches[0].metadata.size).toBe(0);
+        });
+    });
+
+    describe('File format round-trip', () => {
+        test('should write and read metadata via RecordBatchFileWriter', () => {
+            const batch = createTestBatch();
+            const metadata = new Map([['format', 'file'], ['version', '1.0']]);
+
+            const writer = new RecordBatchFileWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+            const buffer = writer.toUint8Array(true);
+
+            const table = tableFromIPC(buffer);
+            expect(table.batches).toHaveLength(1);
+            expect(table.batches[0].metadata.get('format')).toBe('file');
+            expect(table.batches[0].metadata.get('version')).toBe('1.0');
+        });
+    });
+
+    describe('Multiple batches with different metadata', () => {
+        test('should write multiple batches with different metadata', () => {
+            const writer = new RecordBatchStreamWriter();
+
+            const batch1 = createTestBatch();
+            const batch2 = createTestBatch();
+            const batch3 = createTestBatch();
+
+            writer.write(batch1, new Map([['batch_index', '0'], ['tag', 'first']]));
+            writer.write(batch2, new Map([['batch_index', '1'], ['tag', 'middle']]));
+            writer.write(batch3, new Map([['batch_index', '2'], ['tag', 'last']]));
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches).toHaveLength(3);
+            expect(table.batches[0].metadata.get('batch_index')).toBe('0');
+            expect(table.batches[0].metadata.get('tag')).toBe('first');
+            expect(table.batches[1].metadata.get('batch_index')).toBe('1');
+            expect(table.batches[1].metadata.get('tag')).toBe('middle');
+            expect(table.batches[2].metadata.get('batch_index')).toBe('2');
+            expect(table.batches[2].metadata.get('tag')).toBe('last');
+        });
+    });
+
+    describe('Metadata preservation through operations', () => {
+        test('should preserve metadata through slice after round-trip', () => {
+            const batch = createTestBatch();
+            const metadata = new Map([['key', 'value']]);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            const sliced = table.batches[0].slice(0, 2);
+
+            expect(sliced.metadata.get('key')).toBe('value');
+        });
+
+        test('should preserve metadata through selectAt after round-trip', () => {
+            const batch = createTestBatch();
+            const metadata = new Map([['preserved', 'yes']]);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            const selected = table.batches[0].selectAt([0]);
+
+            expect(selected.metadata.get('preserved')).toBe('yes');
+        });
+    });
+
+    describe('Merge behavior', () => {
+        test('should use customMetadata parameter when batch has no metadata', () => {
+            const batch = createTestBatch();
+            const metadata = new Map([['from_param', 'value']]);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.get('from_param')).toBe('value');
+        });
+
+        test('should use batch.metadata when no customMetadata parameter provided', () => {
+            // Create a batch with existing metadata
+            const schema = new Schema([new Field('id', new Int32())]);
+            const idData = makeData({ type: new Int32(), data: new Int32Array([1, 2, 3]) });
+            const structData = makeData({
+                type: new Struct(schema.fields),
+                length: 3,
+                nullCount: 0,
+                children: [idData]
+            });
+            const batchWithMetadata = new RecordBatch(schema, structData, new Map([['from_batch', 'value']]));
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batchWithMetadata);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.get('from_batch')).toBe('value');
+        });
+
+        test('should merge batch.metadata and customMetadata with customMetadata taking precedence', () => {
+            // Create a batch with existing metadata
+            const schema = new Schema([new Field('id', new Int32())]);
+            const idData = makeData({ type: new Int32(), data: new Int32Array([1, 2, 3]) });
+            const structData = makeData({
+                type: new Struct(schema.fields),
+                length: 3,
+                nullCount: 0,
+                children: [idData]
+            });
+            const batchWithMetadata = new RecordBatch(
+                schema,
+                structData,
+                new Map([['key1', 'from_batch'], ['shared_key', 'batch_value']])
+            );
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batchWithMetadata, new Map([['key2', 'from_param'], ['shared_key', 'param_value']]));
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.get('key1')).toBe('from_batch');
+            expect(table.batches[0].metadata.get('key2')).toBe('from_param');
+            // customMetadata should override batch.metadata for shared keys
+            expect(table.batches[0].metadata.get('shared_key')).toBe('param_value');
+        });
+    });
+
+    describe('Edge cases', () => {
+        test('should handle empty metadata map', () => {
+            const batch = createTestBatch();
+            const metadata = new Map();
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.size).toBe(0);
+        });
+
+        test('should handle unicode keys and values', () => {
+            const batch = createTestBatch();
+            const metadata = new Map([
+                ['日本語キー', 'Japanese key'],
+                ['emoji', '🎉🚀💻'],
+                ['mixed', 'Hello 世界 مرحبا']
+            ]);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.get('日本語キー')).toBe('Japanese key');
+            expect(table.batches[0].metadata.get('emoji')).toBe('🎉🚀💻');
+            expect(table.batches[0].metadata.get('mixed')).toBe('Hello 世界 مرحبا');
+        });
+
+        test('should handle empty string values', () => {
+            const batch = createTestBatch();
+            const metadata = new Map([['empty_value', ''], ['normal', 'value']]);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.get('empty_value')).toBe('');
+            expect(table.batches[0].metadata.get('normal')).toBe('value');
+        });
+
+        test('should handle JSON string as metadata value', () => {
+            const batch = createTestBatch();
+            const jsonValue = JSON.stringify({ nested: { data: [1, 2, 3] }, flag: true });
+            const metadata = new Map([['json_data', jsonValue]]);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            const retrieved = table.batches[0].metadata.get('json_data')!;
+            const parsed = JSON.parse(retrieved);
+            expect(parsed.nested.data).toEqual([1, 2, 3]);
+            expect(parsed.flag).toBe(true);
+        });
+
+        test('should handle long metadata values', () => {
+            const batch = createTestBatch();
+            const longValue = 'x'.repeat(10000);
+            const metadata = new Map([['long_value', longValue]]);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.get('long_value')).toBe(longValue);
+        });
+    });
+});
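For reference, the precedence pinned down by the "Merge behavior" suite reduces to plain `Map` insertion order; a standalone sketch of the rule `mergeMetadata` implements:

```ts
// Later set() calls win: seeding with the batch metadata and then copying the
// per-call entries over it gives customMetadata precedence on shared keys.
const merged = new Map<string, string>([['shared_key', 'batch_value'], ['key1', 'from_batch']]);
for (const [k, v] of new Map([['shared_key', 'param_value'], ['key2', 'from_param']])) {
    merged.set(k, v);
}
console.assert(merged.get('shared_key') === 'param_value');
```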