11package edu .uci .ics .amber .core .storage .result
22
3+ import edu .uci .ics .amber .core .storage .result .ArrowFileDocumentSpec .{stringDeserializer , stringSerializer }
34import edu .uci .ics .amber .core .storage .result .PartitionedFileDocument .getPartitionURI
5+ import org .apache .arrow .vector .types .pojo .{ArrowType , Field , Schema }
6+ import org .apache .arrow .vector .{VarCharVector , VectorSchemaRoot }
47import org .apache .commons .vfs2 .{FileObject , VFS }
58import org .scalatest .flatspec .AnyFlatSpec
69import org .scalatest .BeforeAndAfter
710import org .scalatest .concurrent .ScalaFutures .convertScalaFuture
811import org .scalatest .matchers .should .Matchers
912
13+ import scala .jdk .CollectionConverters .IterableHasAsJava
14+
/**
  * String (de)serialization helpers passed to `ArrowFileDocument[String]` by the spec below.
  *
  * Both helpers look up the same column of the supplied [[VectorSchemaRoot]] and cast it to a
  * [[VarCharVector]], so the root must expose a VarChar column under that name.
  */
object ArrowFileDocumentSpec {
  import java.nio.charset.StandardCharsets

  // Column looked up by both helpers; kept identical to the field name used when the spec
  // builds its Arrow schema.
  private val DataColumn = " data"

  /**
    * Writes `item` into row `index` of the data column.
    *
    * @param item  the string to store; encoded as UTF-8 bytes
    * @param index the row slot to write
    * @param root  the Arrow root holding the data column
    */
  def stringSerializer(item: String, index: Int, root: VectorSchemaRoot): Unit = {
    val column = root.getVector(DataColumn).asInstanceOf[VarCharVector]
    // Use the Charset constant rather than a charset-name string: no runtime name lookup that
    // can fail, and the encoding is guaranteed to match the one used by stringDeserializer.
    column.setSafe(index, item.getBytes(StandardCharsets.UTF_8))
  }

  /**
    * Reads row `index` of the data column back as a string.
    *
    * @param index the row slot to read
    * @param root  the Arrow root holding the data column
    * @return the stored bytes decoded as UTF-8
    */
  def stringDeserializer(index: Int, root: VectorSchemaRoot): String = {
    val column = root.getVector(DataColumn).asInstanceOf[VarCharVector]
    // Decode explicitly as UTF-8; `new String(bytes)` would use the JVM default charset and
    // corrupt non-ASCII values on platforms where that default is not UTF-8.
    new String(column.get(index), StandardCharsets.UTF_8)
  }
}
25+
26+
1027class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeAndAfter {
1128
29+ val stringArrowSchema = new Schema (List (
30+ Field .nullablePrimitive(" data" , ArrowType .Utf8 .INSTANCE )
31+ ).asJava)
32+
1233 var partitionDocument : PartitionedFileDocument [ArrowFileDocument [String ], String ] = _
1334 val numOfPartitions = 3
1435 val partitionId : String = " partition_doc_test"
@@ -18,7 +39,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
1839 partitionDocument = new PartitionedFileDocument [ArrowFileDocument [String ], String ](
1940 partitionId,
2041 numOfPartitions,
21- uri => new ArrowFileDocument [String ](uri)
42+ uri => new ArrowFileDocument [String ](uri, stringArrowSchema, stringSerializer, stringDeserializer )
2243 )
2344 }
2445
@@ -30,7 +51,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
3051 " PartitionDocument" should " create and write to each partition directly" in {
3152 for (i <- 0 until numOfPartitions) {
3253 val partitionURI = getPartitionURI(partitionId, i)
33- val fileDoc = new ArrowFileDocument [String ](partitionURI)
54+ val fileDoc = new ArrowFileDocument [String ](partitionURI, stringArrowSchema, stringSerializer, stringDeserializer )
3455 fileDoc.open()
3556 fileDoc.putOne(s " Data for partition $i" )
3657 fileDoc.close()
@@ -46,7 +67,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
4667 // Write some data directly to each partition
4768 for (i <- 0 until numOfPartitions) {
4869 val partitionURI = getPartitionURI(partitionId, i)
49- val fileDoc = new ArrowFileDocument [String ](partitionURI)
70+ val fileDoc = new ArrowFileDocument [String ](partitionURI, stringArrowSchema, stringSerializer, stringDeserializer )
5071 fileDoc.open()
5172 fileDoc.putOne(s " Content in partition $i" )
5273 fileDoc.close()
@@ -63,7 +84,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
6384 // Write some data directly to each partition
6485 for (i <- 0 until numOfPartitions) {
6586 val partitionURI = getPartitionURI(partitionId, i)
66- val fileDoc = new ArrowFileDocument [String ](partitionURI)
87+ val fileDoc = new ArrowFileDocument [String ](partitionURI, stringArrowSchema, stringSerializer, stringDeserializer )
6788 fileDoc.open()
6889 fileDoc.putOne(s " Some data in partition $i" )
6990 fileDoc.close()
@@ -86,7 +107,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
86107 val futures = (0 until numOfPartitions).map { i =>
87108 Future {
88109 val partitionURI = getPartitionURI(partitionId, i)
89- val fileDoc = new ArrowFileDocument [String ](partitionURI)
110+ val fileDoc = new ArrowFileDocument [String ](partitionURI, stringArrowSchema, stringSerializer, stringDeserializer )
90111 fileDoc.open()
91112 fileDoc.putOne(s " Concurrent write to partition $i" )
92113 fileDoc.close()
0 commit comments