Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
81439c6
Sketch out a client side muxer with a wildcard subscription
film42 Jan 5, 2023
ee14969
Fix up a few things with the new ruby pure client
film42 Jan 5, 2023
f85d4dc
Fix up tests.. mocks are not great
film42 Jan 5, 2023
bec0a79
Add a working global response muxer
film42 Jan 5, 2023
73f0fa0
Reset the muxer between each test (as the client changes)
film42 Jan 5, 2023
fbec836
Add error handler when client response muxer handler fails
film42 Jan 5, 2023
010028d
Fork jruby/mri code as needed and remove old mri client impl
film42 Jan 5, 2023
349744b
Fix bugs + move platform to specific file
film42 Jan 5, 2023
3e7ced5
Add new flag to readme
film42 Jan 5, 2023
4684de8
Remove dead code
film42 Jan 5, 2023
dafddfc
Add a pre-release to the branch
film42 Jan 6, 2023
1f2e9a0
Bump to an unused pre-release version
film42 Jan 6, 2023
a976595
muxer, and ruby version upgrade, also added circleci
skunkworker Jun 1, 2026
91d7ab4
merge master
skunkworker Jun 1, 2026
3eb9af4
ripped out jnats
skunkworker Jun 1, 2026
001bac9
Added better tuned jruby_opts for testing.
skunkworker Jun 1, 2026
2515c76
more work
skunkworker Jun 2, 2026
05768be
added notes
skunkworker Jun 2, 2026
5f3b97d
Cleaning up more
skunkworker Jun 2, 2026
ff9f6fc
more cleanup work
skunkworker Jun 2, 2026
d57aa97
comment out code
skunkworker Jun 2, 2026
1405ecc
fix specs
skunkworker Jun 2, 2026
b65544d
more
skunkworker Jun 2, 2026
a40cc13
more
skunkworker Jun 2, 2026
3970af6
more
skunkworker Jun 2, 2026
02c8464
more
skunkworker Jun 2, 2026
cb8973c
added changelog.md
skunkworker Jun 2, 2026
cb9a55e
more
skunkworker Jun 5, 2026
a9a22b4
add logging when we see an unexpected message
skunkworker Jun 5, 2026
15247b5
Found a few concurrency bugs.
skunkworker Jun 8, 2026
96a1e53
cleaned up some more logs.
skunkworker Jun 8, 2026
3366cd7
more cleanup.
skunkworker Jun 8, 2026
5192161
more
skunkworker Jun 8, 2026
ffbeb6a
more defensive programming.
skunkworker Jun 8, 2026
d2b1a00
more
skunkworker Jun 8, 2026
e940eae
more
skunkworker Jun 8, 2026
8090fd6
more
skunkworker Jun 8, 2026
d64e8f2
more cleanup
skunkworker Jun 8, 2026
cec049d
removed thread count
skunkworker Jun 8, 2026
32ed76f
more
skunkworker Jun 8, 2026
6e54822
more
skunkworker Jun 8, 2026
b7cfb72
Use 0.13.0 instead of 0.12.0 for this since it's a large overhaul.
skunkworker Jun 8, 2026
62615e1
more work on specs.
skunkworker Jun 8, 2026
3c7462b
more cleanup and spec fixing
skunkworker Jun 8, 2026
370702b
more work
skunkworker Jun 8, 2026
3e64d7a
more error handling in server.rb
skunkworker Jun 8, 2026
1ae84f4
more work on cleanup
skunkworker Jun 8, 2026
5553728
more cleanup
skunkworker Jun 8, 2026
4b4fe51
better thread naming
skunkworker Jun 8, 2026
689c3e4
fixed double synch
skunkworker Jun 8, 2026
c3ddd80
more
skunkworker Jun 8, 2026
f12bead
moved into individual files
skunkworker Jun 8, 2026
6e7ec89
more
skunkworker Jun 8, 2026
5f49269
Add super subscription manager spec
skunkworker Jun 8, 2026
ccb16c1
fixed loading and broke out code
skunkworker Jun 8, 2026
fc549df
fix some specs
skunkworker Jun 8, 2026
b2e816b
use UUIDv7, also add more robust locking
skunkworker Jun 9, 2026
6e42de9
more bulletproofing
skunkworker Jun 9, 2026
d36f723
sped up tests
skunkworker Jun 9, 2026
139090c
more work on periodic cleanup
skunkworker Jun 9, 2026
b9ad11b
Cleanup restart bug
skunkworker Jun 9, 2026
7ff421f
fixed performance regression
skunkworker Jun 9, 2026
bf40121
added improvements
skunkworker Jun 9, 2026
83ce93d
more uuidv7 reporting
skunkworker Jun 9, 2026
0b73ebc
move to concurrent ruby, add in some easy performance gains
skunkworker Jun 9, 2026
fab43d6
removed summary
skunkworker Jun 9, 2026
0876a97
fix specs for cruby
skunkworker Jun 10, 2026
0d7fc92
add missing dep
skunkworker Jun 10, 2026
3c7394a
enforce stricter concurrent-ruby
skunkworker Jun 10, 2026
4aaf55c
pin concurrent-ruby to 1.3.6
skunkworker Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ jobs:
environment:
JRUBY_OPTS: "-J-Xmx1024m"
RAILS_ENV: test
NATS_URL: "changeme"

