@jyothsnakonisa

No description provided.


@bbotella bbotella left a comment

Looks good to me

@jyothsnakonisa jyothsnakonisa force-pushed the stream-buffer-fix branch 2 times, most recently from a58005f to 57732bb Compare February 6, 2026 22:50
@jyothsnakonisa

jyothsnakonisa commented Feb 8, 2026

VertxStreamBuffer.copyBytes() was calling flip() after each write, which sets the buffer's limit to the current position and resets the position to 0. When multiple chunks were written to the same buffer, each subsequent chunk overwrote the previous data instead of appending to it, causing Stream API failures.

Example:

  • Chunk 1: Write "AAAA" at position 0 → flip() → position resets to 0
  • Chunk 2: Write "BBBB" → overwrites "AAAA"
  • Result: Corrupted data
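
The overwrite can be reproduced with a plain java.nio.ByteBuffer. This is only a minimal sketch: copyThenFlip is a hypothetical stand-in for the old copyBytes() behavior, not the actual VertxStreamBuffer code, and the class name is made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FlipPerChunkDemo {
    // Hypothetical stand-in for the old behavior: write a chunk, then flip.
    static void copyThenFlip(byte[] chunk, ByteBuffer dst) {
        dst.put(chunk); // advances position by chunk.length
        dst.flip();     // limit = position, position = 0
    }

    // Writes two chunks with a flip after each and returns what is readable.
    static String writeTwoChunks() {
        ByteBuffer dst = ByteBuffer.allocate(8);
        copyThenFlip("AAAA".getBytes(StandardCharsets.US_ASCII), dst);
        // Position is back at 0 here, so this write lands on top of "AAAA".
        copyThenFlip("BBBB".getBytes(StandardCharsets.US_ASCII), dst);

        byte[] out = new byte[dst.remaining()];
        dst.get(out);
        return new String(out, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(writeTwoChunks()); // prints BBBB
    }
}
```

Note that the first flip() also shrinks the limit to 4, so a second chunk longer than 4 bytes would throw BufferOverflowException rather than silently overwrite.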

Solution:

  1. Remove flip() from StreamBuffer.copyBytes() - This method writes data chunks and should only advance position, not flip. The caller is responsible for flipping after all chunks are written.
  2. Ensure flip() in CdcRandomAccessReader.rebuffer() - After writing all data, flip() transitions the buffer from write mode to read mode by resetting position to 0 and setting limit to the valid data length. This ensures RandomAccessReader can correctly read the buffered data.
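
The intended contract after the fix can be sketched as follows. Again this is an illustration under assumptions: copyBytes here is a simplified, hypothetical signature that only appends, and the caller method stands in for what rebuffer() is described as doing, not the real CdcRandomAccessReader code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FlipOnceDemo {
    // Hypothetical stand-in for the fixed copyBytes(): append only, never flip.
    static void copyBytes(byte[] chunk, ByteBuffer dst) {
        dst.put(chunk); // advances position by chunk.length
    }

    // Caller writes all chunks, then flips exactly once before reading.
    static String writeTwoChunksThenFlip() {
        ByteBuffer dst = ByteBuffer.allocate(8);
        copyBytes("AAAA".getBytes(StandardCharsets.US_ASCII), dst);
        copyBytes("BBBB".getBytes(StandardCharsets.US_ASCII), dst);
        dst.flip(); // switch from write mode to read mode: limit = 8, position = 0

        byte[] out = new byte[dst.remaining()];
        dst.get(out);
        return new String(out, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(writeTwoChunksThenFlip()); // prints AAAABBBB
    }
}
```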

Tests passed without flip() because:

  • Buffers were always completely filled (enforced by assert buffer.remaining() == 0)
  • RandomAccessReader#reBufferAt resets position to 0 regardless of flip

However, flip() remains essential for correctness in partial-fill scenarios (e.g., last buffer in file, or when less data is available than buffer capacity).
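
A small sketch of the partial-fill case, using a plain ByteBuffer. The position(0) call is an assumption that models a reader resetting only the position, as described above for RandomAccessReader#reBufferAt; it is not the actual reader code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PartialFillDemo {
    // Fills 5 of 8 bytes, then only resets position (no flip).
    static int readableWithoutFlip() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put("HELLO".getBytes(StandardCharsets.US_ASCII));
        buf.position(0);        // limit stays at capacity (8)
        return buf.remaining(); // includes 3 bytes that were never written
    }

    // Fills 5 of 8 bytes, then flips.
    static int readableWithFlip() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put("HELLO".getBytes(StandardCharsets.US_ASCII));
        buf.flip();             // limit = 5, position = 0
        return buf.remaining(); // only the valid data
    }

    public static void main(String[] args) {
        System.out.println(readableWithoutFlip()); // prints 8
        System.out.println(readableWithFlip());    // prints 5
    }
}
```

When the buffer is completely filled, both paths report the same number of readable bytes, which is consistent with the tests not catching the missing flip().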

We should avoid changes in analytics-sidecar-client and the other sidecar-copied subprojects wherever possible. Those files are supposed to stay in sync with Sidecar.
I would revert the changes in this file and the test file. The VertxStreamBuffer and its test in another package.

Comment on lines +30 to 35
* Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer destination}.
* <p>
* This method writes {@code length} bytes starting at the destination buffer's current position
* and advances the position by {@code length}. The caller is responsible for calling
* {@link ByteBuffer#flip()} on the destination buffer before reading from it.
*

Thanks for adding the docs. Let's actually add the docs in org.apache.cassandra.spark.utils.streaming.StreamBuffer instead, not this file.

@@ -1,5 +1,6 @@
0.3.0
-----
* Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption (CASSANALYTICS-116)

I believe the description is no longer accurate. Please update to reflect the actual fix.

…ausing data corruption

Patch by Jyothsna Konisa; Reviewed by Yifan Cai and Bernardo Botella for CASSANALYTICS-116
@jyothsnakonisa jyothsnakonisa merged commit 018250a into apache:trunk Feb 9, 2026