Skip to content

Commit 9e978de

Browse files
committed
Update to support electric 1.4
- Fix invalid protocol typing - Support more esoteric version tuples
1 parent aaeaef7 commit 9e978de

22 files changed

Lines changed: 351 additions & 188 deletions

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ electric_phoenix-*.tar
2424

2525
# Temporary files, for example, from tests.
2626
/tmp/
27+
.claude
28+

lib/mix/tasks/phoenix_sync.install.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ if Code.ensure_loaded?(Igniter) do
300300
defp required_electric_version do
301301
Phoenix.Sync.MixProject.project()
302302
|> Keyword.fetch!(:deps)
303-
|> Enum.find(&match?({:electric, _, _}, &1))
303+
|> Enum.find(&(elem(&1, 0) == :electric))
304304
|> elem(1)
305305
end
306306

lib/phoenix/sync/electric.ex

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -640,8 +640,9 @@ defmodule Phoenix.Sync.Electric do
640640
end
641641
end
642642

643-
if Code.ensure_loaded?(Electric.Shapes.Api) &&
644-
Code.ensure_loaded?(Phoenix.Sync.Electric.ApiAdapter) do
643+
if Code.ensure_loaded?(Electric.Shapes.Api) do
644+
Code.ensure_loaded(Phoenix.Sync.Electric.ApiAdapter)
645+
645646
defimpl Phoenix.Sync.Adapter.PlugApi, for: Electric.Shapes.Api do
646647
alias Electric.Shapes
647648

@@ -701,7 +702,7 @@ if Code.ensure_loaded?(Electric.Shapes.Api) &&
701702
end
702703
end
703704

704-
def send_response(%ApiAdapter{}, conn, {request, response}) do
705+
def send_response(_api, conn, {request, response}) do
705706
conn
706707
|> content_type()
707708
|> Plug.Conn.assign(:request, request)

lib/phoenix/sync/plug/cors.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ defmodule Phoenix.Sync.Plug.CORS do
1313
@electric_headers [
1414
"electric-cursor",
1515
"electric-handle",
16+
"electric-has-data",
1617
"electric-offset",
1718
"electric-schema",
1819
"electric-up-to-date",
19-
"electric-internal-known-error"
20+
"electric-internal-known-error",
21+
"retry-after"
2022
]
2123

2224
@expose_headers ["transfer-encoding" | @electric_headers]

lib/phoenix/sync/sandbox.ex

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -253,13 +253,14 @@ if Phoenix.Sync.sandbox_enabled?() do
253253

254254
:ok = maybe_set_shared_mode(owner, stack_id, opts)
255255

256-
# give the inspector access to the sandboxed connection
257-
Ecto.Adapters.SQL.Sandbox.allow(repo, owner, Sandbox.Inspector.name(stack_id))
258-
259256
# mark the stack as ready
260257
Electric.StatusMonitor.mark_pg_lock_acquired(stack_id, owner)
261258
Electric.StatusMonitor.mark_replication_client_ready(stack_id, owner)
262-
Electric.StatusMonitor.mark_connection_pool_ready(stack_id, owner)
259+
Electric.StatusMonitor.mark_connection_pool_ready(stack_id, :admin, owner)
260+
Electric.StatusMonitor.mark_connection_pool_ready(stack_id, :snapshot, owner)
261+
Electric.StatusMonitor.mark_integrety_checks_passed(stack_id, owner)
262+
Electric.StatusMonitor.mark_shape_log_collector_ready(stack_id, owner)
263+
Electric.StatusMonitor.mark_supervisor_processes_ready(stack_id, owner)
263264

264265
api_config = Sandbox.Stack.config(stack_id, repo)
265266
api = Electric.Application.api(api_config)
@@ -292,7 +293,8 @@ if Phoenix.Sync.sandbox_enabled?() do
292293
defp generate_stack_id(opts) do
293294
tags = Keyword.get(opts, :tags, %{})
294295
# with parameterised tests the same file:line can be running simultaneously
295-
uid = System.unique_integer() |> to_string()
296+
uid = System.unique_integer([:positive, :monotonic]) |> to_string()
297+
now = System.monotonic_time(:microsecond)
296298

297299
suffix =
298300
case Map.fetch(tags, :line) do
@@ -306,7 +308,7 @@ if Phoenix.Sync.sandbox_enabled?() do
306308
:error -> ""
307309
end
308310

309-
"#{inspect(__MODULE__.Stack)}#{uid}#{prefix}#{suffix}"
311+
"#{inspect(__MODULE__.Stack)}#{now}-#{uid}#{prefix}#{suffix}"
310312
end
311313