# 2. The Service Container (runs in the background)
- image: nats:2.14.1-linux
- image: nats:2.14-linux

working_directory: ~/project

Expand Down Expand Up @@ -82,4 +81,4 @@ workflows:
- "cimg/ruby:3.1"
- "cimg/ruby:3.4"
- "jruby:9.4"
- "jruby:10.0"
- "jruby:10.0"
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## Changelog

### 0.13.0 - WIP
- Removed JNats (`nats-pure` is fast enough for JRuby and CRuby parallel work)
- Added ResponseMuxer (similar to Golang)
- Added instrumentation when encountering unexpected messages.

9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,20 @@ And we can see the message was sent to the server and the server replied with a
If we were to add another service endpoint called `search` to the `UserService` but fail to define an instance method
`search`, then `protobuf-nats` will not subscribe to that route.

## Future Improvements (locked behind ruby version)
- Migrate to native `Random.new.uuid_v7`
```ruby
@prng_lock.synchronize { @prng.uuid_v7(extra_timestamp_bits: 12) }
```
- Change ResponseMuxer to use `.pop()` with a timeout.


## Development

After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake test` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run `bundle exec rake install`. To release a new version, update the version number in `version.rb`, and then run `bundle exec rake release`, which will create a git tag for the version, push git commits and tags, and push the `.gem` file to [rubygems.org](https://rubygems.org).

The java-nats client is temporarily forked to support jruby > 9.2.10.0. The living branch for that is here: https://github.com/film42/java-nats/tree/jruby-compat. This will be removed when we upgrade to the new nats.java client.

## Contributing

Expand Down
16 changes: 16 additions & 0 deletions bench/bench.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

Notes:
`-Xjit.threshold=0` - Setting the threshold to 0 forces JRuby to compile every method into Java bytecode immediately before its very first execution. This is particularly useful for debugging or bypassing warm-up times during profiling


`-Xjit.threshold=10 -J-XX:CompileThreshold=10` - If you are running benchmarks and want both JRuby and the JVM to aggressively optimize early, you can lower both thresholds simultaneously

`bundle; bx ruby -I lib bench/real_client.rb`

Start local nats server so details can be monitored.
`/opt/homebrew/opt/nats-server/bin/nats-server -DV -m 8222 -p 4222`


```
export JRUBY_OPTS="--disable:did_you_mean -J-Djava.security.egd=file:/dev/./urandom -J-Xmx2g -J-Xms1024m -J-Xmn512m -Xjit.threshold=10 -J-XX:CompileThreshold=10"
```
17 changes: 17 additions & 0 deletions bench/console.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env ruby

require "bundler/setup"
require "protobuf/nats"

# You can add fixtures and/or initialization code here to make experimenting
# with your gem easier. You can also use a different console, if you like.

# (If you use this, don't forget to add pry to your Gemfile!)
# require "pry"
# Pry.start

# ENV["PB_CLIENT_TYPE"] = "protobuf/nats/client"
# ENV["PB_SERVER_TYPE"] = "protobuf/nats/runner"

require "irb"
IRB.start(__FILE__)
4 changes: 2 additions & 2 deletions bench/real_client.rb
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
Protobuf::Logging.logger = ::Logger.new(nil)

Benchmark.ips do |config|
config.warmup = 10
config.time = 10
config.warmup = 30
config.time = 30

config.report("single threaded performance") do
req = Warehouse::Shipment.new(:guid => SecureRandom.uuid)
Expand Down
13 changes: 13 additions & 0 deletions bench/real_client.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

export JRUBY_OPTS="-J-server --disable:did_you_mean -Xcompile.invokedynamic=true -Xjit.threshold=10 -J-Djava.security.egd=file:/dev/./urandom -J-Xms2g -J-Xmx2g -J-XX:+UseG1GC -J-XX:MaxGCPauseMillis=100"

# export JRUBY_OPTS="-J-server --disable:did_you_mean -Xcompile.invokedynamic=true -Xjit.threshold=0 -J-Djruby.jit.max=0 -J-Djruby.jit.background=false -J-Djava.security.egd=file:/dev/./urandom -J-Xms2g -J-Xmx2g -J-XX:+UseG1GC -J-XX:MaxGCPauseMillis=100"


export PB_SERVER_TYPE="protobuf/nats/runner"
export PB_CLIENT_TYPE="protobuf/nats/client"

echo "$PWD"

bundle exec ruby -I lib bench/real_client.rb
19 changes: 19 additions & 0 deletions bench/real_client_threaded.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
ENV["PB_CLIENT_TYPE"] = "protobuf/nats/client"
ENV["PB_SERVER_TYPE"] = "protobuf/nats/runner"

require "./examples/warehouse/app"

THREAD_COUNT = ENV.fetch("CLIENT_THREADS",4).to_i

puts "THREAD_COUNT = #{THREAD_COUNT}"

::Protobuf::Logging.logger = ::Logger.new(nil)

while true
THREAD_COUNT.times.map do |i|
Thread.new do
req = Warehouse::Shipment.new(:guid => SecureRandom.uuid, :sleep_time_ms => 5)
Warehouse::ShipmentService.client.create(req)
end
end.each(&:join)
end
10 changes: 10 additions & 0 deletions bench/real_client_threaded.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

export JRUBY_OPTS="-J-server --disable:did_you_mean -Xcompile.invokedynamic=true -Xjit.threshold=10 -J-Djava.security.egd=file:/dev/./urandom -J-Xms2g -J-Xmx2g -J-XX:+UseG1GC -J-XX:MaxGCPauseMillis=100"

export PB_SERVER_TYPE="protobuf/nats/runner"
export PB_CLIENT_TYPE="protobuf/nats/client"

export THREAD_COUNT=4

bundle exec ruby -I lib bench/real_client_threaded.rb
17 changes: 16 additions & 1 deletion bench/real_server.sh
Original file line number Diff line number Diff line change
@@ -1 +1,16 @@
PB_SERVER_TYPE="protobuf/nats/runner" PB_CLIENT_TYPE="protobuf/nats/client" bundle exec rpc_server start --threads=20 ./examples/warehouse/app.rb > /dev/null
#!/bin/bash

# export JRUBY_OPTS="-J-server --disable:did_you_mean -Xcompile.invokedynamic=true -Xjit.threshold=0 -J-Djruby.jit.max=0 -J-Djruby.jit.background=false -J-Djava.security.egd=file:/dev/./urandom -J-Xms2g -J-Xmx2g -J-XX:+UseG1GC -J-XX:MaxGCPauseMillis=100"

export JRUBY_OPTS="-J-server -J-Xms4g -J-Xmx4g -J-XX:+AlwaysPreTouch -J-XX:+UseParallelGC -J-XX:ReservedCodeCacheSize=768m -J-XX:MaxInlineLevel=18 -J-XX:MaxInlineSize=100 -J-XX:FreqInlineSize=500 -J-XX:LoopUnrollLimit=250 -J-XX:+UseSuperWord -J-Djruby.jit.threshold=0 -J-Djruby.jit.max=0 -J-Djruby.jit.background=false -Xcompile.invokedynamic=true"


export PB_SERVER_TYPE="protobuf/nats/runner"
export PB_CLIENT_TYPE="protobuf/nats/client"

export PB_NATS_SERVER_SLOW_START_DELAY=1

export PB_NATS_SERVER_MAX_QUEUE_SIZE=6

bundle exec rpc_server start --threads=10 ./examples/warehouse/app.rb

10 changes: 8 additions & 2 deletions examples/warehouse/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class Shipment < ::Protobuf::Message; end
class ShipmentRequest < ::Protobuf::Message; end
class Shipments < ::Protobuf::Message; end


##
# Message Fields
#
Expand All @@ -21,6 +20,8 @@ class Shipment
optional :string, :address, 2
optional :double, :price, 3
optional :string, :package_guid, 4

optional :int64, :sleep_time_ms, 100
end

class ShipmentRequest
Expand All @@ -33,7 +34,6 @@ class Shipments
repeated ::Warehouse::Shipment, :records, 1
end


##
# Service Classes
#
Expand All @@ -43,6 +43,12 @@ class ShipmentService < ::Protobuf::Rpc::Service
rpc :search, ::Warehouse::ShipmentRequest, ::Warehouse::Shipments

def create
# Allows for easier testing of multiple threads
if request.sleep_time_ms > 0
sleep(request.sleep_time_ms / 1000.0)
puts "sleep_time:#{request.sleep_time_ms}"
end

respond_with request
end

Expand Down
20 changes: 10 additions & 10 deletions lib/protobuf/nats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@

require "nats/io/client"

require "protobuf/nats/errors"


require "protobuf/nats/client"
require "protobuf/nats/server"
require "protobuf/nats/runner"
require "protobuf/nats/config"
require "protobuf/nats/errors"
require "protobuf/nats/runner"
require "protobuf/nats/server"

require "protobuf/nats/response_muxer"
require "protobuf/nats/response_muxer_request"
require "protobuf/nats/super_subscription_manager"

module Protobuf
module Nats
Expand All @@ -23,12 +29,7 @@ module Messages
NACK = "\2".freeze
end

NatsClient = if defined? JRUBY_VERSION
require "protobuf/nats/jnats"
::Protobuf::Nats::JNats
else
::NATS::IO::Client
end
NatsClient = ::NATS::IO::Client

GET_CONNECTED_MUTEX = ::Mutex.new

Expand Down Expand Up @@ -114,7 +115,6 @@ def self.start_client_nats_connection
end
end

# This will work with both ruby and java errors
def self.log_error(error)
logger.error error.to_s
logger.error error.class.to_s
Expand Down
Loading