Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package io.cloudquery.internal.servers.plugin.v3;

import com.google.protobuf.ByteString;
import io.cloudquery.plugin.BackendOptions;
import io.cloudquery.plugin.Plugin;
import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase;
import io.cloudquery.plugin.v3.Write;
import io.cloudquery.schema.Table;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Schema;

public class PluginServer extends PluginImplBase {
private final Plugin plugin;
Expand Down Expand Up @@ -64,18 +58,7 @@ public void getTables(
request.getSkipDependentTables());
List<ByteString> byteStrings = new ArrayList<>();
for (Table table : tables) {
try (BufferAllocator bufferAllocator = new RootAllocator()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the the Table class under the encode method

Schema schema = table.toArrowSchema();
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (ArrowStreamWriter writer =
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
writer.start();
writer.end();
byteStrings.add(ByteString.copyFrom(out.toByteArray()));
}
}
}
byteStrings.add(table.encode());
}
responseObserver.onNext(
io.cloudquery.plugin.v3.GetTables.Response.newBuilder()
Expand All @@ -91,9 +74,18 @@ public void getTables(
public void sync(
io.cloudquery.plugin.v3.Sync.Request request,
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> responseObserver) {
plugin.sync();
responseObserver.onNext(io.cloudquery.plugin.v3.Sync.Response.newBuilder().build());
responseObserver.onCompleted();
try {
plugin.sync(
request.getTablesList(),
request.getSkipTablesList(),
request.getSkipDependentTables(),
request.getDeterministicCqId(),
new BackendOptions(
request.getBackend().getTableName(), request.getBackend().getConnection()),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getBackend() doesn't return null if the backend is not configured

responseObserver);
} catch (Exception e) {
responseObserver.onError(e);
}
}

@Override
Expand Down
22 changes: 19 additions & 3 deletions lib/src/main/java/io/cloudquery/memdb/MemDB.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io.cloudquery.memdb;

import io.cloudquery.plugin.BackendOptions;
import io.cloudquery.plugin.Plugin;
import io.cloudquery.scheduler.Scheduler;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.SchemaException;
import io.cloudquery.schema.Table;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;

Expand Down Expand Up @@ -36,9 +39,22 @@ public List<Table> tables(
}

@Override
public void sync() {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'Sync'");
public void sync(
List<String> includeList,
List<String> skipList,
boolean skipDependentTables,
boolean deterministicCqId,
BackendOptions backendOptions,
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
throws SchemaException {
List<Table> filtered = Table.filterDFS(allTables, includeList, skipList, skipDependentTables);
Scheduler.builder()
.tables(filtered)
.syncStream(syncStream)
.deterministicCqId(deterministicCqId)
.logger(getLogger())
.build()
.sync();
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions lib/src/main/java/io/cloudquery/plugin/BackendOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.cloudquery.plugin;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public class BackendOptions {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapper class so consumers don't need to mess with the gRPC wrapper

private final String tableName;
private final String connection;
}
10 changes: 9 additions & 1 deletion lib/src/main/java/io/cloudquery/plugin/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.cloudquery.schema.SchemaException;
import io.cloudquery.schema.Table;
import io.grpc.stub.StreamObserver;
import java.util.List;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -22,7 +23,14 @@ public abstract List<Table> tables(
List<String> includeList, List<String> skipList, boolean skipDependentTables)
throws SchemaException;

public abstract void sync();
public abstract void sync(
List<String> includeList,
List<String> skipList,
boolean skipDependentTables,
boolean deterministicCqId,
BackendOptions backendOptions,
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
throws SchemaException;

public abstract void read();

Expand Down
32 changes: 31 additions & 1 deletion lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,35 @@
package io.cloudquery.scheduler;

import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.Table;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.List;
import lombok.Builder;
import lombok.NonNull;
import org.apache.logging.log4j.Logger;

@Builder
public class Scheduler {
public Scheduler() {}
@NonNull private final List<Table> tables;
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;
@NonNull private final Logger logger;

private boolean deterministicCqId;

public void sync() {
for (Table table : tables) {
try {
logger.info("sending migrate message for table: {}", table.getName());
Sync.MessageMigrateTable migrateTable =
Sync.MessageMigrateTable.newBuilder().setTable(table.encode()).build();
Sync.Response response = Sync.Response.newBuilder().setMigrateTable(migrateTable).build();
syncStream.onNext(response);
} catch (IOException e) {
syncStream.onError(e);
return;
}
}
syncStream.onCompleted();
}
}
23 changes: 23 additions & 0 deletions lib/src/main/java/io/cloudquery/schema/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import static java.util.Arrays.asList;

import com.google.protobuf.ByteString;
import io.cloudquery.glob.Glob;
import io.cloudquery.schema.Column.ColumnBuilder;
import io.cloudquery.transformers.TransformerException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -16,6 +20,10 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

Expand Down Expand Up @@ -228,4 +236,19 @@ public Schema toArrowSchema() {
Schema schema = new Schema(asList(fields), metadata);
return schema;
}

public ByteString encode() throws IOException {
try (BufferAllocator bufferAllocator = new RootAllocator()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved from PluginServer.java

Schema schema = toArrowSchema();
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (ArrowStreamWriter writer =
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
writer.start();
writer.end();
return ByteString.copyFrom(out.toByteArray());
}
}
}
}
}
1 change: 1 addition & 0 deletions lib/src/main/java/io/cloudquery/server/ServeCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private LoggerContext initLogger() {
context.start(configuration);

logger = context.getLogger(ServeCommand.class.getName());
this.plugin.setLogger(logger);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the logger on the plugin was missing

return context;
}

Expand Down