diff --git a/.rubocop.yml b/.rubocop.yml index 43d65d5..0d93de4 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1,4 +1,4 @@ -require: +plugins: - rubocop-performance AllCops: @@ -289,7 +289,7 @@ Style/PerlBackrefs: StyleGuide: "https://github.com/bbatsov/ruby-style-guide#no-perl-regexp-last-matchers" Enabled: false -Naming/PredicateName: +Naming/PredicatePrefix: Description: "Check the names of predicate methods." StyleGuide: "https://github.com/bbatsov/ruby-style-guide#bool-methods-qmark" ForbiddenPrefixes: diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index f398b16..f226f32 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -164,6 +164,7 @@ def initialize(uri, @on = { event: ->(_) {}, error: ->(_) {} } @last_id = last_event_id + @query_params_callback = nil yield self if block_given? @@ -206,6 +207,36 @@ def on_error(&action) @on[:error] = action end + # + # Specifies a block or Proc to generate query parameters dynamically. This will be called before + # each connection attempt (both initial connection and reconnections), allowing you to update + # query parameters based on the current client state. + # + # The block should return a Hash with string keys and string values, which will be merged with + # any existing query parameters in the base URI. If the callback raises an exception, it will be + # logged and the connection will proceed with the base URI's query parameters (or no query + # parameters if none were present). + # + # This is useful for scenarios where query parameters need to reflect the current state of the + # client, such as sending a "basis" parameter that represents what data the client already has. + # + # @example Using dynamic query parameters + # client = SSE::Client.new(base_uri) do |c| + # c.query_params do + # { + # "basis" => (selector.state if selector.defined?), + # "filter" => filter_key + # }.compact + # end + # c.on_event { |event| handle_event(event) } + # end + # + # @yieldreturn [Hash] a hash of query parameter names to values + # + def query_params(&action) + @query_params_callback = action + end + # # Permanently shuts down the client and its connection. No further events will be dispatched. This # has no effect if called a second time. @@ -289,8 +320,9 @@ def connect end cxn = nil begin - @logger.info { "Connecting to event stream at #{@uri}" } - cxn = @http_client.request(@method, @uri, build_opts) + uri = build_uri_with_query_params + @logger.info { "Connecting to event stream at #{uri}" } + cxn = @http_client.request(@method, uri, build_opts) if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") @@ -397,5 +429,27 @@ def build_opts {headers: build_headers, body: resolved_payload.to_s} end end + + def build_uri_with_query_params + uri = @uri.dup + + if @query_params_callback + begin + dynamic_params = @query_params_callback.call + if dynamic_params.is_a?(Hash) && !dynamic_params.empty? + existing_params = uri.query ? URI.decode_www_form(uri.query).to_h : {} + merged_params = existing_params.merge(dynamic_params) + uri.query = URI.encode_www_form(merged_params) + elsif !dynamic_params.is_a?(Hash) + @logger.warn { "query_params callback returned non-Hash value: #{dynamic_params.class}, ignoring" } + end + rescue StandardError => e + @logger.warn { "query_params callback raised an exception: #{e.inspect}, proceeding with base URI" } + @logger.debug { "Exception trace: #{e.backtrace}" } + end + end + + uri + end end end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 413a1f7..43eaa32 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -993,4 +993,308 @@ def test_object.to_s end end end + + describe "dynamic query parameters" do + it "sends query parameters from callback on initial connection" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri) do |c| + c.query_params do + {"basis" => "p:ABC:123", "filter" => "test"} + end + end) do |client| + received_req = requests.pop + expect(received_req[:query_string]).to include("basis=p%3AABC%3A123") + expect(received_req[:query_string]).to include("filter=test") + end + end + end + + it "updates query parameters on reconnection" do + with_server do |server| + requests = Queue.new + attempt = 0 + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + attempt += 1 + if attempt == 1 + send_stream_content(res, "", keep_open: false) # Close to trigger reconnect + else + send_stream_content(res, "", keep_open: true) + end + end + + counter = 0 + with_client(subject.new(server.base_uri, reconnect_time: reconnect_asap) do |c| + c.query_params do + counter += 1 + {"request_id" => counter.to_s} + end + end) do |client| + req1 = requests.pop + expect(req1[:query_string]).to eq("request_id=1") + + req2 = requests.pop + expect(req2[:query_string]).to eq("request_id=2") + expect(attempt).to eq(2) + end + end + end + + it "merges dynamic params with existing query params in URI" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?static=value" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + {"dynamic" => "param"} + end + end) do |client| + received_req = requests.pop + expect(received_req[:query_string]).to include("static=value") + expect(received_req[:query_string]).to include("dynamic=param") + end + end + end + + it "allows dynamic params to override existing query params" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?key=original" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + {"key" => "overridden"} + end + end) do |client| + received_req = requests.pop + # Dynamic params should override static ones + expect(received_req[:query_string]).to eq("key=overridden") + end + end + end + + it "works without query_params callback (backward compatibility)" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri)) do |client| + received_req = requests.pop + expect(received_req[:query_string]).to be_nil + end + end + end + + it "preserves existing query params when no callback is set" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?existing=param" + with_client(subject.new(base_uri_with_params)) do |client| + received_req = requests.pop + expect(received_req[:query_string]).to eq("existing=param") + end + end + end + + it "handles callback returning empty hash" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?existing=param" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + {} + end + end) do |client| + received_req = requests.pop + # Empty hash should preserve existing params (consistent with Python behavior) + expect(received_req[:query_string]).to eq("existing=param") + end + end + end + + it "handles callback returning nil gracefully" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri) do |c| + c.query_params do + nil + end + end) do |client| + received_req = requests.pop + # Should proceed with base URI (no query params) + expect(received_req[:query_string]).to be_nil + end + end + end + + it "handles callback raising exception gracefully" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?fallback=value" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + raise "Test exception" + end + end) do |client| + received_req = requests.pop + # Should fall back to base URI params + expect(received_req[:query_string]).to eq("fallback=value") + end + end + end + + it "handles callback returning non-Hash value gracefully" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + base_uri_with_params = "#{server.base_uri}?fallback=value" + with_client(subject.new(base_uri_with_params) do |c| + c.query_params do + "not a hash" + end + end) do |client| + received_req = requests.pop + # Should fall back to base URI params + expect(received_req[:query_string]).to eq("fallback=value") + end + end + end + + it "updates query parameters on each reconnection attempt" do + with_server do |server| + requests = Queue.new + attempt = 0 + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + attempt += 1 + if attempt <= 2 + res.status = 500 + res.body = "error" + res.keep_alive = false + else + send_stream_content(res, "", keep_open: true) + end + end + + connection_count = 0 + with_client(subject.new(server.base_uri, reconnect_time: reconnect_asap) do |c| + c.query_params do + connection_count += 1 + {"connection" => connection_count.to_s} + end + end) do |client| + req1 = requests.pop + expect(req1[:query_string]).to eq("connection=1") + + req2 = requests.pop + expect(req2[:query_string]).to eq("connection=2") + + req3 = requests.pop + expect(req3[:query_string]).to eq("connection=3") + expect(attempt).to eq(3) + end + end + end + + it "handles URL-encoded query parameter values" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri) do |c| + c.query_params do + {"basis" => "p:ABC:123", "filter" => "test value with spaces"} + end + end) do |client| + received_req = requests.pop + expect(received_req[:query_string]).to include("basis=p%3AABC%3A123") + expect(received_req[:query_string]).to include("filter=test+value+with+spaces") + end + end + end + + it "works with multiple query parameters" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + request_data = { query_string: req.query_string } + requests << request_data + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri) do |c| + c.query_params do + { + "param1" => "value1", + "param2" => "value2", + "param3" => "value3", + } + end + end) do |client| + received_req = requests.pop + query_string = received_req[:query_string] + expect(query_string).to include("param1=value1") + expect(query_string).to include("param2=value2") + expect(query_string).to include("param3=value3") + end + end + end + end end