Skip to content

Commit 3b6c2ac

Browse files
committed
feat(zenoh): experimental tooling
add sub & query commands, with remote conn possible by passing `--connect IP` rename orb-zenoh-rpc to orb-zenoh
1 parent c8ef452 commit 3b6c2ac

File tree

3 files changed

+160
-2
lines changed

3 files changed

+160
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

experiments/zenoh/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "orb-zenoh-rpc"
2+
name = "orb-zenoh"
33
version = "0.0.0"
44
description = "Experiments with the zenoh messaging system"
55
publish = false

experiments/zenoh/src/main.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,26 @@ enum Args {
1313
#[clap(long)]
1414
use_contiguous: bool,
1515
},
16+
Sub {
17+
#[clap(long)]
18+
key: String,
19+
#[clap(long)]
20+
connect: Option<String>,
21+
},
22+
Query {
23+
#[clap(long)]
24+
key: String,
25+
#[clap(long)]
26+
connect: Option<String>,
27+
},
28+
Pub {
29+
#[clap(long)]
30+
key: String,
31+
#[clap(long)]
32+
value: String,
33+
#[clap(long)]
34+
connect: Option<String>,
35+
},
1636
}
1737

1838
#[tokio::main]
@@ -25,11 +45,149 @@ async fn main() -> color_eyre::Result<()> {
2545
let result = match args {
2646
Args::Alice { .. } => alice(args).await,
2747
Args::Bob { .. } => bob(args).await,
48+
Args::Sub { .. } => sub(args).await,
49+
Args::Query { .. } => query(args).await,
50+
Args::Pub { .. } => publish(args).await,
2851
};
2952
telemetry.flush().await;
3053
result
3154
}
3255

56+
async fn sub(args: Args) -> color_eyre::Result<()> {
57+
let Args::Sub {
58+
key: zenoh_key,
59+
connect,
60+
} = args
61+
else {
62+
unreachable!()
63+
};
64+
65+
let connect_config = connect
66+
.map(|addr| format!(r#"connect: {{ endpoints: ["tcp/{}"] }},"#, addr))
67+
.unwrap_or_default();
68+
69+
let cfg = zenoh::Config::from_json5(&format!(
70+
r#"{{
71+
mode: "peer",
72+
{connect_config}
73+
}}"#
74+
))
75+
.unwrap_or_else(|e| panic!("failed to parse config: {}", e));
76+
77+
let session = zenoh::open(cfg)
78+
.await
79+
.unwrap_or_else(|e| panic!("failed to open session: {}", e));
80+
let sub = session
81+
.declare_subscriber(&zenoh_key)
82+
.await
83+
.unwrap_or_else(|e| panic!("failed to declare subscriber: {}", e));
84+
85+
tracing::info!("Subscribed to {zenoh_key}");
86+
while let Ok(sample) = sub.recv_async().await {
87+
if let Ok(payload_str) = sample.payload().try_to_string() {
88+
tracing::info!("recv key={} payload={:?}", sample.key_expr(), payload_str);
89+
} else {
90+
tracing::info!(
91+
"recv key={} payload={:?}",
92+
sample.key_expr(),
93+
sample.payload()
94+
);
95+
}
96+
}
97+
98+
Ok(())
99+
}
100+
101+
async fn query(args: Args) -> color_eyre::Result<()> {
102+
let Args::Query {
103+
key: zenoh_key,
104+
connect,
105+
} = args
106+
else {
107+
unreachable!()
108+
};
109+
110+
let connect_config = connect
111+
.map(|addr| format!(r#"connect: {{ endpoints: ["tcp/{}"] }},"#, addr))
112+
.unwrap_or_default();
113+
114+
let cfg = zenoh::Config::from_json5(&format!(
115+
r#"{{
116+
mode: "peer",
117+
{connect_config}
118+
}}"#
119+
))
120+
.unwrap_or_else(|e| panic!("failed to parse config: {}", e));
121+
122+
let session = zenoh::open(cfg)
123+
.await
124+
.unwrap_or_else(|e| panic!("failed to open session: {}", e));
125+
126+
tracing::info!("Querying key: {zenoh_key}");
127+
let replies = session
128+
.get(&zenoh_key)
129+
.await
130+
.unwrap_or_else(|e| panic!("failed to query: {}", e));
131+
132+
while let Ok(reply) = replies.recv_async().await {
133+
match reply.result() {
134+
Ok(sample) => {
135+
if let Ok(payload_str) = sample.payload().try_to_string() {
136+
println!("key={} payload={}", sample.key_expr(), payload_str);
137+
} else {
138+
println!(
139+
"key={} payload={:?}",
140+
sample.key_expr(),
141+
sample.payload()
142+
);
143+
}
144+
}
145+
Err(err) => {
146+
tracing::warn!("Query error for key {}: {:?}", zenoh_key, err);
147+
}
148+
}
149+
}
150+
151+
Ok(())
152+
}
153+
154+
async fn publish(args: Args) -> color_eyre::Result<()> {
155+
let Args::Pub {
156+
key: zenoh_key,
157+
value,
158+
connect,
159+
} = args
160+
else {
161+
unreachable!()
162+
};
163+
164+
let connect_config = connect
165+
.map(|addr| format!(r#"connect: {{ endpoints: ["tcp/{}"] }},"#, addr))
166+
.unwrap_or_default();
167+
168+
let cfg = zenoh::Config::from_json5(&format!(
169+
r#"{{
170+
mode: "peer",
171+
{connect_config}
172+
}}"#
173+
))
174+
.unwrap_or_else(|e| panic!("failed to parse config: {}", e));
175+
176+
let session = zenoh::open(cfg)
177+
.await
178+
.unwrap_or_else(|e| panic!("failed to open session: {}", e));
179+
180+
tracing::info!("Publishing to key: {zenoh_key}");
181+
session
182+
.put(&zenoh_key, value.as_bytes())
183+
.await
184+
.unwrap_or_else(|e| panic!("failed to put: {}", e));
185+
186+
tracing::info!("Published value: {value}");
187+
188+
Ok(())
189+
}
190+
33191
async fn alice(args: Args) -> Result<()> {
34192
let Args::Alice { payload_size } = args else {
35193
unreachable!()

0 commit comments

Comments
 (0)