Skip to content

Commit 38fa31d

Browse files
committed
Add confirmed publishing and cluster support.
1 parent 2e9e00b commit 38fa31d

20 files changed

Lines changed: 149 additions & 52 deletions

.github/workflows/downstream.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ name: Downstream
22

33
on:
44
workflow_dispatch:
5-
branches:
6-
- trunk
75
push:
86
branches:
97
- trunk

.github/workflows/tests.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ jobs:
99
- uses: actions/checkout@v1
1010
- uses: ruby/setup-ruby@v1
1111
with:
12-
ruby-version: 2.6.3
12+
ruby-version: 2.6.9
1313
- name: Cache Gems
14-
uses: actions/cache@v1
14+
uses: actions/cache@v4
1515
with:
1616
path: vendor/bundle
17-
key: ${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }}
17+
key: ${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }}
1818
restore-keys: |
19-
${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }}
19+
${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }}
2020
- name: Install Gems
2121
run: |
2222
sudo gem install bundler -v '1.17.3'

.ruby-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.6.6
1+
2.6.9

Gemfile.lock

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ GEM
5555
timers (~> 4.0.0)
5656
coderay (1.1.0)
5757
concurrent-ruby (1.1.6)
58-
crass (1.0.4)
58+
crass (1.0.6)
5959
diff-lcs (1.3)
6060
erubis (2.7.0)
6161
ffi (1.10.0)
@@ -85,20 +85,21 @@ GEM
8585
celluloid (~> 0.16.0)
8686
rb-fsevent (>= 0.9.3)
8787
rb-inotify (>= 0.9)
88-
loofah (2.2.2)
88+
loofah (2.25.0)
8989
crass (~> 1.0.2)
90-
nokogiri (>= 1.5.9)
90+
nokogiri (>= 1.12.0)
9191
lumberjack (1.0.9)
9292
mail (2.6.3)
9393
mime-types (>= 1.16, < 3)
9494
method_source (0.8.2)
9595
mime-types (2.99.3)
96-
mini_portile2 (2.4.0)
96+
mini_portile2 (2.8.9)
9797
minitest (5.14.0)
9898
multi_json (1.13.1)
9999
nenv (0.2.0)
100-
nokogiri (1.9.1)
101-
mini_portile2 (~> 2.4.0)
100+
nokogiri (1.13.10)
101+
mini_portile2 (~> 2.8.0)
102+
racc (~> 1.4)
102103
notiffany (0.0.6)
103104
nenv (~> 0.1)
104105
shellany (~> 0.0)
@@ -111,6 +112,7 @@ GEM
111112
pry-remote (0.1.8)
112113
pry (~> 0.9)
113114
slop (~> 3.0)
115+
racc (1.8.1)
114116
rack (1.6.9)
115117
rack-test (0.6.3)
116118
rack (>= 1.0)

lib/acapi.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
require "active_support"
33

44
require "acapi/config"
5+
require "acapi/errors"
6+
57
require "acapi/notifiers"
68
require "acapi/publisher"
79
require "acapi/subscriber"

lib/acapi/amqp/client.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ def initialize(chan, q)
1212
@argument_errors = []
1313
@bad_argument_queue = "acapi.error.middleware.service.bad_arguments"
1414
@processing_failed_queue = "acapi.error.middleware.service.processing_failed"
15+
@republish_channel = @channel.connection.create_channel
16+
@republish_channel.confirm_select
17+
@republish_queue = @republish_channel.queue(@queue.name, @queue.options)
1518
@exit_after_work = false
1619
end
1720

@@ -102,7 +105,8 @@ def subscribe(opts = {})
102105
publish_processing_failed(delivery_info, properties, payload, e)
103106
else
104107
new_properties = redelivery_properties(existing_retry_count, delivery_info, properties)
105-
queue.publish(payload, new_properties)
108+
@republish_queue.publish(payload, new_properties)
109+
@republish_channel.wait_for_confirms
106110
channel.acknowledge(delivery_info.delivery_tag, false)
107111
end
108112
rescue => e

lib/acapi/amqp/messaging_exchange_topology.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ module Acapi
44
module Amqp
55
class MessagingExchangeTopology
66

7-
def self.ensure_topology_exists(connection_string)
8-
topology = new(connection_string)
7+
def self.ensure_topology_exists(connection_settings)
8+
topology = new(connection_settings)
99
topology.setup
1010
topology.close
1111
end
1212

13-
def initialize(connection_string)
14-
@connection = Bunny.new(connection_string, :heartbeat => 15)
13+
def initialize(connection_settings)
14+
@connection = Bunny.new(connection_settings)
1515
@connection.start
1616
@channel = @connection.create_channel
1717
end

lib/acapi/amqp/requestor.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ def initialize(conn)
1111
def request(properties, payload, timeout = 15)
1212
delivery_info, r_props, r_payload = [nil, nil, nil]
1313
channel = @connection.create_channel
14+
p_channel = @connection.create_channel
1415
temp_queue = channel.queue("", :exclusive => true)
1516
channel.prefetch(1)
16-
request_exchange = channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true)
17+
p_channel.confirm_select
18+
request_exchange = p_channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true)
1719
request_exchange.publish(payload, properties.dup.merge({ :reply_to => temp_queue.name, :persistent => true }))
20+
p_channel.wait_for_confirms
1821
delivery_info, r_props, r_payload = [nil, nil, nil]
1922
begin
2023
Timeout::timeout(timeout) do
@@ -26,6 +29,7 @@ def request(properties, payload, timeout = 15)
2629
end
2730
ensure
2831
temp_queue.delete
32+
p_channel.close
2933
channel.close
3034
end
3135
[delivery_info, r_props, r_payload]

lib/acapi/amqp/responder.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ module Amqp
33
module Responder
44
def with_response_exchange(connection)
55
channel = connection.create_channel
6+
channel.confirm_select
67
publish_exchange = channel.default_exchange
78
yield publish_exchange
9+
channel.wait_for_confirms
810
channel.close
911
end
1012
end

lib/acapi/amqp_event_worker.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@ def self.run
6868
pid_file_location = File.join(File.expand_path(Rails.root), "pids", "sneakers.pid")
6969
worker_classes = Rails.application.config.acapi.sneakers_worker_classes
7070
ensure_messaging_exchanges
71+
connection = Bunny.new(Rails.application.config.acapi.to_connection_settings)
7172
Sneakers.configure(
7273
:workers => worker_classes.length,
73-
:amqp => Rails.application.config.acapi.remote_broker_uri,
74+
:connection => connection,
7475
:start_worker_delay => 0.2,
75-
:heartbeat => 5,
7676
:log => STDOUT,
7777
:pid_path => pid_file_location,
7878
:handler => Sneakers::Handlers::Maxretry,
@@ -88,7 +88,7 @@ def self.run
8888
end
8989

9090
def self.ensure_messaging_exchanges
91-
::Acapi::Amqp::MessagingExchangeTopology.ensure_topology_exists(Rails.application.config.acapi.remote_broker_uri)
91+
::Acapi::Amqp::MessagingExchangeTopology.ensure_topology_exists(Rails.application.config.acapi.to_connection_settings)
9292
end
9393
end
9494
end

0 commit comments

Comments
 (0)