312314
defp maybe_set_shared_mode(owner, stack_id, opts) do
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
defmodule Phoenix.Sync.Sandbox.Cleanup do
2+
# We use PureFileStorage in the sandbox to remove compatibility issues
3+
# this ensures that the storage dir is removed when the stack is torn
4+
# down
5+
@moduledoc false
6+
7+
use GenServer
8+
9+
def start_link({stack_id, storage_dir}) do
10+
GenServer.start_link(__MODULE__, {stack_id, storage_dir})
11+
end
12+
13+
@impl GenServer
14+
def init({stack_id, storage_dir}) do
15+
Process.flag(:trap_exit, true)
16+
{:ok, Path.join(storage_dir, stack_id)}
17+
end
18+
19+
@impl GenServer
20+
def terminate(_reason, storage_dir) do
21+
File.rm_rf!(storage_dir)
22+
:ok
23+
end
24+
end
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
defmodule Phoenix.Sync.Sandbox.ExpiryManager do
2+
use GenServer
3+
4+
def start_link(args) do
5+
GenServer.start_link(__MODULE__, args, name: Electric.ShapeCache.ExpiryManager.name(args))
6+
end
7+
8+
def init(_) do
9+
{:ok, []}
10+
end
11+
end
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
defmodule Phoenix.Sync.Sandbox.InitializeStack do
2+
@moduledoc false
3+
4+
# synchronously initializes the stack
5+
6+
use GenServer
7+
8+
def start_link(args) do
9+
GenServer.start_link(__MODULE__, args)
10+
end
11+
12+
@impl GenServer
13+
def init(args) do
14+
{:ok, stack_id} = Keyword.fetch(args, :stack_id)
15+
16+
:ok = Electric.ShapeCache.ShapeStatusOwner.initialize(stack_id)
17+
18+
Electric.LsnTracker.set_last_processed_lsn(
19+
stack_id,
20+
Electric.Postgres.Lsn.from_integer(0)
21+
)
22+
23+
:ignore
24+
end
25+
end

lib/phoenix/sync/sandbox/inspector.ex

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ if Phoenix.Sync.sandbox_enabled?() do
3434
@impl Electric.Postgres.Inspector
3535
def list_relations_with_stale_cache(_), do: {:ok, []}
3636

37+
@impl Electric.Postgres.Inspector
38+
def load_supported_features(stack_id) do
39+
with {:ok, pid} <- validate_stack_alive(stack_id) do
40+
GenServer.call(pid, :load_supported_features)
41+
end
42+
end
43+
3744
def start_link(args) do
3845
GenServer.start_link(__MODULE__, args, name: name(args[:stack_id]))
3946
end
@@ -56,8 +63,20 @@ if Phoenix.Sync.sandbox_enabled?() do
5663
def init(args) do
5764
{:ok, stack_id} = Keyword.fetch(args, :stack_id)
5865
{:ok, repo} = Keyword.fetch(args, :repo)
59-
60-
{:ok, %{repo: repo, stack_id: stack_id, relations: %{}, columns: %{}, oids: %{}}}
66+
{:ok, owner} = Keyword.fetch(args, :owner)
67+
68+
# give the inspector access to the sandboxed connection
69+
Ecto.Adapters.SQL.Sandbox.allow(repo, owner, self())
70+
71+
{:ok,
72+
%{
73+
repo: repo,
74+
stack_id: stack_id,
75+
relations: %{},
76+
columns: %{},
77+
oids: %{},
78+
supported_features: %{}
79+
}}
6180
end
6281

6382
@impl GenServer
@@ -88,6 +107,15 @@ if Phoenix.Sync.sandbox_enabled?() do
88107
{:reply, result, state}
89108
end
90109

110+
def handle_call(:load_supported_features, _from, state) do
111+
{result, state} =
112+
fetch_lazy(state, :supported_features, nil, fn ->
113+
Electric.Postgres.Inspector.DirectInspector.load_supported_features(pool(state))
114+
end)
115+
116+
{:reply, result, state}
117+
end
118+
91119
defp pool(state) do
92120
%{pid: pool} = Ecto.Adapter.lookup_meta(state.repo.get_dynamic_repo())
93121
pool

lib/phoenix/sync/sandbox/producer.ex

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ if Phoenix.Sync.sandbox_enabled?() do
33
@moduledoc false
44

55
alias Electric.Replication.Changes.{
6-
Transaction,
6+
TransactionFragment,
7+
Commit,
78
NewRecord,
89
UpdatedRecord,
910
DeletedRecord,
@@ -50,6 +51,7 @@ if Phoenix.Sync.sandbox_enabled?() do
5051

5152
def init(stack_id) do
5253
state = %{txid: 10000, stack_id: stack_id}
54+
5355
{:ok, state}
5456
end
5557

@@ -62,7 +64,7 @@ if Phoenix.Sync.sandbox_enabled?() do
6264
:ok =
6365
txid
6466
|> transaction(msgs)
65-
|> ShapeLogCollector.store_transaction(ShapeLogCollector.name(stack_id))
67+
|> ShapeLogCollector.handle_event(stack_id)
6668

6769
{:noreply, %{state | txid: next_txid}}
6870
end
@@ -73,20 +75,22 @@ if Phoenix.Sync.sandbox_enabled?() do
7375
:ok =
7476
state.txid
7577
|> transaction(changes)
76-
|> ShapeLogCollector.store_transaction(ShapeLogCollector.name(state.stack_id))
78+
|> ShapeLogCollector.handle_event(state.stack_id)
7779

7880
{:noreply, %{state | txid: state.txid + 100}}
7981
end
8082

8183
defp transaction(txid, changes) do
82-
%Transaction{
84+
%{log_offset: last_log_offset} = Enum.at(changes, -1)
85+
86+
%TransactionFragment{
8387
xid: txid,
8488
lsn: Electric.Postgres.Lsn.from_integer(txid),
85-
last_log_offset: Enum.at(changes, -1) |> Map.fetch!(:log_offset),
89+
last_log_offset: last_log_offset,
90+
has_begin?: true,
91+
commit: %Commit{},
8692
changes: changes,
87-
num_changes: length(changes),
88-
commit_timestamp: DateTime.utc_now(),
89-
affected_relations: Enum.into(changes, MapSet.new(), & &1.relation)
93+
affected_relations: MapSet.new(changes, & &1.relation)
9094
}
9195
end
9296

0 commit comments

Comments
 (0)