-
Notifications
You must be signed in to change notification settings - Fork 27
[AIT-258] feat: add Realtime mutable message support #660
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: AIT-196/rest-edits-deletes
Are you sure you want to change the base?
Conversation
- Updated `ConnectionManager` and `MessageQueue` to process `PublishResult` during acknowledgments (ACK/NACK). - Extended `send_protocol_message` to return `PublishResult` for publish tracking. - Bumped default `protocol_version` to 5. - Added tests for message update, delete, append operations, and PublishResult handling.
WalkthroughThe PR introduces support for mutable messages in realtime channels by upgrading the protocol version to 5, adding message update/delete/append operations, returning PublishResult objects from publish calls, and propagating results through the connection layer with comprehensive test coverage. Changes
Sequence DiagramsequenceDiagram
participant App
participant RealtimeChannel
participant ConnectionManager
participant WebSocketTransport
participant Server
App->>RealtimeChannel: publish() or update_message()
RealtimeChannel->>ConnectionManager: send_protocol_message()
ConnectionManager->>WebSocketTransport: send(ProtocolMessage)
WebSocketTransport->>Server: WebSocket frame with ACK request
Server->>WebSocketTransport: ACK with optional res (PublishResult[])
WebSocketTransport->>ConnectionManager: on_ack(serial, count, res)
ConnectionManager->>ConnectionManager: complete_messages(serial, count, res)
ConnectionManager->>ConnectionManager: Pair results via zip_longest()
ConnectionManager->>App: Return Future[PublishResult]
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@test/ably/realtime/realtimechannelmutablemessages_test.py`:
- Around line 182-191: The test calls channel.publish with a keyword
(channel.publish(messages=messages)) but the publish implementation inspects
only positional args (checks len(args)), so kwargs are ignored and cause a
ValueError; change the call to pass the messages list positionally
(channel.publish(messages)) so publish receives the messages via args and the
test will exercise publish correctly.
🧹 Nitpick comments (2)
ably/realtime/connectionmanager.py (1)
99-109: Consider handling mismatched lengths between completed messages and results.Using
zip_longestensures all completed messages are processed even whenres_listis shorter or empty. However, ifres_listis longer thancompleted_messages,msgwill beNonefor the extra items, which could cause anAttributeErrorwhen accessingmsg.future.♻️ Suggested fix to guard against None msg
# Default res to empty list if None res_list = res if res is not None else [] for (msg, publish_result) in zip_longest(completed_messages, res_list): + if msg is None: + continue if msg.future and not msg.future.done(): if err: msg.future.set_exception(err)ably/realtime/realtime_channel.py (1)
964-981: Potential ambiguity when bothoptionsandkwargsare provided.The current logic converts
kwargstoChannelOptionsonly whenoptionsisNone. If a user accidentally provides bothoptionsandkwargs, thekwargswill be silently ignored. Consider raising an error or warning in this case.♻️ Suggested fix to warn about ignored kwargs
# Convert kwargs to ChannelOptions if provided if kwargs and not options: options = ChannelOptions(**kwargs) + elif kwargs and options: + log.warning("Channels.get(): kwargs ignored when options is provided") elif options and isinstance(options, dict): options = ChannelOptions.from_dict(options)
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
ably/realtime/connectionmanager.pyably/realtime/realtime_channel.pyably/transport/defaults.pyably/transport/websockettransport.pytest/ably/realtime/realtimechannelmutablemessages_test.py
🧰 Additional context used
🧬 Code graph analysis (2)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (3)
ably(780-781)on_ack(468-479)count(59-61)ably/types/operations.py (1)
PublishResult(48-67)
ably/realtime/connectionmanager.py (2)
ably/types/operations.py (1)
PublishResult(48-67)ably/util/exceptions.py (1)
AblyException(9-90)
🪛 Gitleaks (8.30.0)
test/ably/realtime/realtimechannelmutablemessages_test.py
[high] 158-158: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: check (3.14)
- GitHub Check: check (3.12)
- GitHub Check: check (3.11)
- GitHub Check: check (3.8)
- GitHub Check: check (3.13)
- GitHub Check: check (3.10)
- GitHub Check: check (3.9)
🔇 Additional comments (19)
ably/transport/websockettransport.py (2)
15-15: LGTM!Import of
PublishResultis correctly added to support the new ACK handling flow.
176-179: LGTM!The ACK handling correctly extracts the optional
resfield, converts each item to aPublishResultobject when present, and passes it to the connection manager. The null-safety check before conversion is appropriate.ably/realtime/connectionmanager.py (7)
7-7: LGTM!Import of
zip_longestis appropriate for aligning completed messages with their correspondingPublishResultentries.
34-34: LGTM!Updated type annotation correctly reflects that the future now resolves to a
PublishResultinstead ofNone.
63-78: LGTM!The updated signature correctly accepts the
resparameter and the docstring accurately describes the new behavior.
216-253: LGTM!The
send_protocol_messagemethod correctly returns thePublishResultwhen awaiting ACK-required messages andNoneotherwise. The flow handles both queued and immediate sends appropriately.
255-273: LGTM!The
_send_protocol_message_on_connected_statemethod correctly propagates thePublishResultfrom the future when ACK is required.
468-479: LGTM!The
on_ackmethod correctly accepts and forwards theresparameter tocomplete_messages.
493-493: LGTM!Correctly passes
Noneforresand the error tocomplete_messagesfor NACK handling.ably/realtime/realtime_channel.py (5)
13-15: LGTM!Imports are correctly added to support the new mutable message operations.
394-494: LGTM!The
publishmethod correctly returns thePublishResultfrom the connection manager, enabling callers to access message serials for subsequent update/delete operations.
526-606: Consider using MessageAction enum for action.name in log statement.The
_send_updateimplementation correctly validates serial presence, constructs the message with version metadata, handles encryption, and returns theUpdateDeleteResult. The logic is sound.One minor observation: Line 588 uses
action.namein the log, which assumesMessageActionis an enum with anameattribute - this should work correctly with Python enums.
608-672: LGTM!The public convenience methods
update_message,delete_message, andappend_messageare well-documented and correctly delegate to_send_updatewith the appropriateMessageActionvalues.
674-718: LGTM!The REST delegation methods correctly call the parent
Channelmethods, providing a unified API for accessing messages via the REST API from the realtime channel.test/ably/realtime/realtimechannelmutablemessages_test.py (4)
15-23: LGTM!Good use of parametrization to test both JSON and MsgPack transports. The fixture properly configures the Ably client based on the transport type.
154-176: Test key is appropriate for testing - not a real secret.The static analysis tool flagged
'keyfordecrypt_16'as a potential API key, but this is a test encryption key used only in unit tests and is not a real secret. This is a false positive.
266-278: LGTM!The helper method
wait_until_message_with_action_appearscorrectly uses polling to wait for a message with a specific action, with proper exception handling.
280-289: LGTM!The helper method
wait_until_get_all_message_versioncorrectly polls for version history until the expected count is reached.ably/transport/defaults.py (1)
2-2: Protocol version 5 is properly supported and already tested.Ably Protocol v5 is officially supported with comprehensive test coverage for mutable messages (updates, deletes, appends) in both REST and realtime channels. The integration in
connectionmanager.pycorrectly passes the protocol version, and the test infrastructure includes the required mutable messages support.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| messages = [ | ||
| Message('event1', 'data1'), | ||
| Message('event2', 'data2'), | ||
| Message('event3', 'data3') | ||
| ] | ||
|
|
||
| result = await channel.publish(messages=messages) | ||
| assert result is not None | ||
| assert hasattr(result, 'serials') | ||
| assert len(result.serials) == 3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
find . -name "realtime_channel.py" -type fRepository: ably/ably-python
Length of output: 95
🏁 Script executed:
ast-grep --pattern $'async def publish(self, $$$) {
$$$
}'Repository: ably/ably-python
Length of output: 42
🏁 Script executed:
ast-grep --pattern $'async def publish(self, $_):
$$$
'Repository: ably/ably-python
Length of output: 42
🏁 Script executed:
wc -l ./ably/realtime/realtime_channel.pyRepository: ably/ably-python
Length of output: 100
🏁 Script executed:
rg -n "async def publish" ./ably/realtime/realtime_channel.py -A 5Repository: ably/ably-python
Length of output: 299
🏁 Script executed:
sed -n '394,450p' ./ably/realtime/realtime_channel.pyRepository: ably/ably-python
Length of output: 2380
The channel.publish(messages=messages) call at line 188 is incorrect.
The publish method signature at realtime_channel.py:394 accepts *args and **kwargs, but the implementation only processes positional arguments via len(args). When called with a keyword argument messages=messages, args will be empty, causing a ValueError: publish() expects either (name, data) or a message object or array of messages.
The correct invocation is:
result = await channel.publish(messages)🤖 Prompt for AI Agents
In `@test/ably/realtime/realtimechannelmutablemessages_test.py` around lines 182 -
191, The test calls channel.publish with a keyword
(channel.publish(messages=messages)) but the publish implementation inspects
only positional args (checks len(args)), so kwargs are ignored and cause a
ValueError; change the call to pass the messages list positionally
(channel.publish(messages)) so publish receives the messages via args and the
test will exercise publish correctly.
ConnectionManagerandMessageQueueto processPublishResultduring acknowledgments (ACK/NACK).send_protocol_messageto returnPublishResultfor publish tracking.protocol_versionto 5.Summary by CodeRabbit
New Features
Tests
✏️ Tip: You can customize this high-level summary in your review settings.