fix: inconsistent state detection and rollback #2983
```diff
@@ -58,6 +58,12 @@ var (
 // Ensure EngineAPIExecutionClient implements the execution.Execute interface
 var _ execution.Executor = (*EngineClient)(nil)

+// Ensure EngineClient implements the execution.HeightProvider interface
+var _ execution.HeightProvider = (*EngineClient)(nil)
+
+// Ensure EngineClient implements the execution.Rollbackable interface
+var _ execution.Rollbackable = (*EngineClient)(nil)
+
 // validatePayloadStatus checks the payload status and returns appropriate errors.
 // It implements the Engine API specification's status handling:
 //   - VALID: Operation succeeded, return nil
```
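The two interfaces asserted here are defined elsewhere in the `execution` package and do not appear in this hunk. A minimal sketch of their assumed shape, inferred from the `GetLatestHeight` and `Rollback` method signatures later in this diff:

```go
package execution

import "context"

// HeightProvider reports the execution layer's latest block height.
// Assumed shape, matching EngineClient.GetLatestHeight in this PR.
type HeightProvider interface {
	GetLatestHeight(ctx context.Context) (uint64, error)
}

// Rollbackable resets the execution layer head to an earlier height.
// Assumed shape, matching EngineClient.Rollback in this PR.
type Rollbackable interface {
	Rollback(ctx context.Context, targetHeight uint64) error
}
```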
```diff
@@ -338,7 +344,7 @@ func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) {
 func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) {

 	// 1. Check for idempotent execution
-	stateRoot, payloadID, found, err := c.checkIdempotency(ctx, blockHeight, timestamp, txs)
+	stateRoot, payloadID, found, err := c.reconcileExecutionAtHeight(ctx, blockHeight, timestamp, txs)
 	if err != nil {
 		c.logger.Warn().Err(err).Uint64("height", blockHeight).Msg("ExecuteTxs: idempotency check failed")
 		// Continue execution on error, as it might be transient
```
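For orientation, the renamed helper's `(stateRoot, payloadID, found)` triple is consumed by `ExecuteTxs` roughly as follows. This is a hedged sketch: only the call site above appears in the diff, so the wrapper name, the `maxBytes` plumbing, and the fresh-execution fallback are assumptions.

```go
// executeWithReconciliation is a hypothetical illustration of the contract:
// reuse a promoted state root, resume a live payload build, or fall through
// to fresh execution. It is not the actual ExecuteTxs body.
func executeWithReconciliation(ctx context.Context, c *EngineClient,
	height uint64, ts time.Time, txs [][]byte, maxBytes uint64,
) ([]byte, uint64, error) {
	stateRoot, payloadID, found, err := c.reconcileExecutionAtHeight(ctx, height, ts, txs)
	switch {
	case err != nil:
		// Reconciliation errors may be transient: log and fall through.
	case found && len(stateRoot) > 0:
		// Height already promoted: idempotent success.
		return stateRoot, maxBytes, nil
	case found && payloadID != nil:
		// A still-valid in-flight payload build exists: resume it.
		root, rerr := c.ResumePayload(ctx, payloadID[:])
		return root, maxBytes, rerr
	}
	// Neither case applies: start a fresh payload build for this height (elided).
	return nil, 0, nil
}
```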
```diff
@@ -548,6 +554,7 @@ func (c *EngineClient) setFinalWithHeight(ctx context.Context, blockHash common.

+// doForkchoiceUpdate performs the actual forkchoice update RPC call with retry logic.
 func (c *EngineClient) doForkchoiceUpdate(ctx context.Context, args engine.ForkchoiceStateV1, operation string) error {

 	// Call forkchoice update with retry logic for SYNCING status
 	err := retryWithBackoffOnPayloadStatus(ctx, func() error {
 		forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, nil)
```
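`retryWithBackoffOnPayloadStatus` itself is unchanged and its body is not shown in this PR. Purely as an illustration of the retry-on-SYNCING behavior the new doc comment references, such a helper might look like the sketch below; everything in it, including the `errSyncing` sentinel and the backoff constants, is an assumption.

```go
// errSyncing is a hypothetical sentinel standing in for the EL's SYNCING
// payload status; the real helper's internals are not part of this diff.
var errSyncing = errors.New("payload status: SYNCING")

// retryWithBackoffOnPayloadStatus (illustrative shape only) re-issues the
// call with capped exponential backoff while the EL reports SYNCING, and
// stops on success, a permanent error, or context cancellation.
func retryWithBackoffOnPayloadStatus(ctx context.Context, fn func() error) error {
	backoff := 100 * time.Millisecond
	for {
		err := fn()
		if err == nil || !errors.Is(err, errSyncing) {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			if backoff < 2*time.Second {
				backoff *= 2
			}
		}
	}
}
```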
```diff
@@ -690,35 +697,57 @@ func (c *EngineClient) ResumePayload(ctx context.Context, payloadIDBytes []byte)
 	return stateRoot, err
 }

-// checkIdempotency checks if the block at the given height and timestamp has already been executed.
+// reconcileExecutionAtHeight checks if the block at the given height and timestamp has already been executed.
 // It returns:
 //   - stateRoot: non-nil if block is already promoted/finalized (idempotent success)
 //   - payloadID: non-nil if block execution was started but not finished (resume needed)
 //   - found: true if either of the above is true
 //   - err: error during checks
-func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) {
+func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) {
 	// 1. Check ExecMeta from store
 	execMeta, err := c.store.GetExecMeta(ctx, height)
 	if err == nil && execMeta != nil {
-		// If we already have a promoted block at this height, return the stored StateRoot
+		// If we already have a promoted block at this height, verify timestamp matches
+		// to catch Dual-Store Conflicts where ExecMeta was saved for an old block
+		// that was later replaced via consensus.
 		if execMeta.Stage == ExecStagePromoted && len(execMeta.StateRoot) > 0 {
-			c.logger.Info().
-				Uint64("height", height).
-				Str("stage", execMeta.Stage).
-				Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
-			return execMeta.StateRoot, nil, true, nil
+			if execMeta.Timestamp == timestamp.Unix() {
+				c.logger.Info().
+					Uint64("height", height).
+					Str("stage", execMeta.Stage).
+					Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
+				return execMeta.StateRoot, nil, true, nil
+			}
+			// Timestamp mismatch - ExecMeta is stale from an old block that was replaced.
+			// Ignore it and proceed to EL check which will handle rollback if needed.
+			c.logger.Warn().
+				Uint64("height", height).
+				Str("stage", execMeta.Stage).
+				Int64("execmeta_timestamp", execMeta.Timestamp).
+				Int64("requested_timestamp", timestamp.Unix()).
+				Msg("ExecuteTxs: ExecMeta timestamp mismatch, ignoring stale promoted record")
 		}

-		// If we have a started execution with a payloadID, return it to resume
+		// If we have a started execution with a payloadID, validate it still exists before resuming.
+		// After node restart, the EL's payload cache is ephemeral and the payloadID may be stale.
 		if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 {
-			c.logger.Info().
-				Uint64("height", height).
-				Str("stage", execMeta.Stage).
-				Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
-
 			var pid engine.PayloadID
 			copy(pid[:], execMeta.PayloadID)
-			return nil, &pid, true, nil
+
+			// Validate payload still exists by attempting to retrieve it
+			if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
+				c.logger.Info().
+					Uint64("height", height).
+					Str("stage", execMeta.Stage).
+					Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
+				return nil, &pid, true, nil
+			}
+			// Payload is stale (expired or node restarted) - proceed with fresh execution
+			c.logger.Warn().
+				Uint64("height", height).
+				Str("payloadID", pid.String()).
+				Err(err).
+				Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
+			// Don't return - fall through to fresh execution
 		}
 	}
```
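The `ExecMeta` record consulted above is defined elsewhere in the package. Reconstructed from the fields and stages this function touches, its shape is presumably close to the following sketch; the field types and the stage string values are assumptions.

```go
// Assumed stage values; the real constants live elsewhere in the package.
const (
	ExecStageStarted  = "started"  // payload build begun, not yet promoted
	ExecStagePromoted = "promoted" // block executed and promoted; StateRoot recorded
)

// ExecMeta is the assumed per-height execution metadata record.
type ExecMeta struct {
	Stage     string // ExecStageStarted or ExecStagePromoted
	StateRoot []byte // set once the block is promoted
	PayloadID []byte // 8-byte Engine API payload ID while a build is in flight
	Timestamp int64  // Unix seconds of the block this record was written for
}
```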
```diff
@@ -744,12 +773,27 @@ func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, time

 			return existingStateRoot.Bytes(), nil, true, nil
 		}
-		// Timestamp mismatch - log warning but proceed
+		// Timestamp mismatch - we need to rollback the EL to height-1 so it can re-execute
 		c.logger.Warn().
 			Uint64("height", height).
 			Uint64("existingTimestamp", existingTimestamp).
 			Int64("requestedTimestamp", timestamp.Unix()).
-			Msg("ExecuteTxs: block exists at height but timestamp differs")
+			Msg("ExecuteTxs: block exists at height but timestamp differs - rolling back EL to re-sync")
+
+		// Rollback to height-1 to allow re-execution with correct timestamp
+		if height > 0 {
+			if err := c.Rollback(ctx, height-1); err != nil {
+				c.logger.Error().Err(err).
+					Uint64("height", height).
+					Uint64("rollback_target", height-1).
+					Msg("ExecuteTxs: failed to rollback EL for timestamp mismatch")
+				return nil, nil, false, fmt.Errorf("failed to rollback EL for timestamp mismatch at height %d: %w", height, err)
+			}
+			c.logger.Info().
+				Uint64("height", height).
+				Uint64("rollback_target", height-1).
+				Msg("ExecuteTxs: EL rolled back successfully, will re-execute with correct timestamp")
+		}
 	}

 	return nil, nil, false, nil
```
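Concretely: if the consensus layer replays height `h` with timestamp `T2` while the EL already holds a block at `h` with a different timestamp `T1` (left over from a block that was later replaced via consensus), the client now rolls the EL back to `h-1` and falls through to fresh execution, instead of merely logging the mismatch and proceeding against divergent state as before.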
```diff
@@ -907,6 +951,47 @@ func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) {
 	return header.Number.Uint64(), nil
 }

+// Rollback resets the execution layer head to the specified height using forkchoice update.
+// This is used for recovery when the EL is ahead of the consensus layer (e.g., during rolling restarts).
```
Contributor: can this function be called while the node is running?

Member: For EVM yes (as it resends a payload); for the SDK no. (And our DM discussion, #2983 (comment), showed that ev-abci won't implement it.)

Contributor: do we want in-flight? Is that a goal for later on?

Member: Well, this is done in-flight right in this PR, so yes: https://github.com/evstack/ev-node/pull/2983/changes#diff-73d38fc0f126d3d764a9855162c561f8a2cfe9595029aa13f510ab5937885d2dR784-R796

Contributor (Author): I have updated the doc to mention in-flight updates on the interface.
```diff
+// Implements the execution.Rollbackable interface.
+func (c *EngineClient) Rollback(ctx context.Context, targetHeight uint64) error {
```
Member: Could we wire the command directly then? For consistency, a good follow-up would be to implement this in ev-abci and use it in the rollback command as well, instead of directly calling: https://github.com/evstack/ev-abci/blob/e905b0b/server/rollback_cmd.go#L91-L95

Contributor (Author): 👍 I have added it to the rollback cmd.
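As a rough sketch of that follow-up, the rollback command could delegate to the `Rollbackable` interface rather than touching the store directly. The cobra scaffolding, flag name, and constructor below are illustrative assumptions, not code from ev-abci:

```go
// newRollbackCmd is a hypothetical CLI wiring for the agreed follow-up:
// the command delegates to the Rollbackable implementation instead of
// issuing raw store/EL calls itself.
func newRollbackCmd(client execution.Rollbackable) *cobra.Command {
	var targetHeight uint64
	cmd := &cobra.Command{
		Use:   "rollback",
		Short: "Reset the execution layer head to a previous height",
		RunE: func(cmd *cobra.Command, args []string) error {
			return client.Rollback(cmd.Context(), targetHeight)
		},
	}
	cmd.Flags().Uint64Var(&targetHeight, "height", 0, "height to roll back to")
	return cmd
}
```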
```diff
+	// Get block hash at target height
+	blockHash, _, _, _, err := c.getBlockInfo(ctx, targetHeight)
+	if err != nil {
+		return fmt.Errorf("get block at height %d: %w", targetHeight, err)
+	}
+
+	c.logger.Info().
+		Uint64("target_height", targetHeight).
+		Str("block_hash", blockHash.Hex()).
+		Msg("rolling back execution layer via forkchoice update")
+
+	// Reset head, safe, and finalized to target block
+	// This forces the EL to reorg its canonical chain to the target height
+	c.mu.Lock()
+	c.currentHeadBlockHash = blockHash
+	c.currentHeadHeight = targetHeight
+	c.currentSafeBlockHash = blockHash
+	c.currentFinalizedBlockHash = blockHash
+	args := engine.ForkchoiceStateV1{
+		HeadBlockHash:      blockHash,
+		SafeBlockHash:      blockHash,
+		FinalizedBlockHash: blockHash,
+	}
+	c.mu.Unlock()
+
+	if err := c.doForkchoiceUpdate(ctx, args, "Rollback"); err != nil {
+		return fmt.Errorf("forkchoice update for rollback failed: %w", err)
+	}
+
+	c.logger.Info().
+		Uint64("target_height", targetHeight).
+		Msg("execution layer rollback completed")
+
+	return nil
+}
+
 // decodeSecret decodes a hex-encoded JWT secret string into a byte slice.
 func decodeSecret(jwtSecret string) ([]byte, error) {
 	secret, err := hex.DecodeString(strings.TrimPrefix(jwtSecret, "0x"))
```
Contributor (Author): This is still a weak check without a hash comparison, but it fixed the problems on restart.