Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/builder/op-rbuilder/src/tests/framework/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use crate::{
/// This is necessary because clap reads env vars for args with `env = "..."` attributes,
/// and external OTEL env vars (e.g., `OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf`) may contain
/// values that are incompatible with the CLI's expected values.
fn clear_otel_env_vars() {
pub fn clear_otel_env_vars() {
for key in [
"OTEL_EXPORTER_OTLP_ENDPOINT",
"OTEL_EXPORTER_OTLP_HEADERS",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ pub fn rb_test(args: TokenStream, input: TokenStream) -> TokenStream {
let _guard = tracing::subscriber::set_global_default(subscriber);
tracing::info!("{} start", stringify!(#test_name));

crate::tests::clear_otel_env_vars();
let instance = #instance_init;
#original_name(instance).await
}
Expand Down
3 changes: 2 additions & 1 deletion crates/client/flashblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ test-utils = [
"base-client-node/test-utils",
"dep:derive_more",
"dep:eyre",
"reth-chain-state/test-utils",
"reth-chainspec/test-utils",
"reth-evm/test-utils",
"reth-primitives/test-utils",
Expand All @@ -29,7 +30,7 @@ base-flashtypes.workspace = true
base-client-node.workspace = true

# reth
reth-exex.workspace = true
reth-chain-state.workspace = true
reth-evm.workspace = true
reth-primitives.workspace = true
reth-rpc.workspace = true
Expand Down
50 changes: 20 additions & 30 deletions crates/client/flashblocks/src/extension.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! Contains the [`FlashblocksExtension`] which wires up the flashblocks feature
//! (both the canon ExEx and RPC surface) on the Base node builder.
//! (canonical block subscription and RPC surface) on the Base node builder.

use std::sync::Arc;

use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder};
use futures_util::TryStreamExt;
use reth_exex::ExExEvent;
use reth_chain_state::CanonStateSubscriptions;
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
use tracing::info;
use url::Url;

Expand Down Expand Up @@ -34,7 +34,7 @@ impl FlashblocksConfig {
}
}

/// Helper struct that wires the Flashblocks feature (canon ExEx and RPC) into the node builder.
/// Helper struct that wires the Flashblocks feature (canonical subscription and RPC) into the node builder.
#[derive(Debug)]
pub struct FlashblocksExtension {
/// Optional Flashblocks configuration (includes state).
Expand All @@ -59,55 +59,45 @@ impl BaseNodeExtension for FlashblocksExtension {
let state = cfg.state;
let mut subscriber = FlashblocksSubscriber::new(state.clone(), cfg.websocket_url);

let state_for_exex = state.clone();
let state_for_canonical = state.clone();
let state_for_rpc = state.clone();
let state_for_start = state;

// Install the canon ExEx
let builder = builder.install_exex("flashblocks-canon", move |mut ctx| {
let fb = state_for_exex;
async move {
Ok(async move {
while let Some(note) = ctx.notifications.try_next().await? {
if let Some(committed) = note.committed_chain() {
let tip = committed.tip().num_hash();
let chain = Arc::unwrap_or_clone(committed);
for (_, block) in chain.into_blocks() {
fb.on_canonical_block_received(block);
}
let _ = ctx.events.send(ExExEvent::FinishedHeight(tip));
}
}
Ok(())
})
}
});

// Start state processor and subscriber after node is started
// Start state processor, subscriber, and canonical subscription after node is started
let builder = builder.on_node_started(move |ctx| {
info!(message = "Starting Flashblocks state processor");
state_for_start.start(ctx.provider().clone());
subscriber.start();

let mut canonical_stream =
BroadcastStream::new(ctx.provider().subscribe_to_canonical_state());
tokio::spawn(async move {
while let Some(Ok(notification)) = canonical_stream.next().await {
let committed = notification.committed();
for block in committed.blocks_iter() {
state_for_canonical.on_canonical_block_received(block.clone());
}
}
});
Comment on lines +72 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow this is nice


Ok(())
});

// Extend with RPC modules
builder.extend_rpc_modules(move |ctx| {
info!(message = "Starting Flashblocks RPC");

let fb = state_for_rpc;

let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
fb.clone(),
state_for_rpc.clone(),
);
ctx.modules.replace_configured(api_ext.into_rpc())?;

// Register the eth_subscribe subscription endpoint for flashblocks
// Uses replace_configured since eth_subscribe already exists from reth's standard module
// Pass eth_api to enable proxying standard subscription types to reth's implementation
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb);
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), state_for_rpc);
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;

Ok(())
Expand Down
132 changes: 66 additions & 66 deletions crates/client/flashblocks/src/test_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use base_client_node::{
use base_flashtypes::Flashblock;
use derive_more::Deref;
use eyre::Result;
use reth_chain_state::CanonStateSubscriptions;
use reth_optimism_chainspec::OpChainSpec;
use reth_provider::CanonStateSubscriptions;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;
use tokio_stream::{StreamExt, wrappers::BroadcastStream};

use crate::{
EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblocksReceiver,
Expand Down Expand Up @@ -61,7 +61,7 @@ impl FlashblocksParts {

/// Test extension for flashblocks functionality.
///
/// This extension wires up the flashblocks ExEx and RPC modules for testing,
/// This extension wires up the flashblocks canonical subscription and RPC modules for testing,
/// with optional control over canonical block processing.
#[derive(Clone, Debug)]
pub struct FlashblocksTestExtension {
Expand Down Expand Up @@ -112,77 +112,77 @@ impl BaseNodeExtension for FlashblocksTestExtension {
let receiver = self.inner.receiver.clone();
let process_canonical = self.inner.process_canonical;

let state_for_exex = state.clone();
let state_for_start = state.clone();
let state_for_rpc = state.clone();

builder
.install_exex("flashblocks-canon", move |mut ctx| {
let fb = state_for_exex.clone();
async move {
Ok(async move {
use reth_exex::ExExEvent;
while let Some(note) = ctx.notifications.try_next().await? {
if let Some(committed) = note.committed_chain() {
let hash = committed.tip().num_hash();
if process_canonical {
// Many suites drive canonical updates manually to reproduce race conditions, so
// allowing this to be disabled keeps canonical replay deterministic.
let chain = Arc::unwrap_or_clone(committed);
for (_, block) in chain.into_blocks() {
fb.on_canonical_block_received(block);
}
}
let _ = ctx.events.send(ExExEvent::FinishedHeight(hash));
}
}
Ok(())
})
// Start state processor and subscriptions after node is started
let builder = builder.on_node_started(move |ctx| {
let provider = ctx.provider().clone();

// Start the state processor with the provider
state_for_start.start(provider.clone());

// Spawn a task to forward canonical state notifications to the in-memory state
let provider_for_notify = provider.clone();
let mut canon_notify_stream =
BroadcastStream::new(ctx.provider().subscribe_to_canonical_state());
tokio::spawn(async move {
while let Some(Ok(notification)) = canon_notify_stream.next().await {
provider_for_notify
.canonical_in_memory_state()
.notify_canon_state(notification);
}
})
.extend_rpc_modules(move |ctx| {
let fb = state_for_rpc;
let provider = ctx.provider().clone();

// Start the state processor with the provider
fb.start(provider.clone());
});

let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new(
ctx.provider().subscribe_to_canonical_state(),
);
// If process_canonical is enabled, spawn a task to process canonical blocks
if process_canonical {
let state_for_canonical = state_for_start.clone();
let mut canonical_stream =
BroadcastStream::new(ctx.provider().subscribe_to_canonical_state());
tokio::spawn(async move {
use tokio_stream::StreamExt;
while let Some(Ok(notification)) = canon_stream.next().await {
provider.canonical_in_memory_state().notify_canon_state(notification);
}
});
let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
fb.clone(),
);
ctx.modules.replace_configured(api_ext.into_rpc())?;

// Register eth_subscribe subscription endpoint for flashblocks
// Uses replace_configured since eth_subscribe already exists from reth's standard module
// Pass eth_api to enable proxying standard subscription types to reth's implementation
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;

let fb_for_task = fb.clone();
let mut receiver = receiver
.lock()
.expect("flashblock receiver mutex poisoned")
.take()
.expect("flashblock receiver should only be initialized once");
tokio::spawn(async move {
while let Some((payload, tx)) = receiver.recv().await {
fb_for_task.on_flashblock_received(payload);
let _ = tx.send(());
while let Some(Ok(notification)) = canonical_stream.next().await {
let committed = notification.committed();
for block in committed.blocks_iter() {
state_for_canonical.on_canonical_block_received(block.clone());
}
}
});
}

Ok(())
});

builder.extend_rpc_modules(move |ctx| {
let fb = state_for_rpc;

let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
fb.clone(),
);
ctx.modules.replace_configured(api_ext.into_rpc())?;

// Register eth_subscribe subscription endpoint for flashblocks
// Uses replace_configured since eth_subscribe already exists from reth's standard module
// Pass eth_api to enable proxying standard subscription types to reth's implementation
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;

let fb_for_task = fb.clone();
let mut receiver = receiver
.lock()
.expect("flashblock receiver mutex poisoned")
.take()
.expect("flashblock receiver should only be initialized once");
tokio::spawn(async move {
while let Some((payload, tx)) = receiver.recv().await {
fb_for_task.on_flashblock_received(payload);
let _ = tx.send(());
}
});

Ok(())
})
Ok(())
})
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/client/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ workspace = true
base-client-node.workspace = true

# reth
reth-exex.workspace = true
reth-tracing.workspace = true
reth-transaction-pool.workspace = true
reth-node-api.workspace = true
Expand All @@ -31,18 +30,19 @@ jsonrpsee.workspace = true

# async
tokio.workspace = true
tokio-stream.workspace = true
futures.workspace = true

# misc
lru.workspace = true
eyre.workspace = true
chrono.workspace = true
metrics.workspace = true
derive_more = { workspace = true, features = ["display"] }
tracing.workspace = true
serde.workspace = true

[dev-dependencies]
eyre.workspace = true
httpmock.workspace = true
serde_json.workspace = true
reth-transaction-pool = { workspace = true, features = ["test-utils"] }
43 changes: 0 additions & 43 deletions crates/client/txpool/src/exex.rs

This file was deleted.

Loading
Loading