5 changes: 5 additions & 0 deletions cloudquery/sdk/internal/memdb/memdb.py
@@ -5,6 +5,7 @@
from cloudquery.sdk import schema
from typing import List, Generator, Dict
import pyarrow as pa
from cloudquery.sdk.schema.table import Table
from cloudquery.sdk.types import JSONType
from dataclasses import dataclass, field

@@ -109,5 +110,9 @@ def write(self, writer: Generator[message.WriteMessage, None, None]) -> None:
            else:
                raise NotImplementedError(f"Unknown message type {type(msg)}")

    def read(self, table: Table) -> Generator[message.ReadMessage, None, None]:
        # Yield only the records stored for the requested table.
        for table_name, record in self._db.items():
            if table_name == table.name:
                yield message.ReadMessage(record)

    def close(self) -> None:
        self._db = {}
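A minimal round-trip sketch of the new read support. It assumes MemDB is importable from cloudquery.sdk.internal.memdb with a no-argument constructor, that WriteInsertMessage is exported from cloudquery.sdk.message, and that the record batch's schema carries the table metadata the SDK expects; none of these are shown in this diff.

import pyarrow as pa

from cloudquery.sdk import message
from cloudquery.sdk.internal.memdb import MemDB  # assumed import path
from cloudquery.sdk.schema.table import Table


def roundtrip(db: MemDB, table: Table, record: pa.RecordBatch) -> list:
    def writer():
        # Migrate the table, insert one batch, then read it back.
        yield message.WriteMigrateTableMessage(table=table, migrate_force=False)
        yield message.WriteInsertMessage(record=record)

    db.write(writer())
    return [msg.record for msg in db.read(table)]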
14 changes: 11 additions & 3 deletions cloudquery/sdk/internal/servers/plugin_v3/plugin.py
@@ -81,8 +81,14 @@ def Sync(self, request, context):
                # unknown sync message type
                raise NotImplementedError()

    def Read(
        self, request: plugin_pb2.Read.Request, context
    ) -> Generator[plugin_pb2.Read.Response, None, None]:
        schema = arrow.new_schema_from_bytes(request.table)
        table = Table.from_arrow_schema(schema)
        for msg in self._plugin.read(table):
            buf = arrow.record_to_bytes(msg.record)
            yield plugin_pb2.Read.Response(record=buf)

    def Write(
        self, request_iterator: Generator[plugin_pb2.Write.Request, None, None], context
@@ -93,7 +99,9 @@ def msg_iterator() -> Generator[WriteMessage, None, None]:
                if field == "migrate_table":
                    sc = arrow.new_schema_from_bytes(msg.migrate_table.table)
                    table = Table.from_arrow_schema(sc)
                    yield WriteMigrateTableMessage(
                        table=table, migrate_force=msg.migrate_table.migrate_force
                    )
                elif field == "insert":
                    yield WriteInsertMessage(
                        record=arrow.new_record_from_bytes(msg.insert.record)
                    )
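For context on the byte-level contract here: the table schema arrives as Arrow-serialized bytes and each record is returned the same way. An illustrative sketch of what arrow.record_to_bytes / arrow.new_record_from_bytes are assumed to do, using pyarrow's IPC stream format (the SDK's own helpers may differ in detail):

import pyarrow as pa


def record_to_bytes_sketch(record: pa.RecordBatch) -> bytes:
    # Write a single batch in Arrow IPC stream format.
    sink = pa.BufferOutputStream()
    writer = pa.ipc.new_stream(sink, record.schema)
    writer.write_batch(record)
    writer.close()
    return sink.getvalue().to_pybytes()


def new_record_from_bytes_sketch(buf: bytes) -> pa.RecordBatch:
    # Read the single batch back out of the stream.
    return pa.ipc.open_stream(buf).read_next_batch()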
1 change: 1 addition & 0 deletions cloudquery/sdk/message/__init__.py
@@ -5,3 +5,4 @@
    WriteMigrateTableMessage,
    WriteDeleteStale,
)
from .read import ReadMessage
6 changes: 6 additions & 0 deletions cloudquery/sdk/message/read.py
@@ -0,0 +1,6 @@
import pyarrow as pa


class ReadMessage:
    def __init__(self, record: pa.RecordBatch):
        self.record = record
3 changes: 2 additions & 1 deletion cloudquery/sdk/message/write.py
@@ -12,8 +12,9 @@ def __init__(self, record: pa.RecordBatch):


class WriteMigrateTableMessage(WriteMessage):
    def __init__(self, table: Table, migrate_force: bool):
        self.table = table
        self.migrate_force = migrate_force


class WriteDeleteStale(WriteMessage):
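A sketch of how a destination plugin might consume the new flag; the helpers are passed in as callables because apply_migration and insert_batch are hypothetical, not SDK functions. The intent is that migrate_force signals potentially destructive schema changes may be applied rather than skipped:

from typing import Callable, Generator

import pyarrow as pa

from cloudquery.sdk import message


def write(
    writer: Generator[message.WriteMessage, None, None],
    apply_migration: Callable[..., None],
    insert_batch: Callable[[pa.RecordBatch], None],
) -> None:
    for msg in writer:
        if isinstance(msg, message.WriteMigrateTableMessage):
            # Forced migrations may drop or retype columns; non-forced
            # migrations should only apply safe, additive changes.
            apply_migration(msg.table, force=msg.migrate_force)
        elif isinstance(msg, message.WriteInsertMessage):
            insert_batch(msg.record)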
3 changes: 3 additions & 0 deletions cloudquery/sdk/plugin/plugin.py
@@ -93,5 +93,8 @@ def sync(self, options: SyncOptions) -> Generator[message.SyncMessage, None, None]:
    def write(self, writer: Generator[message.WriteMessage, None, None]) -> None:
        raise NotImplementedError()

    def read(self, table: Table) -> Generator[message.ReadMessage, None, None]:
        raise NotImplementedError()

    def close(self) -> None:
        raise NotImplementedError()
13 changes: 12 additions & 1 deletion cloudquery/sdk/schema/__init__.py
@@ -1,5 +1,16 @@
from .column import Column
from .table import (
    Table,
    tables_to_arrow_schemas,
    filter_dfs,
    TableColumnChange,
    TableColumnChangeType,
    get_table_changes,
    get_table_column,
    flatten_tables_recursive,
    flatten_tables,
)
from .resource import Resource

# from .table_resolver import TableReso
4 changes: 3 additions & 1 deletion cloudquery/sdk/schema/column.py
@@ -55,7 +55,9 @@ def to_arrow_field(self):
                arrow.METADATA_TRUE if self.incremental_key else arrow.METADATA_FALSE
            ),
        }
        return pa.field(
            self.name, self.type, metadata=metadata, nullable=not self.not_null
        )

    @staticmethod
    def from_arrow_field(field: pa.Field) -> Column:
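A quick check of the nullable change, assuming Column accepts these keyword arguments (its full constructor is not shown in this diff):

import pyarrow as pa

from cloudquery.sdk.schema import Column

col = Column(name="id", type=pa.string(), not_null=True)
assert col.to_arrow_field().nullable is False  # NOT NULL maps to a non-nullable field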
145 changes: 139 additions & 6 deletions cloudquery/sdk/schema/table.py
@@ -1,15 +1,20 @@
from __future__ import annotations

import copy
from enum import IntEnum
import fnmatch
from typing import List, Optional

import pyarrow as pa

from cloudquery.sdk.schema import arrow
from .column import Column


CQ_SYNC_TIME_COLUMN = "cq_sync_time"
CQ_SOURCE_NAME_COLUMN = "cq_source_name"


class Client:
    pass

@@ -192,9 +197,137 @@ def filter_dfs_child(r, matched, include, exclude, skip_dependent_tables):
    return None


class TableColumnChangeType(IntEnum):
    UNKNOWN = 0
    ADD = 1
    UPDATE = 2
    REMOVE = 3
    REMOVE_UNIQUE_CONSTRAINT = 4
    MOVE_TO_CQ_ONLY = 5


class TableColumnChange:
    def __init__(
        self,
        type: TableColumnChangeType,
        column_name: str = "",
        current: Optional[Column] = None,
        previous: Optional[Column] = None,
    ):
        self.type = type
        self.column_name = column_name
        self.current = current
        self.previous = previous


def get_table_changes(new: Table, old: Table) -> List[TableColumnChange]:
    changes = []

    # Special case: Moving from individual PKs to singular PK on _cq_id
    new_pks = new.primary_keys
    if (
        len(new_pks) == 1
        and new_pks[0] == "CqIDColumn"
        and get_table_column(old, "CqIDColumn") is None
        and len(old.primary_keys) > 0
    ):
        changes.append(
            TableColumnChange(
                type=TableColumnChangeType.MOVE_TO_CQ_ONLY,
            )
        )

    for c in new.columns:
        other_column = get_table_column(old, c.name)
        # A column was added to the table definition
        if other_column is None:
            changes.append(
                TableColumnChange(
                    type=TableColumnChangeType.ADD,
                    column_name=c.name,
                    current=c,
                    previous=None,
                )
            )
            continue

        # Column type or options (e.g. PK, Not Null) changed in the new table definition
        if (
            c.type != other_column.type
            or c.not_null != other_column.not_null
            or c.primary_key != other_column.primary_key
        ):
            changes.append(
                TableColumnChange(
                    type=TableColumnChangeType.UPDATE,
                    column_name=c.name,
                    current=c,
                    previous=other_column,
                )
            )

        # Unique constraint was removed
        if not c.unique and other_column.unique:
            changes.append(
                TableColumnChange(
                    type=TableColumnChangeType.REMOVE_UNIQUE_CONSTRAINT,
                    column_name=c.name,
                    current=c,
                    previous=other_column,
                )
            )

    # A column was removed from the table definition
    for c in old.columns:
        if get_table_column(new, c.name) is None:
            changes.append(
                TableColumnChange(
                    type=TableColumnChangeType.REMOVE,
                    column_name=c.name,
                    current=None,
                    previous=c,
                )
            )

    return changes


def get_table_column(table: Table, column_name: str) -> Optional[Column]:
    for c in table.columns:
        if c.name == column_name:
            return c
    return None


def flatten_tables_recursive(original_tables: List[Table]) -> List[Table]:
    tables = []
    for table in original_tables:
        table_copy = Table(
            name=table.name,
            columns=table.columns,
            relations=table.relations,
            title=table.title,
            description=table.description,
            is_incremental=table.is_incremental,
            parent=table.parent,
        )
        tables.append(table_copy)
        tables.extend(flatten_tables_recursive(table.relations))
    return tables


def flatten_tables(original_tables: List[Table]) -> List[Table]:
    tables = flatten_tables_recursive(original_tables)
    seen = set()
    deduped = []
    for table in tables:
        if table.name not in seen:
            deduped.append(table)
            seen.add(table.name)
    return deduped
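A small example of the change-detection API, assuming Table and Column accept the keyword arguments used in flatten_tables_recursive above, with their other attributes defaulting:

import pyarrow as pa

from cloudquery.sdk.schema import (
    Column,
    Table,
    TableColumnChangeType,
    get_table_changes,
)

old = Table(name="users", columns=[Column(name="id", type=pa.string())])
new = Table(
    name="users",
    columns=[
        Column(name="id", type=pa.string()),
        Column(name="age", type=pa.int64()),
    ],
)

changes = get_table_changes(new, old)
assert len(changes) == 1
assert changes[0].type == TableColumnChangeType.ADD
assert changes[0].column_name == "age"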
3 changes: 3 additions & 0 deletions cloudquery/sdk/types/json.py
@@ -21,3 +21,6 @@ def __arrow_ext_deserialize__(self, storage_type, serialized):
        # return an instance of this subclass given the serialized
        # metadata.
        return JSONType()


pa.register_extension_type(JSONType())
3 changes: 3 additions & 0 deletions cloudquery/sdk/types/uuid.py
@@ -23,3 +23,6 @@ def __arrow_ext_deserialize__(self, storage_type, serialized):
        # return an instance of this subclass given the serialized
        # metadata.
        return UUIDType()


pa.register_extension_type(UUIDType())