Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .rubocop.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require:
plugins:
- rubocop-performance

AllCops:
Expand Down Expand Up @@ -289,7 +289,7 @@ Style/PerlBackrefs:
StyleGuide: "https://github.com/bbatsov/ruby-style-guide#no-perl-regexp-last-matchers"
Enabled: false

Naming/PredicateName:
Naming/PredicatePrefix:
Description: "Check the names of predicate methods."
StyleGuide: "https://github.com/bbatsov/ruby-style-guide#bool-methods-qmark"
ForbiddenPrefixes:
Expand Down
58 changes: 56 additions & 2 deletions lib/ld-eventsource/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def initialize(uri,

@on = { event: ->(_) {}, error: ->(_) {} }
@last_id = last_event_id
@query_params_callback = nil

yield self if block_given?

Expand Down Expand Up @@ -206,6 +207,36 @@ def on_error(&action)
@on[:error] = action
end

#
# Specifies a block or Proc to generate query parameters dynamically. This will be called before
# each connection attempt (both initial connection and reconnections), allowing you to update
# query parameters based on the current client state.
#
# The block should return a Hash with string keys and string values, which will be merged with
# any existing query parameters in the base URI. If the callback raises an exception, it will be
# logged and the connection will proceed with the base URI's query parameters (or no query
# parameters if none were present).
#
# This is useful for scenarios where query parameters need to reflect the current state of the
# client, such as sending a "basis" parameter that represents what data the client already has.
#
# @example Using dynamic query parameters
# client = SSE::Client.new(base_uri) do |c|
# c.query_params do
# {
# "basis" => (selector.state if selector.defined?),
# "filter" => filter_key
# }.compact
# end
# c.on_event { |event| handle_event(event) }
# end
#
# @yieldreturn [Hash<String, String>] a hash of query parameter names to values
#
def query_params(&action)
@query_params_callback = action
end

#
# Permanently shuts down the client and its connection. No further events will be dispatched. This
# has no effect if called a second time.
Expand Down Expand Up @@ -289,8 +320,9 @@ def connect
end
cxn = nil
begin
@logger.info { "Connecting to event stream at #{@uri}" }
cxn = @http_client.request(@method, @uri, build_opts)
uri = build_uri_with_query_params
@logger.info { "Connecting to event stream at #{uri}" }
cxn = @http_client.request(@method, uri, build_opts)
if cxn.status.code == 200
content_type = cxn.content_type.mime_type
if content_type && content_type.start_with?("text/event-stream")
Expand Down Expand Up @@ -397,5 +429,27 @@ def build_opts
{headers: build_headers, body: resolved_payload.to_s}
end
end

def build_uri_with_query_params
uri = @uri.dup

if @query_params_callback
begin
dynamic_params = @query_params_callback.call
if dynamic_params.is_a?(Hash) && !dynamic_params.empty?
existing_params = uri.query ? URI.decode_www_form(uri.query).to_h : {}
merged_params = existing_params.merge(dynamic_params)
uri.query = URI.encode_www_form(merged_params)
elsif !dynamic_params.is_a?(Hash)
@logger.warn { "query_params callback returned non-Hash value: #{dynamic_params.class}, ignoring" }
end
rescue StandardError => e
@logger.warn { "query_params callback raised an exception: #{e.inspect}, proceeding with base URI" }
@logger.debug { "Exception trace: #{e.backtrace}" }
end
end

uri
end
end
end
Loading