diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 83e91701..14b54ccb 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -86,35 +86,45 @@ def connect_thread(latch) def connect_stream(latch) return Constants::PUSH_NONRETRYABLE_ERROR unless socket_write(latch) while connected? || @first_event.value - begin - partial_data = "" - Timeout::timeout @read_timeout do + if IO.select([@socket], nil, nil, @read_timeout) + begin partial_data = @socket.readpartial(10_000) - end - read_first_event(partial_data, latch) - - raise 'eof exception' if partial_data == :eof - rescue Timeout::Error => e - log_if_debug("SSE read operation timed out!: #{e.inspect}", 3) - return Constants::PUSH_RETRYABLE_ERROR - rescue EOFError - raise 'eof exception' - rescue Errno::EAGAIN => e - log_if_debug("SSE client transient error: #{e.inspect}", 1) - IO.select([tcp_socket]) - retry - rescue Errno::EBADF, IOError => e - log_if_debug(e.inspect, 3) - return nil - rescue StandardError => e - return nil if ENV['SPLITCLIENT_ENV'] == 'test' + read_first_event(partial_data, latch) - log_if_debug("Error reading partial data: #{e.inspect}", 3) + raise 'eof exception' if partial_data == :eof + rescue IO::WaitReadable => e + log_if_debug("SSE client transient error: #{e.inspect}", 1) + IO.select([@socket], nil, nil, @read_timeout) + retry + rescue Errno::ETIMEDOUT => e + log_if_debug("SSE read operation timed out!: #{e.inspect}", 3) + return Constants::PUSH_RETRYABLE_ERROR + rescue EOFError => e + log_if_debug("SSE read operation EOF Exception!: #{e.inspect}", 3) + raise 'eof exception' + rescue Errno::EAGAIN => e + log_if_debug("SSE client transient error: #{e.inspect}", 1) + IO.select([@socket], nil, nil, @read_timeout) + retry + rescue Errno::EBADF, IOError => e + log_if_debug("SSE read operation EBADF or IOError: #{e.inspect}", 3) + return nil + rescue StandardError => e + log_if_debug("SSE read operation StandardError: #{e.inspect}", 3) + return nil if ENV['SPLITCLIENT_ENV'] == 'test' + + log_if_debug("Error reading partial data: #{e.inspect}", 3) + return Constants::PUSH_RETRYABLE_ERROR + end + else + @config.logger.debug("SSE read operation timed out, no data available.") return Constants::PUSH_RETRYABLE_ERROR end process_data(partial_data) end + log_if_debug("SSE read operation exited: #{connected?}", 1) + nil end @@ -142,6 +152,7 @@ def read_first_event(data, latch) if response_code == OK_CODE && !error_event @connected.make_true + @config.logger.debug("SSE client first event Connected is true") @telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SSE_CONNECTION_ESTABLISHED, nil) push_status(Constants::PUSH_CONNECTED) end @@ -166,9 +177,8 @@ def socket_connect IO.select(nil, [ssl_socket]) retry end - return ssl_socket -# return ssl_socket.connect + rescue Exception => e @config.logger.error("socket connect error: #{e.inspect}") return nil @@ -179,9 +189,9 @@ def socket_connect end def process_data(partial_data) + log_if_debug("Event partial data: #{partial_data}", 1) return if partial_data.nil? || partial_data == KEEP_ALIVE_RESPONSE - log_if_debug("Event partial data: #{partial_data}", 1) events = @event_parser.parse(partial_data) events.each { |event| process_event(event) } rescue StandardError => e diff --git a/lib/splitclient-rb/version.rb b/lib/splitclient-rb/version.rb index 046f9d21..015056f0 100644 --- a/lib/splitclient-rb/version.rb +++ b/lib/splitclient-rb/version.rb @@ -1,3 +1,3 @@ module SplitIoClient - VERSION = '8.10.0-rc2' + VERSION = '8.10.0-rc5' end diff --git a/spec/sse/event_source/client_spec.rb b/spec/sse/event_source/client_spec.rb index 1c199b09..213cc184 100644 --- a/spec/sse/event_source/client_spec.rb +++ b/spec/sse/event_source/client_spec.rb @@ -221,6 +221,36 @@ end end + it 'client timeout and reconnect' do + stub_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=-1&rbSince=-1') + .with(headers: { 'Authorization' => 'Bearer client-spec-key' }) + .to_return(status: 200, body: '{"ff":{"d":[],"s":-1,"t":5564531221}, "rbs":{"d":[],"s":-1,"t":-1}}') + stub_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=5564531221&rbSince=-1') + .with(headers: { 'Authorization' => 'Bearer client-spec-key' }) + .to_return(status: 200, body: '{"ff":{"d":[],"s":5564531221,"t":5564531221}, "rbs":{"d":[],"s":-1,"t":-1}}') + + mock_server do |server| + start_workers + server.setup_response('/') do |_, res| + send_stream_content(res, event_split_update) + end + + sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue, read_timeout: 0.1) + connected = sse_client.start(server.base_uri) + sleep 1 + expect(connected).to eq(true) + expect(sse_client.connected?).to eq(true) + expect(push_status_queue.pop(true)).to eq(SplitIoClient::Constants::PUSH_CONNECTED) + sleep 3 + expect(log.string).to include 'SSE read operation timed out, no data available' + expect(sse_client.connected?).to eq(true) + sse_client.close + expect(sse_client.connected?).to eq(false) + + stop_workers + end + end + it 'first event - when server return 400' do mock_server do |server| server.setup_response('/') do |_, res|