From 6901d44db7871f2666a88d7870552beeb2302abc Mon Sep 17 00:00:00 2001 From: erezrokah Date: Mon, 21 Aug 2023 17:38:22 +0200 Subject: [PATCH 1/2] feat(sync): Send migrate messsages --- .../servers/plugin/v3/PluginServer.java | 39 ++++++++----------- .../main/java/io/cloudquery/memdb/MemDB.java | 22 +++++++++-- .../io/cloudquery/plugin/BackendOptions.java | 11 ++++++ .../java/io/cloudquery/plugin/Plugin.java | 10 ++++- .../io/cloudquery/scheduler/Scheduler.java | 31 ++++++++++++++- .../main/java/io/cloudquery/schema/Table.java | 23 +++++++++++ .../io/cloudquery/server/ServeCommand.java | 1 + 7 files changed, 110 insertions(+), 27 deletions(-) create mode 100644 lib/src/main/java/io/cloudquery/plugin/BackendOptions.java diff --git a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java index 8b665a10..81cccc8e 100644 --- a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java +++ b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java @@ -1,20 +1,14 @@ package io.cloudquery.internal.servers.plugin.v3; import com.google.protobuf.ByteString; +import io.cloudquery.plugin.BackendOptions; import io.cloudquery.plugin.Plugin; import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase; import io.cloudquery.plugin.v3.Write; import io.cloudquery.schema.Table; import io.grpc.stub.StreamObserver; -import java.io.ByteArrayOutputStream; -import java.nio.channels.Channels; import java.util.ArrayList; import java.util.List; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.ipc.ArrowStreamWriter; -import org.apache.arrow.vector.types.pojo.Schema; public class PluginServer extends PluginImplBase { private final Plugin plugin; @@ -64,18 +58,7 @@ public void getTables( request.getSkipDependentTables()); List byteStrings = new ArrayList<>(); for (Table table : tables) { - try (BufferAllocator bufferAllocator = new RootAllocator()) { - Schema schema = table.toArrowSchema(); - VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator); - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - try (ArrowStreamWriter writer = - new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) { - writer.start(); - writer.end(); - byteStrings.add(ByteString.copyFrom(out.toByteArray())); - } - } - } + byteStrings.add(table.encode()); } responseObserver.onNext( io.cloudquery.plugin.v3.GetTables.Response.newBuilder() @@ -91,9 +74,21 @@ public void getTables( public void sync( io.cloudquery.plugin.v3.Sync.Request request, StreamObserver responseObserver) { - plugin.sync(); - responseObserver.onNext(io.cloudquery.plugin.v3.Sync.Response.newBuilder().build()); - responseObserver.onCompleted(); + try { + plugin.sync( + request.getTablesList(), + request.getSkipTablesList(), + request.getSkipDependentTables(), + request.getDeterministicCqId(), + new BackendOptions( + request.getBackend().getTableName(), request.getBackend().getConnection()), + responseObserver); + + responseObserver.onNext(io.cloudquery.plugin.v3.Sync.Response.newBuilder().build()); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError(e); + } } @Override diff --git a/lib/src/main/java/io/cloudquery/memdb/MemDB.java b/lib/src/main/java/io/cloudquery/memdb/MemDB.java index be3d2fde..a4f08f9d 100644 --- a/lib/src/main/java/io/cloudquery/memdb/MemDB.java +++ b/lib/src/main/java/io/cloudquery/memdb/MemDB.java @@ -1,9 +1,12 @@ package io.cloudquery.memdb; +import io.cloudquery.plugin.BackendOptions; import io.cloudquery.plugin.Plugin; +import io.cloudquery.scheduler.Scheduler; import io.cloudquery.schema.Column; import io.cloudquery.schema.SchemaException; import io.cloudquery.schema.Table; +import io.grpc.stub.StreamObserver; import java.util.List; import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; @@ -36,9 +39,22 @@ public List tables( } @Override - public void sync() { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'Sync'"); + public void sync( + List includeList, + List skipList, + boolean skipDependentTables, + boolean deterministicCqId, + BackendOptions backendOptions, + StreamObserver syncStream) + throws SchemaException { + List
filtered = Table.filterDFS(allTables, includeList, skipList, skipDependentTables); + Scheduler.builder() + .tables(filtered) + .syncStream(syncStream) + .deterministicCqId(deterministicCqId) + .logger(getLogger()) + .build() + .sync(); } @Override diff --git a/lib/src/main/java/io/cloudquery/plugin/BackendOptions.java b/lib/src/main/java/io/cloudquery/plugin/BackendOptions.java new file mode 100644 index 00000000..d5ebdab3 --- /dev/null +++ b/lib/src/main/java/io/cloudquery/plugin/BackendOptions.java @@ -0,0 +1,11 @@ +package io.cloudquery.plugin; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +@Getter +public class BackendOptions { + private final String tableName; + private final String connection; +} diff --git a/lib/src/main/java/io/cloudquery/plugin/Plugin.java b/lib/src/main/java/io/cloudquery/plugin/Plugin.java index 9f5cc1fc..b27fc1d5 100644 --- a/lib/src/main/java/io/cloudquery/plugin/Plugin.java +++ b/lib/src/main/java/io/cloudquery/plugin/Plugin.java @@ -2,6 +2,7 @@ import io.cloudquery.schema.SchemaException; import io.cloudquery.schema.Table; +import io.grpc.stub.StreamObserver; import java.util.List; import lombok.Getter; import lombok.NonNull; @@ -22,7 +23,14 @@ public abstract List
tables( List includeList, List skipList, boolean skipDependentTables) throws SchemaException; - public abstract void sync(); + public abstract void sync( + List includeList, + List skipList, + boolean skipDependentTables, + boolean deterministicCqId, + BackendOptions backendOptions, + StreamObserver syncStream) + throws SchemaException; public abstract void read(); diff --git a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java index be1650ad..88a725dd 100644 --- a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java +++ b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java @@ -1,5 +1,34 @@ package io.cloudquery.scheduler; +import io.cloudquery.plugin.v3.Sync; +import io.cloudquery.schema.Table; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.List; +import lombok.Builder; +import lombok.NonNull; +import org.apache.logging.log4j.Logger; + +@Builder public class Scheduler { - public Scheduler() {} + @NonNull private final List
tables; + @NonNull private final StreamObserver syncStream; + @NonNull private final Logger logger; + + private boolean deterministicCqId; + + public void sync() { + for (Table table : tables) { + try { + logger.info("sending migrate message for table: {}", table.getName()); + Sync.MessageMigrateTable migrateTable = + Sync.MessageMigrateTable.newBuilder().setTable(table.encode()).build(); + Sync.Response response = Sync.Response.newBuilder().setMigrateTable(migrateTable).build(); + syncStream.onNext(response); + } catch (IOException e) { + syncStream.onError(e); + return; + } + } + } } diff --git a/lib/src/main/java/io/cloudquery/schema/Table.java b/lib/src/main/java/io/cloudquery/schema/Table.java index d8216cdc..ca47d8ab 100644 --- a/lib/src/main/java/io/cloudquery/schema/Table.java +++ b/lib/src/main/java/io/cloudquery/schema/Table.java @@ -2,9 +2,13 @@ import static java.util.Arrays.asList; +import com.google.protobuf.ByteString; import io.cloudquery.glob.Glob; import io.cloudquery.schema.Column.ColumnBuilder; import io.cloudquery.transformers.TransformerException; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -16,6 +20,10 @@ import lombok.Getter; import lombok.NonNull; import lombok.Setter; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; @@ -228,4 +236,19 @@ public Schema toArrowSchema() { Schema schema = new Schema(asList(fields), metadata); return schema; } + + public ByteString encode() throws IOException { + try (BufferAllocator bufferAllocator = new RootAllocator()) { + Schema schema = toArrowSchema(); + VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator); + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + try (ArrowStreamWriter writer = + new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) { + writer.start(); + writer.end(); + return ByteString.copyFrom(out.toByteArray()); + } + } + } + } } diff --git a/lib/src/main/java/io/cloudquery/server/ServeCommand.java b/lib/src/main/java/io/cloudquery/server/ServeCommand.java index 4802e27c..539bf9cb 100644 --- a/lib/src/main/java/io/cloudquery/server/ServeCommand.java +++ b/lib/src/main/java/io/cloudquery/server/ServeCommand.java @@ -88,6 +88,7 @@ private LoggerContext initLogger() { context.start(configuration); logger = context.getLogger(ServeCommand.class.getName()); + this.plugin.setLogger(logger); return context; } From 0172e8f29a01adf11d45d0a3c3c5a23502755b00 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Mon, 21 Aug 2023 17:41:26 +0200 Subject: [PATCH 2/2] chore: Cleanup --- .../io/cloudquery/internal/servers/plugin/v3/PluginServer.java | 3 --- lib/src/main/java/io/cloudquery/scheduler/Scheduler.java | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java index 81cccc8e..51b2796c 100644 --- a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java +++ b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java @@ -83,9 +83,6 @@ public void sync( new BackendOptions( request.getBackend().getTableName(), request.getBackend().getConnection()), responseObserver); - - responseObserver.onNext(io.cloudquery.plugin.v3.Sync.Response.newBuilder().build()); - responseObserver.onCompleted(); } catch (Exception e) { responseObserver.onError(e); } diff --git a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java index 88a725dd..7d484f6b 100644 --- a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java +++ b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java @@ -30,5 +30,6 @@ public void sync() { return; } } + syncStream.onCompleted(); } }