diff --git a/apps/evm/cmd/rollback.go b/apps/evm/cmd/rollback.go index f28ebb8bd5..8fefb4f0ec 100644 --- a/apps/evm/cmd/rollback.go +++ b/apps/evm/cmd/rollback.go @@ -1,13 +1,18 @@ package cmd import ( + "bytes" "context" "errors" "fmt" + "os" + "github.com/ethereum/go-ethereum/common" + ds "github.com/ipfs/go-datastore" "github.com/spf13/cobra" goheaderstore "github.com/celestiaorg/go-header/store" + "github.com/evstack/ev-node/execution/evm" rollcmd "github.com/evstack/ev-node/pkg/cmd" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" @@ -42,7 +47,7 @@ func NewRollbackCmd() *cobra.Command { defer func() { if closeErr := rawEvolveDB.Close(); closeErr != nil { - fmt.Printf("Warning: failed to close evolve database: %v\n", closeErr) + cmd.Printf("Warning: failed to close evolve database: %v\n", closeErr) } }() @@ -63,6 +68,17 @@ func NewRollbackCmd() *cobra.Command { return fmt.Errorf("failed to rollback ev-node state: %w", err) } + // rollback execution layer via EngineClient; if the client cannot be built (e.g. no JWT secret file), warn and skip the EL rollback + engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB) + if err != nil { + cmd.Printf("Warning: failed to create engine client, skipping EL rollback: %v\n", err) + } else { + if err := engineClient.Rollback(goCtx, height); err != nil { + return fmt.Errorf("failed to rollback execution layer: %w", err) + } + cmd.Printf("Rolled back execution layer to height %d\n", height) + } + // rollback ev-node goheader state headerStore, err := goheaderstore.NewStore[*types.SignedHeader]( evolveDB, @@ -101,7 +117,7 @@ func NewRollbackCmd() *cobra.Command { errs = errors.Join(errs, fmt.Errorf("failed to rollback data sync service state: %w", err)) } - fmt.Printf("Rolled back ev-node state to height %d\n", height) + cmd.Printf("Rolled back ev-node state to height %d\n", height) if syncNode { - fmt.Println("Restart the node with the `--evnode.clear_cache` flag") + cmd.Println("Restart the node with the `--evnode.clear_cache` flag") } @@ -113,5 +129,45 @@ func NewRollbackCmd() *cobra.Command { cmd.Flags().Uint64Var(&height, "height", 0, "rollback 
to a specific height") cmd.Flags().BoolVar(&syncNode, "sync-node", false, "sync node (no aggregator)") + // EVM flags for execution layer rollback + cmd.Flags().String(evm.FlagEvmEthURL, "http://localhost:8545", "URL of the Ethereum JSON-RPC endpoint") + cmd.Flags().String(evm.FlagEvmEngineURL, "http://localhost:8551", "URL of the Engine API endpoint") + cmd.Flags().String(evm.FlagEvmJWTSecretFile, "", "Path to file containing the JWT secret for authentication") + return cmd } + +// createRollbackEngineClient builds the EVM engine client used to roll back the execution layer. +// It reads the Ethereum RPC URL, the Engine API URL and the JWT secret file path from cmd's flags and +// returns an error if a flag cannot be read or the secret file is not set, unreadable or empty. +func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.EngineClient, error) { + ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL) + if err != nil { + return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEthURL, err) + } + engineURL, err := cmd.Flags().GetString(evm.FlagEvmEngineURL) + if err != nil { + return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEngineURL, err) + } + + jwtSecretFile, err := cmd.Flags().GetString(evm.FlagEvmJWTSecretFile) + if err != nil { + return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmJWTSecretFile, err) + } + + if jwtSecretFile == "" { + return nil, fmt.Errorf("JWT secret file must be provided via --%s for EL rollback", evm.FlagEvmJWTSecretFile) + } + + secretBytes, err := os.ReadFile(jwtSecretFile) + if err != nil { + return nil, fmt.Errorf("failed to read JWT secret from file '%s': %w", jwtSecretFile, err) + } + jwtSecret := string(bytes.TrimSpace(secretBytes)) + + if jwtSecret == "" { + return nil, fmt.Errorf("JWT secret file '%s' is empty", jwtSecretFile) + } + + return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, common.Hash{}, common.Address{}, db, false) +} diff --git a/core/execution/execution.go b/core/execution/execution.go index 5085ebe578..23b77f4a97 100644 --- a/core/execution/execution.go +++ b/core/execution/execution.go @@ -102,3 +102,15 @@ type HeightProvider interface { // - error: Any errors during height retrieval GetLatestHeight(ctx context.Context) (uint64, error) } + 
+// Rollbackable is an optional interface that execution clients can implement +// to support automatic rollback when the execution layer is ahead of the target height. +// This enables automatic recovery during rolling restarts when the EL has committed +// blocks that were not replicated to the consensus layer. +// +// Requirements: +// - Only execution layers supporting in-flight rollback should implement this. +type Rollbackable interface { + // Rollback resets the execution layer head to targetHeight and reports any failure through the returned error. + Rollback(ctx context.Context, targetHeight uint64) error +} diff --git a/execution/evm/execution.go b/execution/evm/execution.go index c310af06d7..9b60b00061 100644 --- a/execution/evm/execution.go +++ b/execution/evm/execution.go @@ -58,6 +58,12 @@ var ( // Ensure EngineAPIExecutionClient implements the execution.Execute interface var _ execution.Executor = (*EngineClient)(nil) +// Ensure EngineClient implements the execution.HeightProvider interface +var _ execution.HeightProvider = (*EngineClient)(nil) + +// Ensure EngineClient implements the execution.Rollbackable interface +var _ execution.Rollbackable = (*EngineClient)(nil) + // validatePayloadStatus checks the payload status and returns appropriate errors. // It implements the Engine API specification's status handling: // - VALID: Operation succeeded, return nil @@ -338,7 +344,7 @@ func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) { func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) { // 1. 
Check for idempotent execution - stateRoot, payloadID, found, err := c.checkIdempotency(ctx, blockHeight, timestamp, txs) + stateRoot, payloadID, found, err := c.reconcileExecutionAtHeight(ctx, blockHeight, timestamp, txs) if err != nil { c.logger.Warn().Err(err).Uint64("height", blockHeight).Msg("ExecuteTxs: idempotency check failed") // Continue execution on error, as it might be transient @@ -548,6 +554,7 @@ func (c *EngineClient) setFinalWithHeight(ctx context.Context, blockHash common. // doForkchoiceUpdate performs the actual forkchoice update RPC call with retry logic. func (c *EngineClient) doForkchoiceUpdate(ctx context.Context, args engine.ForkchoiceStateV1, operation string) error { + // Call forkchoice update with retry logic for SYNCING status err := retryWithBackoffOnPayloadStatus(ctx, func() error { forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, nil) @@ -690,35 +697,57 @@ func (c *EngineClient) ResumePayload(ctx context.Context, payloadIDBytes []byte) return stateRoot, err } -// checkIdempotency checks if the block at the given height and timestamp has already been executed. +// reconcileExecutionAtHeight checks if the block at the given height and timestamp has already been executed. Unlike a pure check it can change EL state: when the EL already has a block at this height with a different timestamp, it rolls the EL back to height-1 so the height can be re-executed, and a failed rollback is returned as err. // It returns: // - stateRoot: non-nil if block is already promoted/finalized (idempotent success) // - payloadID: non-nil if block execution was started but not finished (resume needed) // - found: true if either of the above is true // - err: error during checks -func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) { +func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) { // 1. 
Check ExecMeta from store execMeta, err := c.store.GetExecMeta(ctx, height) if err == nil && execMeta != nil { - // If we already have a promoted block at this height, return the stored StateRoot + // If we already have a promoted block at this height, verify timestamp matches + // to catch Dual-Store Conflicts where ExecMeta was saved for an old block + // that was later replaced via consensus. if execMeta.Stage == ExecStagePromoted && len(execMeta.StateRoot) > 0 { - c.logger.Info(). + if execMeta.Timestamp == timestamp.Unix() { + c.logger.Info(). + Uint64("height", height). + Str("stage", execMeta.Stage). + Msg("ExecuteTxs: reusing already-promoted execution (idempotent)") + return execMeta.StateRoot, nil, true, nil + } + // Timestamp mismatch - ExecMeta is stale from an old block that was replaced. + // Ignore it and proceed to EL check which will handle rollback if needed. + c.logger.Warn(). Uint64("height", height). - Str("stage", execMeta.Stage). - Msg("ExecuteTxs: reusing already-promoted execution (idempotent)") - return execMeta.StateRoot, nil, true, nil + Int64("execmeta_timestamp", execMeta.Timestamp). + Int64("requested_timestamp", timestamp.Unix()). + Msg("ExecuteTxs: ExecMeta timestamp mismatch, ignoring stale promoted record") } - // If we have a started execution with a payloadID, return it to resume + // If we have a started execution with a payloadID, validate it still exists before resuming. + // After node restart, the EL's payload cache is ephemeral and the payloadID may be stale. if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 { - c.logger.Info(). - Uint64("height", height). - Str("stage", execMeta.Stage). 
- Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume") - var pid engine.PayloadID copy(pid[:], execMeta.PayloadID) - return nil, &pid, true, nil + + // Validate payload still exists by attempting to retrieve it + if _, err = c.engineClient.GetPayload(ctx, pid); err == nil { + c.logger.Info(). + Uint64("height", height). + Str("stage", execMeta.Stage). + Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume") + return nil, &pid, true, nil + } + // GetPayload failed: treat the payload as stale (expired or node restarted) and proceed with fresh execution. Any GetPayload error takes this path, not only a missing payload. + c.logger.Warn(). + Uint64("height", height). + Str("payloadID", pid.String()). + Err(err). + Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute") + // Don't return - fall through to fresh execution } } @@ -744,12 +773,27 @@ func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, time return existingStateRoot.Bytes(), nil, true, nil } - // Timestamp mismatch - log warning but proceed + // The EL's block at this height has a different timestamp than requested, so roll the EL back to height-1 to let this height be re-executed c.logger.Warn(). Uint64("height", height). Uint64("existingTimestamp", existingTimestamp). Int64("requestedTimestamp", timestamp.Unix()). - Msg("ExecuteTxs: block exists at height but timestamp differs") + Msg("ExecuteTxs: block exists at height but timestamp differs - rolling back EL to re-sync") + + // Rollback to height-1 to allow re-execution with correct timestamp + if height > 0 { + if err := c.Rollback(ctx, height-1); err != nil { + c.logger.Error().Err(err). + Uint64("height", height). + Uint64("rollback_target", height-1). + Msg("ExecuteTxs: failed to rollback EL for timestamp mismatch") + return nil, nil, false, fmt.Errorf("failed to rollback EL for timestamp mismatch at height %d: %w", height, err) + } + c.logger.Info(). + Uint64("height", height). + Uint64("rollback_target", height-1). 
+ Msg("ExecuteTxs: EL rolled back successfully, will re-execute with correct timestamp") + } } return nil, nil, false, nil @@ -907,6 +951,47 @@ func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) { return header.Number.Uint64(), nil } +// Rollback resets the execution layer head to the specified height using forkchoice update. +// This is used for recovery when the EL is ahead of the consensus layer (e.g., during rolling restarts). +// +// Implements the execution.Rollbackable interface. +func (c *EngineClient) Rollback(ctx context.Context, targetHeight uint64) error { + // Get block hash at target height + blockHash, _, _, _, err := c.getBlockInfo(ctx, targetHeight) + if err != nil { + return fmt.Errorf("get block at height %d: %w", targetHeight, err) + } + + c.logger.Info(). + Uint64("target_height", targetHeight). + Str("block_hash", blockHash.Hex()). + Msg("rolling back execution layer via forkchoice update") + + // Reset head, safe, and finalized to target block + // This forces the EL to reorg its canonical chain to the target height. Note: the cached fields below are updated before the forkchoice call and are not restored if that call fails. + c.mu.Lock() + c.currentHeadBlockHash = blockHash + c.currentHeadHeight = targetHeight + c.currentSafeBlockHash = blockHash + c.currentFinalizedBlockHash = blockHash + args := engine.ForkchoiceStateV1{ + HeadBlockHash: blockHash, + SafeBlockHash: blockHash, + FinalizedBlockHash: blockHash, + } + c.mu.Unlock() + + if err := c.doForkchoiceUpdate(ctx, args, "Rollback"); err != nil { + return fmt.Errorf("forkchoice update for rollback failed: %w", err) + } + + c.logger.Info(). + Uint64("target_height", targetHeight). + Msg("execution layer rollback completed") + + return nil +} + // decodeSecret decodes a hex-encoded JWT secret string into a byte slice. func decodeSecret(jwtSecret string) ([]byte, error) { secret, err := hex.DecodeString(strings.TrimPrefix(jwtSecret, "0x"))