
Conversation

@tac0turtle
Contributor

Overview

This PR adds sync modes, allowing us to move to subscriptions when at the head of the chain.

@claude
Contributor

claude bot commented Jan 8, 2026

Claude finished @tac0turtle's task in 3m 9s


Comprehensive Code Review

I've completed a thorough review of the sync mode implementation. This is a significant architectural improvement that introduces dynamic synchronization modes for the DA worker. Here are my findings:

Executive Summary

Overall Assessment: The implementation is architecturally sound with good separation of concerns. However, there are several critical bugs, security concerns, and design issues that must be addressed before merging.

Critical Issues Found: 3
High Priority Issues: 4
Medium Priority Issues: 6
Low Priority/Suggestions: 4


1. Code Quality & Architecture

✅ Strengths

  • Clean state machine design: The dual-mode sync system (catchup/follow) is well-designed with clear transitions
  • Good interface abstraction: DARetriever interface allows for testability and the new Subscribe() and LocalHead() methods are properly integrated
  • Proper use of atomic operations: Mode tracking and height management use appropriate atomic primitives
  • Excellent logging and observability: New metrics (SyncMode, SubscribeErrors, ModeSwitches) provide good visibility

❌ Critical Issues

CRITICAL 1: Race condition in processSubscriptionResponse (syncer.go:566-579)

for _, event := range events {
    select {
    case s.heightInCh <- event:
        // ...
    default:
        s.cache.SetPendingEvent(event.Header.Height(), &event)
        // ...
    }
}

Issue: Taking the address of the loop variable event with &event creates a pointer to the same memory location for all iterations. After the loop completes, all cached pending events will point to the last event, causing data corruption and loss.

Impact: Loss of subscription events, incorrect block processing, potential chain halt.

Fix: Add event := event at the start of the loop body to create a new variable for each iteration:

for _, event := range events {
    event := event // Capture loop variable
    select {
    case s.heightInCh <- event:
    default:
        s.cache.SetPendingEvent(event.Header.Height(), &event)
    }
}

Note: Gemini correctly identified this in the review comments. While Go 1.22+ gives each for range iteration its own loop variable, it's still best practice to be explicit when taking addresses, especially since the codebase may target earlier Go versions.



CRITICAL 2: Watchdog timeout can trigger false mode switches (syncer.go:534-541)

case <-time.After(watchdogTimeout):
    s.logger.Debug().Dur("timeout", watchdogTimeout).Msg("subscription watchdog triggered, checking sync mode")
    if s.determineSyncMode() == SyncModeCatchup {
        return errors.New("fell behind, switching to catchup")
    }

Issue: If the DA layer simply has no new blocks for watchdogTimeout duration (3× block time), the watchdog triggers and checks determineSyncMode(). If the LocalHead() call fails or is delayed by a transient network issue at that moment, determineSyncMode() can falsely conclude the node is behind and switch to catchup mode, causing unnecessary polling.

Impact: Unnecessary mode thrashing, increased DA layer load, performance degradation.

Recommendation: Add a grace period or retry logic before switching modes. Consider tracking consecutive watchdog triggers:

case <-time.After(watchdogTimeout):
    localHead, err := s.daClient.LocalHead(s.ctx)
    if err != nil {
        // Network error - don't switch modes yet, retry
        s.logger.Warn().Err(err).Msg("watchdog: failed to get local head, continuing in follow mode")
        continue
    }
    if s.daRetrieverHeight.Load() + catchupThreshold < localHead {
        return errors.New("fell behind, switching to catchup")
    }



CRITICAL 3: Missing validation of subscription response data (syncer.go:546-587)

func (s *Syncer) processSubscriptionResponse(resp *blobrpc.SubscriptionResponse) error {
    if resp == nil || len(resp.Blobs) == 0 {
        return nil
    }
    // ... processes blobs directly without validation
    events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)

Issue: Subscription responses are processed without validating:

  1. Height monotonicity (is resp.Height ≥ last processed height?)
  2. Height sanity (is resp.Height reasonable, or could it be maliciously large?)
  3. Blob count limits (could a malicious DA node send millions of blobs?)

Impact: Potential DoS attack, out-of-order processing, resource exhaustion.

Recommendation: Add validation:

func (s *Syncer) processSubscriptionResponse(resp *blobrpc.SubscriptionResponse) error {
    if resp == nil || len(resp.Blobs) == 0 {
        return nil
    }
    
    // Validate height
    currentHeight := s.daRetrieverHeight.Load()
    if resp.Height < currentHeight {
        s.logger.Warn().Uint64("resp_height", resp.Height).Uint64("current", currentHeight).
            Msg("ignoring subscription response with old height")
        return nil
    }
    
    // Validate blob count (reasonable upper limit)
    const maxBlobsPerResponse = 1000
    if len(resp.Blobs) > maxBlobsPerResponse {
        return fmt.Errorf("subscription response contains too many blobs: %d > %d", 
            len(resp.Blobs), maxBlobsPerResponse)
    }
    // ... rest of processing
}



2. Error Handling & Resilience

❌ High Priority Issues

HIGH 1: Silent subscription channel closure (syncer.go:505-520)

case resp, ok := <-headerCh:
    if !ok {
        return errors.New("header subscription closed")
    }

Issue: When a subscription channel closes, the function returns an error but doesn't log WHY it closed. Was it a network issue? Server shutdown? This makes debugging production issues difficult.

Recommendation: Add structured logging with context:

case resp, ok := <-headerCh:
    if !ok {
        s.logger.Error().
            Str("channel", "header").
            Uint64("last_height", s.daRetrieverHeight.Load()).
            Msg("subscription channel unexpectedly closed")
        return errors.New("header subscription closed")
    }

HIGH 2: No backoff between subscription retries (syncer.go:440-447)

func (s *Syncer) runFollowMode() {
    // ...
    err := s.subscribeAndFollow()
    if err != nil && !errors.Is(err, context.Canceled) {
        s.metrics.SubscribeErrors.Add(1)
        s.logger.Warn().Err(err).Msg("subscribe failed, will retry via mode check")
        // No explicit catchup call needed
    }
}

Issue: If subscription fails immediately (e.g., DA node down), runFollowMode() returns instantly and daWorkerLoop() will immediately call determineSyncMode() again. If the node is still caught up, it will re-enter follow mode and retry subscription immediately, creating a tight loop.

Impact: CPU spinning, log spam, network pressure on DA layer.

Recommendation: Add backoff delay in runFollowMode() on subscription failure:

func (s *Syncer) runFollowMode() {
    s.logger.Debug().Msg("running follow mode")
    s.metrics.SyncMode.Set(float64(SyncModeFollow))

    err := s.subscribeAndFollow()
    if err != nil && !errors.Is(err, context.Canceled) {
        s.metrics.SubscribeErrors.Add(1)
        s.logger.Warn().Err(err).Msg("subscribe failed, will retry via mode check")
        
        // Backoff before retrying to avoid tight loop
        backoff := s.config.DA.BlockTime.Duration
        if backoff <= 0 {
            backoff = 2 * time.Second
        }
        s.sleepOrDone(backoff)
    }
}



HIGH 3: Subscription context not properly cancelled on mode switch (syncer.go:457-459)

subCtx, cancel := context.WithCancel(s.ctx)
defer cancel()

Issue: When subscribeAndFollow() returns (either due to error or watchdog timeout), the defer cancel() fires, but there's a potential race where subscription channels are still being read by the select statement. This could lead to goroutine leaks in the underlying DA client if subscriptions aren't properly cleaned up.

Recommendation: Explicitly document the cleanup contract in the Client interface, or add explicit channel draining:

// After return from subscribeAndFollow, drain any pending messages
// to ensure DA client goroutines can exit cleanly
go func() {
    for range headerCh {
        // Drain
    }
}()
go func() {
    for range dataCh {
        // Drain
    }
}()

However, this may not be necessary if the DA client implementation properly handles context cancellation. Review the DA client implementation to ensure subscriptions are cleaned up when context is cancelled.


HIGH 4: ProcessBlobs doesn't return errors (da_retriever.go:156)

func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent

Issue: ProcessBlobs silently drops invalid blobs with debug logging only. This is fine for malformed blobs, but if there's a systematic issue (e.g., decoder bug, wrong namespace), operators won't notice until blocks stop syncing.

Recommendation: Add metrics for dropped blobs:

// In metrics.go, add:
BlobProcessingFailures metrics.Counter

// In ProcessBlobs:
if header := r.tryDecodeHeader(bz, daHeight); header != nil {
    // ...
} else if data := r.tryDecodeData(bz, daHeight); data != nil {
    // ...
} else {
    r.metrics.BlobProcessingFailures.Add(1)
    r.logger.Debug().Int("blob_size", len(bz)).Msg("failed to decode blob as header or data")
}

⚠️ Medium Priority Issues

MEDIUM 1: LocalHead timeout hardcoded (client.go:458-468)

func (c *client) LocalHead(ctx context.Context) (uint64, error) {
    headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
    defer cancel()
    // ...
}

Issue: Uses defaultTimeout (60s) which might be too long for a simple local query. In determineSyncMode(), this is called frequently and could slow down mode switching.

Recommendation: Use a shorter timeout for LocalHead() since it's a local query:

func (c *client) LocalHead(ctx context.Context) (uint64, error) {
    // LocalHead is a local query, should be fast
    headCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()
    // ...
}

MEDIUM 2: No handling for duplicate subscription events (syncer.go:566-579)
The subscription could theoretically send duplicate events (e.g., after reconnection). While ProcessBlobs has some duplicate detection via pendingHeaders/pendingData maps, these are transient and don't persist across restarts. Consider adding height monotonicity checks.


MEDIUM 3: Missing context check in fetchDAUntilCaughtUp (syncer.go:590-631)
The loop checks context at the start but not between successive retrievals. Long-running catchup could delay shutdown.

Recommendation: Add select { case <-s.ctx.Done(): return s.ctx.Err() } after processing events.
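
For illustration, the catchup loop could look like this (isCaughtUp and retrieveAndProcessNextDAHeight are placeholder names for whatever the loop actually does, not functions in this PR):

for !s.isCaughtUp() { // placeholder condition
    if err := s.retrieveAndProcessNextDAHeight(s.ctx); err != nil { // placeholder helper
        return err
    }

    // Allow prompt shutdown between successive retrievals.
    select {
    case <-s.ctx.Done():
        return s.ctx.Err()
    default:
    }
}
return nil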


MEDIUM 4: subscribe errors don't distinguish failure types
SubscribeErrors metric increments for all failures, but it would be valuable to distinguish:

  • Connection failures
  • Authentication failures
  • Invalid namespace
  • Channel closed unexpectedly

This would aid in debugging production issues.
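
A low-effort option is to classify the error before counting it. The helper below is only a sketch; the string matching and the labeled SubscribeErrorsByType counter are assumptions, not code in this PR:

// classifySubscribeError buckets subscription failures for metrics and logs.
// Matching on error strings is illustrative only; real code should match on
// typed errors exposed by the DA client.
func classifySubscribeError(err error) string {
    switch {
    case errors.Is(err, context.DeadlineExceeded):
        return "timeout"
    case strings.Contains(err.Error(), "closed"):
        return "channel_closed"
    case strings.Contains(err.Error(), "unauthorized"):
        return "auth"
    default:
        return "other"
    }
}

// Usage, assuming a labeled counter:
// s.metrics.SubscribeErrorsByType.With("reason", classifySubscribeError(err)).Add(1)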


MEDIUM 5: Watchdog uses time.After in select

case <-time.After(watchdogTimeout):

This creates a new timer on every select iteration, which is inefficient. Should use time.NewTimer() and reset it after each event.
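
A sketch of the usual timer-reuse pattern, assuming the variable names of the surrounding select loop:

watchdog := time.NewTimer(watchdogTimeout)
defer watchdog.Stop()

for {
    select {
    case resp, ok := <-headerCh:
        if !ok {
            return errors.New("header subscription closed")
        }
        // ... process resp ...

        // Reuse the timer instead of allocating a new one each iteration.
        if !watchdog.Stop() {
            select {
            case <-watchdog.C: // drain if it already fired
            default:
            }
        }
        watchdog.Reset(watchdogTimeout)

    case <-watchdog.C:
        if s.determineSyncMode() == SyncModeCatchup {
            return errors.New("fell behind, switching to catchup")
        }
        watchdog.Reset(watchdogTimeout)
    }
}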


MEDIUM 6: No metrics for subscription event processing time
Would be valuable to track (see the sketch after this list):

  • Time spent processing subscription responses
  • Queue depth of heightInCh
  • Subscription reconnection frequency
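
For example (the metric names below are suggestions, not existing fields):

// Around processSubscriptionResponse:
start := time.Now()
if err := s.processSubscriptionResponse(resp); err != nil {
    return err
}
s.metrics.SubscriptionProcessSeconds.Observe(time.Since(start).Seconds()) // suggested histogram
s.metrics.HeightInChDepth.Set(float64(len(s.heightInCh)))                 // suggested gauge

// On each (re)subscription attempt in runFollowMode, to derive reconnection frequency:
s.metrics.SubscribeAttempts.Add(1) // suggested counter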

3. Security

✅ Strengths

  • Proper validation of proposer addresses in DA retrieval
  • Signature verification for both headers and data blobs
  • Strict mode transition prevents downgrade attacks

⚠️ Issues Identified

SEC 1: No rate limiting on subscription events
A malicious or buggy DA node could flood the subscription channels with events. There's no rate limiting or backpressure mechanism besides the bounded heightInCh channel (size 100).

Recommendation: Add rate limiting:

// Track subscription rate
lastEventTime := time.Now()
eventCount := 0

case resp, ok := <-headerCh:
    if !ok {
        return errors.New("header subscription closed")
    }

    // Simple rate limit: max 100 events per second
    if time.Since(lastEventTime) >= time.Second {
        // Start a new one-second window
        lastEventTime = time.Now()
        eventCount = 0
    }
    eventCount++
    if eventCount > 100 {
        s.logger.Warn().Msg("subscription rate limit exceeded, throttling")
        time.Sleep(time.Second)
        lastEventTime = time.Now()
        eventCount = 0
    }

SEC 2: Subscription height could skip blocks
If subscription sends events for heights 100, 105 (skipping 101-104), the syncer will wait indefinitely for 101-104 in follow mode. Should detect gaps and fall back to catchup.
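
One way to handle this in processSubscriptionResponse is to treat a gap as a reason to drop back to catchup rather than waiting (sketch; field names assumed from the code quoted above):

expected := s.daRetrieverHeight.Load() + 1
if resp.Height > expected {
    s.logger.Warn().
        Uint64("expected", expected).
        Uint64("received", resp.Height).
        Msg("subscription skipped DA heights")
    // Returning an error ends subscribeAndFollow, which lets the worker loop
    // re-evaluate the sync mode and backfill the gap via catchup.
    return fmt.Errorf("height gap in subscription: expected %d, got %d", expected, resp.Height)
}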


4. Performance & Resource Efficiency

✅ Strengths

  • Efficient transition between polling and subscription modes
  • Reduced DA layer load when caught up (subscription vs. polling)
  • Atomic operations for concurrent access to shared state
  • Good use of buffered channels

⚠️ Issues

PERF 1: heightInCh channel size may be insufficient
Buffer size of 100 (syncer.go:207) might fill up during a burst of subscription events, causing events to be cached. Consider making this configurable or increasing it based on expected throughput; see the sketch below.
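
If this becomes configurable, a zero-value fallback keeps today's behavior (the config field name is a suggestion, not an existing option):

// Hypothetical config knob, e.g. config.DA.EventBufferSize.
bufSize := s.config.DA.EventBufferSize
if bufSize <= 0 {
    bufSize = 100 // current default
}
s.heightInCh = make(chan common.DAHeightEvent, bufSize)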


PERF 2: Unnecessary LocalHead calls
determineSyncMode() calls LocalHead() on every iteration of daWorkerLoop(), even when in catchup mode and clearly behind. Could optimize by:

  • Calling LocalHead() only when transitioning or after N catchup iterations
  • Caching the result with a short TTL (see the sketch below)
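
A minimal sketch of the TTL-cache option (helper type and TTL choice are assumptions, not code in this PR):

type cachedLocalHead struct {
    mu        sync.Mutex
    value     uint64
    fetchedAt time.Time
    ttl       time.Duration // e.g. one DA block time
}

func (c *cachedLocalHead) get(ctx context.Context, fetch func(context.Context) (uint64, error)) (uint64, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if !c.fetchedAt.IsZero() && time.Since(c.fetchedAt) < c.ttl {
        return c.value, nil
    }
    v, err := fetch(ctx)
    if err != nil {
        return 0, err
    }
    c.value, c.fetchedAt = v, time.Now()
    return v, nil
}

// determineSyncMode would then call head.get(ctx, s.daClient.LocalHead) instead
// of hitting the DA node on every loop iteration.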

PERF 3: ProcessBlobs creates new slices on every call

blobs := make([][]byte, len(resp.Blobs))
for i, blob := range resp.Blobs {
    blobs[i] = blob.Data()
}

Could use a sync.Pool for blob slice allocation if this is a hot path.
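
If profiling shows this is a hot path, a pooled slice could look like the following sketch. Only the slice header is pooled, and it assumes ProcessBlobs does not retain the slice after returning:

var blobSlicePool = sync.Pool{
    New: func() any { return make([][]byte, 0, 64) },
}

// In processSubscriptionResponse:
blobs := blobSlicePool.Get().([][]byte)[:0]
for _, blob := range resp.Blobs {
    blobs = append(blobs, blob.Data())
}
events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)
blobSlicePool.Put(blobs) // safe only if ProcessBlobs does not keep a reference to the slice
// ... dispatch events as before ...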


5. Testing & Reliability

✅ Strengths

  • Good test coverage for mode switching logic
  • Tests for determineSyncMode with various scenarios
  • Mock-based testing allows isolation

❌ Issues

TEST 1: Low test coverage for new code
Codecov reports 26.19% patch coverage with 124 lines missing. Critical paths need better coverage:

  • processSubscriptionResponse (syncer.go:546-587) - 0% coverage
  • subscribeAndFollow (syncer.go:452-543) - minimal coverage
  • Watchdog timeout behavior - not tested
  • Mode switching under various error conditions - not tested

Recommendation: Add tests for:

  1. Subscription channel closure scenarios
  2. Watchdog timeout triggering
  3. Mode switching with various LocalHead errors
  4. Processing subscription responses with invalid data
  5. Race conditions (use -race flag)



TEST 2: Missing integration test for mode transitions
No end-to-end test that verifies:

  1. Start in catchup mode
  2. Catch up to head
  3. Switch to follow mode
  4. Fall behind (simulate slow DA)
  5. Switch back to catchup mode

This is the core value proposition of the PR and should be tested.
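
A rough outline of such a test — all helpers (newFakeDA, newTestSyncer, Mode, SetHead, StallSubscriptions) are placeholders for whatever test scaffolding gets added, not existing APIs:

func TestSyncerModeTransitions(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    da := newFakeDA(t)             // in-memory DA with a controllable head
    syncer := newTestSyncer(t, da) // syncer wired to the fake DA

    da.SetHead(100) // start well behind the DA head
    require.NoError(t, syncer.Start(ctx))

    // Steps 1-3: catch up and switch to follow mode.
    require.Eventually(t, func() bool { return syncer.Mode() == SyncModeFollow },
        10*time.Second, 100*time.Millisecond, "expected switch to follow mode after catching up")

    // Step 4: simulate a stalled DA subscription while the head keeps advancing.
    da.StallSubscriptions()
    da.SetHead(200)

    // Step 5: the watchdog should push the syncer back into catchup mode.
    require.Eventually(t, func() bool { return syncer.Mode() == SyncModeCatchup },
        10*time.Second, 100*time.Millisecond, "expected fallback to catchup mode")
}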


TEST 3: LocalDA RPC mock doesn't implement Subscribe properly
tools/local-da/rpc.go:133-137:

func (s *blobServer) Subscribe(_ context.Context, _ libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) {
    ch := make(chan *jsonrpc.SubscriptionResponse)
    close(ch)
    return ch, nil
}

This returns a closed channel, which will cause follow mode to immediately fail in tests using LocalDA. Should implement a proper subscription mechanism for testing.

Recommendation: Implement a functional Subscribe for LocalDA:

func (s *blobServer) Subscribe(ctx context.Context, namespace libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) {
    ch := make(chan *jsonrpc.SubscriptionResponse, 10)
    
    go func() {
        defer close(ch)
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        
        lastHeight := uint64(0)
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                s.da.mu.Lock()
                currentHeight := s.da.height
                s.da.mu.Unlock()
                
                // Send new blobs if height increased
                if currentHeight > lastHeight {
                    for h := lastHeight + 1; h <= currentHeight; h++ {
                        blobs, err := s.GetAll(ctx, h, []libshare.Namespace{namespace})
                        if err == nil && len(blobs) > 0 {
                            select {
                            case ch <- &jsonrpc.SubscriptionResponse{
                                Height: h,
                                Blobs:  blobs,
                            }:
                            case <-ctx.Done():
                                return
                            }
                        }
                    }
                    lastHeight = currentHeight
                }
            }
        }
    }()
    
    return ch, nil
}



6. Documentation

⚠️ Issues

DOC 1: Missing godoc for new constants
SyncModeCatchup, SyncModeFollow, catchupThreshold, and followWatchdogMultiplier lack comprehensive documentation explaining when each mode is used and why.
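
Something along these lines would help; the exact type and values below are illustrative, not the PR's actual declarations:

// SyncMode selects how the DA worker keeps up with the DA chain.
type SyncMode int32

const (
    // SyncModeCatchup polls the DA layer height-by-height and is used whenever
    // the retriever is more than catchupThreshold heights behind the DA head.
    SyncModeCatchup SyncMode = iota

    // SyncModeFollow subscribes to new blobs and processes them as they arrive;
    // used once the retriever is at (or near) the DA head.
    SyncModeFollow
)

// catchupThreshold is how many DA heights the retriever may lag before the
// worker switches (back) to catchup mode.
const catchupThreshold = 3 // illustrative value

// followWatchdogMultiplier × DA block time is how long follow mode waits for a
// subscription event before re-checking whether it has fallen behind.
const followWatchdogMultiplier = 3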


DOC 2: Client interface documentation incomplete
New methods Subscribe() and LocalHead() in da/interface.go have brief comments but don't document (see the sketch after this list):

  • What happens when subscription fails
  • Whether Subscribe can be called multiple times
  • Thread safety guarantees
  • What happens if LocalHead lags behind actual head
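
One possible expanded godoc; the signatures below are illustrative (only the SubscriptionResponse type is taken from the code above), the doc comments are what matters:

// Subscribe streams newly published blobs for the client's namespace, starting
// at the current DA head. The returned channel is closed when ctx is cancelled
// or the underlying connection drops; callers must treat closure as a signal to
// resubscribe (or fall back to polling). State explicitly whether multiple
// concurrent calls are allowed and whether each call gets its own stream.
Subscribe(ctx context.Context) (<-chan *blobrpc.SubscriptionResponse, error)

// LocalHead returns the highest DA height known to the connected DA node. It
// may lag the actual network head, so callers should treat it as a lower bound
// when deciding whether they are caught up.
LocalHead(ctx context.Context) (uint64, error)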

DOC 3: No architecture documentation
The PR description is brief ("add sync modes allowing us to move to subscriptions when at the head of the chain"). Should include:

  • Architecture diagram showing mode transitions
  • Performance implications
  • Rollback plan if issues arise
  • Metrics to monitor in production

7. Additional Observations

Code Style

  • ✅ Consistent error handling patterns
  • ✅ Appropriate use of structured logging
  • ✅ Good variable naming

Concurrency

  • ⚠️ Need to verify subscription cleanup on context cancellation (see HIGH 3)
  • ✅ Proper use of atomic operations for mode tracking

Compatibility

  • ✅ Backward compatible - no breaking changes to existing interfaces
  • ✅ Falls back to catchup mode gracefully on errors

Recommendations Priority

Must Fix Before Merge (Blocking)

  1. CRITICAL 1: Fix race condition in processSubscriptionResponse
  2. CRITICAL 2: Fix watchdog false positives
  3. CRITICAL 3: Add validation to subscription responses
  4. HIGH 2: Add backoff between subscription retries
  5. TEST 1 & 2: Increase test coverage to >60%
  6. TEST 3: Fix LocalDA Subscribe for testing

Should Fix (High Priority)

  1. HIGH 1: Improve subscription closure logging
  2. HIGH 3: Review DA client cleanup on context cancellation
  3. HIGH 4: Add metrics for dropped blobs
  4. SEC 1: Add rate limiting on subscriptions
  5. SEC 2: Detect and handle height gaps in subscriptions

Nice to Have (Medium Priority)

  1. All MEDIUM issues (timeouts, metrics, optimizations)
  2. Documentation improvements
  3. Performance optimizations

Summary

This is a well-architected feature that significantly improves DA synchronization efficiency. However, there are 3 critical bugs (race condition, watchdog false positives, missing validation) that must be fixed before merge, along with several high-priority issues around error handling and testing.

The low test coverage (26%) is concerning given the complexity of the state machine and concurrent subscription handling. Recommend increasing coverage to at least 60% and adding integration tests for the full catchup→follow→catchup cycle.

Once the critical issues are addressed, this will be a valuable addition that reduces DA layer load and improves sync performance for nodes at the head of the chain.

Estimated effort to address critical issues: 2-3 days
Recommended next steps:

  1. Fix the three critical bugs
  2. Add comprehensive tests (target 60%+ coverage)
  3. Fix LocalDA Subscribe implementation
  4. Add rate limiting and validation
  5. Re-review and merge

Tasks:

  • Read core implementation files
  • Review interface definitions and metrics
  • Analyze architecture and design patterns
  • Check error handling and resilience
  • Review security implications
  • Assess performance and resource efficiency
  • Evaluate test coverage and quality
  • Review documentation
  • Post comprehensive feedback

@github-actions
Contributor

github-actions bot commented Jan 8, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build | Format | Lint | Breaking | Updated (UTC)
✅ passed | ⏩ skipped | ✅ passed | ✅ passed | Jan 12, 2026, 8:22 AM

@gemini-code-assist
Contributor

Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the data availability (DA) worker's synchronization capabilities by introducing dynamic sync modes. It allows the system to efficiently transition between an aggressive polling strategy when catching up to the DA chain and a more resource-optimized, subscription-based approach once it reaches the chain head. This change aims to improve the overall performance and responsiveness of DA data processing.

Highlights

  • Dynamic Sync Modes: Introduced two dynamic synchronization modes for the DA worker: 'Catchup' for aggressive polling when behind the chain head, and 'Follow' for efficient, subscription-based real-time updates when caught up.
  • Subscription-based Updates: Implemented a new Subscribe method in the DA client, enabling the 'Follow' mode to receive real-time blob notifications, significantly reducing polling overhead.
  • Local Head Tracking: Added a LocalHead method to the DA client and a corresponding RPC endpoint, allowing the syncer to determine its current position relative to the DA chain head and facilitate mode switching.
  • New Metrics for Observability: Integrated new Prometheus metrics (SyncMode, SubscribeErrors, ModeSwitches) to provide visibility into the current sync mode, subscription failures, and transitions between modes.
  • Refactored DA Worker Loop: The daWorkerLoop has been refactored to intelligently determine and switch between 'Catchup' and 'Follow' modes based on the node's synchronization status, including a watchdog mechanism for 'Follow' mode.


Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a dual-mode synchronization mechanism, allowing the node to switch between an aggressive polling 'catchup' mode and a more efficient subscription-based 'follow' mode. This is a significant enhancement for nodes that are at the head of the chain. The changes are well-structured, introducing new DA client methods, metrics, and the core state machine logic in the daWorkerLoop. My review identified two critical bugs related to incorrect loop variable capturing that could lead to data corruption, and a couple of medium-severity design and style issues. Once these points are addressed, the implementation will be much more robust.

@codecov

codecov bot commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 36.25731% with 109 lines in your changes missing coverage. Please review.
✅ Project coverage is 57.29%. Comparing base (aaae087) to head (ecfcf83).

Files with missing lines | Patch % | Lines
block/internal/syncing/syncer.go | 44.44% | 74 Missing and 1 partial ⚠️
tools/local-da/rpc.go | 0.00% | 22 Missing ⚠️
block/internal/da/client.go | 0.00% | 12 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2961      +/-   ##
==========================================
- Coverage   57.88%   57.29%   -0.60%     
==========================================
  Files          97       97              
  Lines        9306     9457     +151     
==========================================
+ Hits         5387     5418      +31     
- Misses       3315     3435     +120     
  Partials      604      604              
Flag | Coverage Δ
combined | 57.29% <36.25%> (-0.60%) ⬇️

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

@tac0turtle force-pushed the marko/sync_subscribe branch from 95aeea4 to ecfcf83 on January 12, 2026, 08:22
@tac0turtle marked this pull request as ready for review on January 12, 2026, 08:37