Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 2 additions & 92 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ lto = "thin"

[dependencies]
clap = { version = "4.6.1", features = ["derive"] }
tokio = { version = "1.52.3", features = ["full"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.150"
chrono = "0.4.44"
csv = "1.4.0"
metrics = "0.24.3"
Expand All @@ -19,8 +17,6 @@ isis_streaming_data_types = "0.1.1"
flatbuffers = "25.12.19"
log = "0.4.30"
env_logger = "0.11.10"
hex = {version = "0.4.3"}
itertools = "0.14.0"
clap-verbosity-flag = "3.0.4"
toml = "1.1.2"
strum = { version = "0.28.0", features = ["derive"] }
Expand Down
45 changes: 11 additions & 34 deletions benches/event_processing.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,13 @@
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use event_udp_to_kafka::WiringConfigRecord;
use event_udp_to_kafka::data_processing::process_udp_to_kafka;
use event_udp_to_kafka::data_processing::process_udp_bytes_to_kafka;
use event_udp_to_kafka::testing::make_raw_neutron_udp_header;
use flatbuffers::FlatBufferBuilder;
use std::hint::black_box;

const VALID_TIMESTAMP: u64 = (26 << (32 + 24))
+ (106 << (32 + 15))
+ (17 << (32 + 10))
+ (9 << (32 + 4))
+ (35 << 30)
+ (123 << 20)
+ (456 << 10)
+ (789);

fn make_raw_udp_message(num_events: usize) -> Vec<u8> {
// Note: 4-byte words
// Total header length: 60 bytes (15 words)
[255_u8; 4] // Header word 0: 'running' header marker
make_raw_neutron_udp_header(num_events, 123)
.iter()
.chain(&[255_u8; 4]) // Header word 1: neutron data header marker
.chain(&[0_u8; 4]) // Header word 2: information
.chain(&[0_u8; 4]) // Header word 3: frame number
.chain(&VALID_TIMESTAMP.to_be_bytes()) // Header words 4 & 5: GPS timestamp
.chain(&[0_u8; 2]) // Header word 6: period number
.chain(&[0_u8; 2]) // Header word 6: unused
.chain(&(num_events as u32).to_be_bytes()) // Header word 7: events in frame
.chain(&[0_u8; 2]) // Header word 8: ppp_in_frame
.chain(&[0_u8; 2]) // Header word 8: unused
.chain(&[0_u8; 4]) // Header word 9: vetoes
.chain(&[0_u8; 4]) // Header word 10: address of next frame
.chain(&[0_u8; 4]) // Header word 11: unknown
.chain(&[0_u8; 4]) // Header word 12: unknown
.chain(&[0_u8; 4]) // Header word 13: unknown
.chain(&[0_u8; 4]) // Header word 14: unknown
.chain(&[0_u8; 4]) // Header word 15: unknown
.chain(&vec![0_u8; num_events * 8]) // 8-byte event messages
.copied()
.collect()
Expand All @@ -42,19 +16,22 @@ fn make_raw_udp_message(num_events: usize) -> Vec<u8> {
fn benchmark_message_processing(c: &mut Criterion) {
let raw_data = make_raw_udp_message(100);
let n_bytes = raw_data.len();
let data = hex::encode(raw_data);

let mut group = c.benchmark_group("message_processing");
group.throughput(Throughput::Bytes(n_bytes as u64));

let mut fbb = FlatBufferBuilder::new();

for board_type in ["PC3877MS", "PC3544MS", "PC3634M1S"] {
for (board_type, packet_type) in [
("PC3877MS", "Position"),
("PC3544MS", "Position"),
("PC3634M1S", "DIM_OUT"),
] {
let wiring_config = vec![WiringConfigRecord {
brd_num: 0,
brd_ref: "WLSF0".to_owned(),
brd_type: board_type.to_owned(),
packet_type: "Position".to_owned(),
packet_type: packet_type.to_owned(),
sw_pos: 0,
streaming_ip: "192.168.1.1".to_owned(),
ch: 0,
Expand All @@ -68,9 +45,9 @@ fn benchmark_message_processing(c: &mut Criterion) {
&wiring_config,
|b, wiring_config| {
b.iter(|| {
process_udp_to_kafka(
process_udp_bytes_to_kafka(
&mut fbb,
black_box(&data),
black_box(&raw_data),
black_box("192.168.1.1"),
&wiring_config,
|msg| {
Expand Down
23 changes: 4 additions & 19 deletions config.example.toml
Original file line number Diff line number Diff line change
@@ -1,33 +1,18 @@
# UDP Port to bind to
port = 12345

# UDP address to bind to
host_ip = "127.0.0.1"
udp_bind_addr = "127.0.0.1:12345"

# Kafka topic to write flatbuffered messages to
dest_kafka_topic = "instrument_rawEvents"

# Kafka topic to read raw UDP (as JSON) from
src_kafka_topic = "instrument_rawJSON"

# Operation mode:
# - 0 is kafka-to-kafka
# - 1 is UDP-to-kafka
mode = 0

# Path to wiring CSV file
wiring_csv_path = "./src/config/wiring.csv"

# Address on which to bind metrics server
metrics_bind_addr = "127.0.0.1:8484"

[kafka_consumer]
"bootstrap.servers" = "itachi.isis.cclrc.ac.uk:9092"
"group.id" = "instrument_udp2kafka"
"enable.partition.eof" = "false"
"session.timeout.ms" = "6000"
"enable.auto.commit" = "true"
"auto.offset.reset" = "smallest"
# Scaling factor to use when converting raw 8-bit PPP to
# uAh per frame
raw_to_uah_scaling = 1.738e-6

[kafka_producer]
"bootstrap.servers" = "itachi.isis.cclrc.ac.uk:9092"
Expand Down
39 changes: 17 additions & 22 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Default)]
pub struct EventUdpToKafkaConfig {
/// UDP port to bind to
pub port: u32,
/// Ip address and port to bind UDP socket to
/// e.g. 192.168.1.1:12345
pub udp_bind_addr: String,

/// Ip address of the host (the IP address on which to bind a UDP port)
pub host_ip: String,
/// UDP recieve buffer size. Should be at least as large as the largest
/// single UDP datagram which will be received.
pub udp_buffer_size: Option<usize>,

/// Scaling factor to convert the 8-bit 'raw' PPP signal
/// into uAh per frame
pub raw_to_uah_scaling: Option<f64>,

/// Kafka topic to send the data to
pub dest_kafka_topic: String,

/// Kafka topic to get data from
pub src_kafka_topic: String,

/// Script operating mode
/// 0 - Consume UDP packets from Kafka SRC topic, process and send back to kafka
/// 1 - Gets UDP packets from a local socket binding, processes and kafkas
/// This script is mainly designed to function in the Kafka-> Kafka configuration
/// With the UDP->Kafka rust buffering via kafka. This gives some failover, and potential throughput options
///
mode: Option<u32>,

/// Filepath to the wiring configuration file (csv)
pub wiring_csv_path: String,

Expand All @@ -33,14 +28,14 @@ pub struct EventUdpToKafkaConfig {
/// Map of Kafka producer configuration properties. Values should be provided as strings.
/// All properties are passed through to `librdkafka`.
pub kafka_producer: HashMap<String, String>,

/// Map of Kafka consumer configuration properties. Values should be provided as strings.
/// All properties are passed through to `librdkafka`.
pub kafka_consumer: HashMap<String, String>,
}

impl EventUdpToKafkaConfig {
pub fn mode(&self) -> u32 {
self.mode.unwrap_or(0)
pub fn udp_buffer_size(&self) -> usize {
self.udp_buffer_size.unwrap_or(9000)
}

pub fn raw_to_uah_scaling(&self) -> f64 {
self.raw_to_uah_scaling.unwrap_or(1.738e-6)
}
}
Loading