-
Notifications
You must be signed in to change notification settings - Fork 256
✨ add timeout option to batched event handler #641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
lib/commanded/event/handler.ex
Outdated
| if batch_timeout_provided? do | ||
| raise ArgumentError, | ||
| inspect(module) <> | ||
| " :batch_timeout requires :batch_size. Remove the timeout or configure batching." | ||
| end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this check makes more sense with the checks up above like "both :concurrency and :batch_size are specified..." at the beginning of this function.
A bonus of moving the checks is that I think you would no longer need the logic surrounding __no_batch_timeout__.
lib/commanded/event/handler.ex
Outdated
| # No batch_size configured - process immediately | ||
| is_nil(batch_size) -> | ||
| handle_batch(events, %{flush_reason: :immediate}, state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this case actually possible? If we are in :batch callback mode doesn't there have to be a batch_size configured upstream from here? Enforced by this code: https://github.com/commanded/commanded/blob/master/lib/commanded/event/handler.ex#L782-L784
If this case is not possible, and there are actually only two possible cases here, I think this code would be a little easier to follow if you moved things up into the handle_info above and had all three code paths directly in the case statement:
try do
events = Upcast.upcast_event_stream(events, additional_metadata: %{application: application})
state =
case {callback, state.batch_timeout) do
{:event, _} ->
# Non-batched: process immediately
Enum.reduce(events, state, &handle_event/2)
# Batched with no timeout configured
{:batch, nil} ->
handle_batch(events, state)
# Batched with timeout
{:batch, batch_timeout} ->
buffer_and_maybe_flush(events, state)
end
end
# ...
defp buffer_and_maybe_flush(events, %Handler{} = state) do
%Handler{
batch_buffer: buffer,
batch_size: batch_size,
batch_timeout: batch_timeout
} = state
current_buffer = buffer || []
new_buffer = current_buffer ++ events
state = %Handler{state | batch_buffer: new_buffer}
# Start timer if this is first event in batch
state = maybe_start_batch_timer(state)
# Check if we should flush based on size
if length(new_buffer) >= batch_size do
state
|> cancel_batch_timer()
|> flush_batch_buffer(:size)
else
state
end
endI think this makes more sense because the buffer_and_maybe_flush function now is clearly only on the codepath with the timer. It was a little confusing about why all :batch paths would go through the buffer + flush mechanism.
lib/commanded/event/handler.ex
Outdated
| } = state | ||
|
|
||
| # Upcast events before buffering | ||
| events = Upcast.upcast_event_stream(events, additional_metadata: %{application: application}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is being called here, and in the :event codepath of handle_info({:events, events}, state) here then I think its simpler for it to reside only in the handle_info({:events, events}, state) function (at the top of the try block).
- Move "batch_timeout requires batch_size" validation earlier - Remove __no_batch_timeout__ sentinel pattern - Restructure handle_info with clear three-way code paths - Move upcasting to single location before branching - Simplify buffer_and_maybe_flush (remove dead code paths) - Fix unused default argument warning in handle_batch/3
1cadc59 to
66ab766
Compare
drteeth
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like the idea of this change. Thanks for putting the time in. There are a few things I'm unclear about still, but we'll get there.
| {batch_timeout, config} = Keyword.pop(config, :batch_timeout, :infinity) | ||
|
|
||
| # Validate batch_size | ||
| unless is_nil(batch_size) or (is_integer(batch_size) and batch_size > 0) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unless was deprecated in Elixir 1.18, can you refactor this to use if please?
| end | ||
|
|
||
| # Validate batch_timeout | ||
| unless batch_timeout == :infinity or (is_integer(batch_timeout) and batch_timeout > 0) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unless was deprecated in Elixir 1.18, can you refactor this to use if please?
|
|
||
| # Clear buffer and timer BEFORE processing to prevent race condition | ||
| # If timer fires during batch processing, it will see empty buffer | ||
| state = %Handler{state | batch_buffer: [], batch_timer_ref: nil} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting batch_timer_ref to nil here can't prevent a race condition can it?. You are cancelling the timer, draining any flush messages that made it through, and setting the timer ref to nil in cancel_batch_timer/1 before this call.
| {:batch, _timeout} -> | ||
| # Batched with timeout: buffer events and flush on size or timeout | ||
| buffer_and_maybe_flush(events, state) | ||
| end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love this.
|
|
||
| defp drain_flush_batch_timeout_message do | ||
| receive do | ||
| :flush_batch_timeout -> :ok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if the next message isn't a flush messge? I think we'll crash here no? And if you do manage to change the pattern match, then we've taken the message out of the mailbox and we have to process it it, whatever it is?
Am I wrong?
Adds a
batch_timeoutoption to event handlers.