Skip to content

Commit 2bcf173

Browse files
[multicast] Add multicast group management to the softnpu management protocol
This extends the management message protocol with operations for multicast group lifecycle: create, destroy, add port, remove port, and list. These dispatch to the corresponding [p4rs](https://github.com/oxidecomputer/p4) Pipeline trait methods (add_mcast_group, remove_mcast_group, add_mcast_port, etc.) that were added to support bifurcated multicast replication in the p4rs codegen. Mutating operations (create, destroy, add, remove) are fire-and-forget, matching the existing TableAdd/TableRemove pattern. `MulticastGroupList` is the only round-trip operation, returning a sorted group-to-ports mapping for use by dendrite's `AsicMulticastOps` queries. The `MulticastPortAdd` struct carries rid and level1_excl_id for API parity with dendrite's AsicMulticastOps trait, though softnpu does not use them. Tofino's PRE handles per-replica identification and exclusion via these fields. softnpu handles the equivalent via `McastReplicationTag` in the codegenning. This branch depends on the related oxidecomputer/p4#240.
1 parent 52ea601 commit 2bcf173

4 files changed

Lines changed: 114 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ztest = { git = "https://github.com/oxidecomputer/falcon" }
3131
curl = "0.4.49"
3232
octocrab = "0.49"
3333
libnet = { git = "https://github.com/oxidecomputer/netadm-sys", branch = "main" }
34-
p4rs = { git = "https://github.com/oxidecomputer/p4", branch = "main" }
34+
p4rs = { git = "https://github.com/oxidecomputer/p4", branch = "zl/multicast" }
3535
sha256 = "1.6.0"
3636
camino = "1.2.2"
3737
thiserror = "2.0.18"

lib/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ edition = "2021"
77
p4rs.workspace = true
88
serde.workspace = true
99
serde_json.workspace = true
10+
thiserror.workspace = true
1011
tokio.workspace = true

lib/src/lib.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,31 @@ use tokio::net::UnixDatagram;
1212
// Re-export p4rs so consumers can rely on matching types
1313
pub use p4rs;
1414

15+
/// Multicast group identifier.
16+
pub type MulticastGroupId = u16;
17+
18+
/// Physical port number within a multicast group.
19+
pub type MulticastPort = u16;
20+
1521
#[derive(Debug, Default, Serialize, Deserialize)]
1622
pub enum ManagementRequest {
1723
#[default]
1824
RadixRequest,
1925
TableAdd(TableAdd),
2026
TableRemove(TableRemove),
2127
DumpRequest,
28+
MulticastGroupCreate(MulticastGroupCreate),
29+
MulticastGroupRemove(MulticastGroupRemove),
30+
MulticastPortAdd(MulticastPortAdd),
31+
MulticastPortRemove(MulticastPortRemove),
32+
MulticastGroupList,
2233
}
2334

2435
#[derive(Debug, Serialize, Deserialize)]
2536
pub enum ManagementResponse {
2637
RadixResponse(u16),
2738
DumpResponse(BTreeMap<String, Vec<TableEntry>>),
39+
MulticastGroupListResponse(BTreeMap<MulticastGroupId, Vec<MulticastPort>>),
2840
}
2941

3042
#[derive(Debug, Default, Serialize, Deserialize)]
@@ -41,6 +53,37 @@ pub struct TableRemove {
4153
pub keyset_data: Vec<u8>,
4254
}
4355

56+
#[derive(Debug, Serialize, Deserialize)]
57+
pub struct MulticastGroupCreate {
58+
pub group_id: MulticastGroupId,
59+
}
60+
61+
#[derive(Debug, Serialize, Deserialize)]
62+
pub struct MulticastGroupRemove {
63+
pub group_id: MulticastGroupId,
64+
}
65+
66+
#[derive(Debug, Serialize, Deserialize)]
67+
pub struct MulticastPortAdd {
68+
pub group_id: MulticastGroupId,
69+
pub port: MulticastPort,
70+
pub rid: u16,
71+
pub level1_excl_id: u16,
72+
}
73+
74+
#[derive(Debug, Serialize, Deserialize)]
75+
pub struct MulticastPortRemove {
76+
pub group_id: MulticastGroupId,
77+
pub port: MulticastPort,
78+
}
79+
80+
/// Errors from multicast management operations.
81+
#[derive(Debug, thiserror::Error)]
82+
pub enum MulticastError {
83+
#[error("group ID 0 is reserved (the runtime treats 0 as no-multicast)")]
84+
ReservedGroupId,
85+
}
86+
4487
pub async fn handle_management_message(
4588
msg: ManagementRequest,
4689
pipeline: &Mutex<Box<dyn Pipeline>>,
@@ -85,5 +128,72 @@ pub async fn handle_management_message(
85128
let buf = serde_json::to_vec(&response).unwrap();
86129
uds.send_to(&buf, uds_dst).await.unwrap();
87130
}
131+
// Mutating multicast ops are fire-and-forget, matching the
132+
// TableAdd/TableRemove pattern. Only MulticastGroupList returns
133+
// a response.
134+
ManagementRequest::MulticastGroupCreate(req) => {
135+
if let Err(err) = validate_group_id(req.group_id) {
136+
eprintln!("MulticastGroupCreate rejected: {err}");
137+
return;
138+
}
139+
let mut pl = pipeline.lock().unwrap();
140+
pl.add_mcast_group(req.group_id);
141+
}
142+
ManagementRequest::MulticastGroupRemove(req) => {
143+
if let Err(err) = validate_group_id(req.group_id) {
144+
eprintln!("MulticastGroupRemove rejected: {err}");
145+
return;
146+
}
147+
let mut pl = pipeline.lock().unwrap();
148+
pl.remove_mcast_group(req.group_id);
149+
}
150+
ManagementRequest::MulticastPortAdd(req) => {
151+
if let Err(err) = validate_group_id(req.group_id) {
152+
eprintln!("MulticastPortAdd rejected: {err}");
153+
return;
154+
}
155+
// rid and level1_excl_id are Tofino traffic manager concepts
156+
// for per-replica identification and exclusion. SoftNPU handles
157+
// these via McastReplicationTag in the codegen instead.
158+
//
159+
// Dendrite passes non-zero rid (set to external_group_id) as
160+
// part of the Tofino replication config, meaning its accepted but
161+
// unused.
162+
let mut pl = pipeline.lock().unwrap();
163+
pl.add_mcast_port(req.group_id, req.port);
164+
}
165+
ManagementRequest::MulticastPortRemove(req) => {
166+
if let Err(err) = validate_group_id(req.group_id) {
167+
eprintln!("MulticastPortRemove rejected: {err}");
168+
return;
169+
}
170+
let mut pl = pipeline.lock().unwrap();
171+
pl.remove_mcast_port(req.group_id, req.port);
172+
}
173+
ManagementRequest::MulticastGroupList => {
174+
let result = {
175+
let pl = pipeline.lock().unwrap();
176+
pl.get_mcast_groups()
177+
.iter()
178+
.map(|(&group_id, ports)| {
179+
let mut sorted: Vec<MulticastPort> =
180+
ports.iter().copied().collect();
181+
sorted.sort();
182+
(group_id, sorted)
183+
})
184+
.collect::<BTreeMap<MulticastGroupId, Vec<MulticastPort>>>()
185+
};
186+
let response =
187+
ManagementResponse::MulticastGroupListResponse(result);
188+
let buf = serde_json::to_vec(&response).unwrap();
189+
uds.send_to(&buf, uds_dst).await.unwrap();
190+
}
191+
}
192+
}
193+
194+
fn validate_group_id(group_id: MulticastGroupId) -> Result<(), MulticastError> {
195+
if group_id == 0 {
196+
return Err(MulticastError::ReservedGroupId);
88197
}
198+
Ok(())
89199
}

0 commit comments

Comments
 (0)