diff --git a/.gitignore b/.gitignore index c01ff99..ba96cef 100755 --- a/.gitignore +++ b/.gitignore @@ -151,6 +151,7 @@ grafana-enterprise-11.5.1.darwin-amd64.tar.gz !grafana/dashboards/styx.json !grafana/dashboards/server_specific.json !grafana/* +ycsb_dataset_* !charts/grafana/files/grafana/dashboards/server_specific.json !charts/grafana/files/grafana/dashboards/styx.json !charts/grafana/* diff --git a/coordinator/coordinator_service.py b/coordinator/coordinator_service.py index 0a72d44..87ab3a0 100755 --- a/coordinator/coordinator_service.py +++ b/coordinator/coordinator_service.py @@ -17,9 +17,10 @@ import boto3 import botocore from coordinator_metadata import Coordinator -from prometheus_client import Gauge, start_http_server +from prometheus_client import Counter, Gauge, start_http_server from styx.common.logging import logging from styx.common.message_types import MessageType +from styx.common.metrics import WorkerEpochStats from styx.common.protocols import Protocols from styx.common.serialization import Serializer from styx.common.tcp_networking import MessagingMode, NetworkingManager @@ -119,6 +120,7 @@ def __init__(self) -> None: self.recovery_state: RecoveryState = RecoveryState.IDLE self.metrics_server = start_http_server(8000) + self.live_worker_count_gauge = Gauge("live_worker_count", "Number of live workers registered with coordinator") self.cpu_usage_gauge = Gauge( "worker_cpu_usage_percent", "CPU usage percentage", @@ -170,6 +172,77 @@ def __init__(self) -> None: ["instance"], ) + self.queue_backlog_gauge = Gauge("queue_backlog", "Backlog in the worker queue", ["instance"]) + + self.input_rate_counter = Counter("input_rate_counter", "Input rate", ["instance"]) + # Transaction count metrics + self.epoch_total_txns_counter = Counter( + "epoch_total_transactions", "Total transactions processed (cumulative)", ["instance"] + ) + self.epoch_committed_txns_counter = Counter( + "epoch_committed_transactions", "Committed transactions (cumulative)", ["instance"] + ) + self.epoch_logic_aborts_counter = Counter( + "epoch_logic_aborts", "Logic/global aborts (cumulative)", ["instance"] + ) + self.epoch_concurrency_aborts_counter = Counter( + "epoch_concurrency_aborts", "Concurrency aborts (cumulative)", ["instance"] + ) + self.epoch_committed_lock_free_counter = Counter( + "epoch_committed_lock_free", "Transactions committed in lock-free phase (cumulative)", ["instance"] + ) + self.epoch_committed_fallback_counter = Counter( + "epoch_committed_fallback", "Transactions committed in fallback phase (cumulative)", ["instance"] + ) + self.cpu_utilization_ratio_gauge = Gauge( + "worker_cpu_utilization", "Ratio of CPU work in the epoch", ["instance"] + ) + self.io_utilization_ratio_gauge = Gauge( + "worker_io_utilization", "Ratio of IO wait time in the epoch", ["instance"] + ) + # Operator-level performance metrics + self.operator_tps_counter = Counter( + "operator_tps", + "Transactions per second per operator partition (cumulative)", + ["instance", "operator", "partition"], + ) + self.operator_call_count_counter = Counter( + "operator_call_count", + "Number of calls to an operator partition (cumulative)", + ["instance", "operator", "partition"], + ) + self.operator_latency_gauge = Gauge( + "operator_latency_ms", + "Average operator call latency in ms for this epoch", + ["instance", "operator", "partition"], + ) + + # Used for annotations in the grafana dashboard + self.migration_start_time_gauge = Gauge("migration_start_time_ms", "Timestamp when the migration started", []) + self.migration_end_time_gauge = Gauge("migration_end_time_ms", "Timestamp when the migration completed", []) + + # Phase-attributed resource metrics (aggregated per epoch in the worker, scraped at coordinator). + self.phase_cpu_ms_total = Counter( + "phase_cpu_ms_total", + "Process CPU time attributed to a transactional protocol phase (ms, cumulative)", + ["instance", "phase"], + ) + self.phase_net_rx_bytes_total = Counter( + "phase_net_rx_bytes_total", + "Network RX bytes attributed to a transactional protocol phase (bytes, cumulative)", + ["instance", "phase"], + ) + self.phase_net_tx_bytes_total = Counter( + "phase_net_tx_bytes_total", + "Network TX bytes attributed to a transactional protocol phase (bytes, cumulative)", + ["instance", "phase"], + ) + self.phase_rss_max_mb = Gauge( + "phase_rss_max_mb", + "Max RSS observed during a transactional protocol phase within the last reported epoch (MB)", + ["instance", "phase"], + ) + self.migration_in_progress: bool = False self.networking_locks: dict[MessageType, asyncio.Lock] = { @@ -284,6 +357,8 @@ async def _start_migration(self, graph: StateflowGraph) -> None: await self.stop_snapshotting() logging.warning(f"MIGRATION | START {graph}") + start_time = time.time_ns() + self.migration_start_time_gauge.set(start_time / 1_000_000) await self.coordinator.update_stateflow_graph(graph) n_workers = len(self.coordinator.worker_pool.get_participating_workers()) @@ -377,6 +452,7 @@ async def _handle_register_worker( logging.warning( f"Worker registered {worker_ip}:{worker_port} with id {worker_id}", ) + self.live_worker_count_gauge.set(len(self.coordinator.worker_pool.get_live_workers())) async def _track_reregistered_worker(self, worker_id: int) -> None: async with self.recovery_lock: @@ -565,9 +641,21 @@ async def _handle_sync_cleanup(self, data: bytes) -> None: commit_time, fallback_time, snap_time, + input_rate, + queue_backlog, + total_txns, + committed_txns, + logic_aborts, + concurrency_aborts, + committed_lock_free, + committed_fallback, + cpu_utilization, + io_wait_utilization, + operator_epoch_stats, + phase_resources, ) = self.protocol_networking.decode_message(data) - self._record_epoch_metrics( + worker_epoch_stats = WorkerEpochStats( worker_id=worker_id, epoch_throughput=epoch_throughput, epoch_latency=epoch_latency, @@ -580,6 +668,22 @@ async def _handle_sync_cleanup(self, data: bytes) -> None: commit_time=commit_time, fallback_time=fallback_time, snap_time=snap_time, + input_rate=input_rate, + queue_backlog=queue_backlog, + total_txns=total_txns, + committed_txns=committed_txns, + logic_aborts=logic_aborts, + concurrency_aborts=concurrency_aborts, + committed_lock_free=committed_lock_free, + committed_fallback=committed_fallback, + cpu_utilization=cpu_utilization, + io_wait_utilization=io_wait_utilization, + operator_epoch_stats=operator_epoch_stats, + phase_resources=phase_resources, + ) + + self._record_epoch_metrics( + worker_epoch_stats, ) sync_complete: bool = self.aria_metadata.set_empty_sync_done(worker_id) @@ -595,32 +699,66 @@ async def _handle_sync_cleanup(self, data: bytes) -> None: def _record_epoch_metrics( self, - *, - worker_id: str, - epoch_throughput: float, - epoch_latency: float, - local_abort_rate: float, - wal_time: float, - func_time: float, - chain_ack_time: float, - sync_time: float, - conflict_res_time: float, - commit_time: float, - fallback_time: float, - snap_time: float, + worker_epoch_stats: WorkerEpochStats, ) -> None: - self.epoch_throughput_gauge.labels(instance=worker_id).set(epoch_throughput) - self.epoch_latency_gauge.labels(instance=worker_id).set(epoch_latency) - self.epoch_abort_gauge.labels(instance=worker_id).set(local_abort_rate) - - self.latency_breakdown_gauge.labels(instance=worker_id, component="WAL").set(wal_time) - self.latency_breakdown_gauge.labels(instance=worker_id, component="1st Run").set(func_time) - self.latency_breakdown_gauge.labels(instance=worker_id, component="Chain Acks").set(chain_ack_time) - self.latency_breakdown_gauge.labels(instance=worker_id, component="SYNC").set(sync_time) - self.latency_breakdown_gauge.labels(instance=worker_id, component="Conflict Resolution").set(conflict_res_time) - self.latency_breakdown_gauge.labels(instance=worker_id, component="Commit time").set(commit_time) - self.latency_breakdown_gauge.labels(instance=worker_id, component="Fallback").set(fallback_time) - self.latency_breakdown_gauge.labels(instance=worker_id, component="Async Snapshot").set(snap_time) + worker_id = worker_epoch_stats.worker_id + + self.epoch_throughput_gauge.labels(instance=worker_id).set(worker_epoch_stats.epoch_throughput) + self.epoch_latency_gauge.labels(instance=worker_id).set(worker_epoch_stats.epoch_latency) + self.epoch_abort_gauge.labels(instance=worker_id).set(worker_epoch_stats.local_abort_rate) + + self.latency_breakdown_gauge.labels(instance=worker_id, component="WAL").set(worker_epoch_stats.wal_time) + self.latency_breakdown_gauge.labels(instance=worker_id, component="1st Run").set(worker_epoch_stats.func_time) + self.latency_breakdown_gauge.labels(instance=worker_id, component="Chain Acks").set( + worker_epoch_stats.chain_ack_time + ) + self.latency_breakdown_gauge.labels(instance=worker_id, component="SYNC").set(worker_epoch_stats.sync_time) + self.latency_breakdown_gauge.labels(instance=worker_id, component="Conflict Resolution").set( + worker_epoch_stats.conflict_res_time + ) + self.latency_breakdown_gauge.labels(instance=worker_id, component="Commit time").set( + worker_epoch_stats.commit_time + ) + self.latency_breakdown_gauge.labels(instance=worker_id, component="Fallback").set( + worker_epoch_stats.fallback_time + ) + self.latency_breakdown_gauge.labels(instance=worker_id, component="Async Snapshot").set( + worker_epoch_stats.snap_time + ) + + self.input_rate_counter.labels(instance=worker_id).inc(worker_epoch_stats.input_rate) + self.queue_backlog_gauge.labels(instance=worker_id).set(worker_epoch_stats.queue_backlog) + + # Transaction count metrics + self.epoch_total_txns_counter.labels(instance=worker_id).inc(worker_epoch_stats.total_txns) + self.epoch_committed_txns_counter.labels(instance=worker_id).inc(worker_epoch_stats.committed_txns) + self.epoch_logic_aborts_counter.labels(instance=worker_id).inc(worker_epoch_stats.logic_aborts) + self.epoch_concurrency_aborts_counter.labels(instance=worker_id).inc(worker_epoch_stats.concurrency_aborts) + self.epoch_committed_lock_free_counter.labels(instance=worker_id).inc(worker_epoch_stats.committed_lock_free) + self.epoch_committed_fallback_counter.labels(instance=worker_id).inc(worker_epoch_stats.committed_fallback) + + self.cpu_utilization_ratio_gauge.labels(instance=worker_id).set(worker_epoch_stats.cpu_utilization) + self.io_utilization_ratio_gauge.labels(instance=worker_id).set(worker_epoch_stats.io_wait_utilization) + + # Operator-level metrics for this worker and epoch + for op_name, partition, tps, avg_latency_ms, call_count in worker_epoch_stats.operator_epoch_stats: + labels = {"instance": worker_id, "operator": op_name, "partition": str(partition)} + self.operator_tps_counter.labels(**labels).inc(tps) + self.operator_call_count_counter.labels(**labels).inc(call_count) + self.operator_latency_gauge.labels(**labels).set(avg_latency_ms) + + for phase, v in worker_epoch_stats.phase_resources.get("cpu_ns", {}).items(): + self.phase_cpu_ms_total.labels(instance=worker_id, phase=phase).inc(float(v) / 1e6) + logging.warning(f" Phase {phase} CPU: {float(v) / 1e6} ms") + for phase, v in worker_epoch_stats.phase_resources.get("rx_bytes", {}).items(): + self.phase_net_rx_bytes_total.labels(instance=worker_id, phase=phase).inc(float(v)) + logging.warning(f" Phase {phase} RX: {float(v)} bytes") + for phase, v in worker_epoch_stats.phase_resources.get("tx_bytes", {}).items(): + self.phase_net_tx_bytes_total.labels(instance=worker_id, phase=phase).inc(float(v)) + logging.warning(f" Phase {phase} TX: {float(v)} bytes") + for phase, v in worker_epoch_stats.phase_resources.get("rss_max_bytes", {}).items(): + self.phase_rss_max_mb.labels(instance=worker_id, phase=phase).set(float(v) / (1024 * 1024)) + logging.warning(f" Phase {phase} RSS Max: {float(v) / (1024 * 1024)} MB") async def _handle_deterministic_reordering(self, data: bytes) -> None: mt = MessageType.DeterministicReordering @@ -662,8 +800,10 @@ async def _handle_migration_done(self, _: bytes) -> None: if not sync_complete: return + end_time = time.time_ns() + self.migration_end_time_gauge.set(end_time / 1_000_000) logging.warning( - f"MIGRATION_FINISHED at time: {time.time_ns() // 1_000_000}", + f"MIGRATION_FINISHED at time: {end_time // 1_000_000}", ) await self.migration_metadata.cleanup(mt) self.migration_in_progress = False diff --git a/coordinator/worker_pool.py b/coordinator/worker_pool.py index 5351f74..28da100 100644 --- a/coordinator/worker_pool.py +++ b/coordinator/worker_pool.py @@ -55,6 +55,15 @@ def __init__(self) -> None: self.operator_partition_to_worker: dict[OperatorPartition, int] = {} self.orphaned_operator_assignments: dict[OperatorPartition, Operator] = {} + def get_live_workers(self) -> list[Worker]: + """Returns all currently known (non-tombstoned) workers""" + live: list[Worker] = [] + for _, _, worker in self._queue: + if worker == self._tombstone: + continue + live.append(worker) + return live + def register_worker( self, worker_ip: str, diff --git a/demo/demo-deathstar-hotel-reservation/pure_kafka_demo.py b/demo/demo-deathstar-hotel-reservation/pure_kafka_demo.py index 590202a..4d1507f 100755 --- a/demo/demo-deathstar-hotel-reservation/pure_kafka_demo.py +++ b/demo/demo-deathstar-hotel-reservation/pure_kafka_demo.py @@ -28,6 +28,9 @@ from styx.common.local_state_backends import LocalStateBackend from styx.common.stateflow_graph import StateflowGraph +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) +from load_generator import LoadSchedule + SAVE_DIR: str = sys.argv[1] threads = int(sys.argv[2]) barrier = multiprocessing.Barrier(threads) @@ -40,7 +43,8 @@ STYX_HOST: str = os.getenv("STYX_HOST", "localhost") STYX_PORT: int = int(os.getenv("STYX_PORT", "8886")) KAFKA_URL: str = os.getenv("KAFKA_URL", "localhost:9092") -kill_at = int(sys.argv[7]) if len(sys.argv) > 7 else -1 +load_config_path = sys.argv[7] +kill_at = int(sys.argv[8]) if len(sys.argv) > 8 else -1 g = StateflowGraph("deathstar_hotel_reservations", operator_state_backend=LocalStateBackend.DICT, @@ -65,6 +69,12 @@ user_operator, ) +load_schedule = LoadSchedule.from_config_file( + load_config_path, + target_tps=messages_per_second, + time=seconds +) + def styx_hash(styx: SyncStyxClient, key, op) -> int: # IMPORTANT: op is the Operator object; Styx knows how to map it. @@ -284,8 +294,9 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: subprocess.run(["docker", "kill", "styx-worker-1"], check=False) print("KILL -> styx-worker-1 done") sec_start = timer() - for i in range(messages_per_second): - if i % (messages_per_second // sleeps_per_second) == 0: + current_tps = load_schedule.get_tps(second) + for i in range(current_tps): + if i % (current_tps // sleeps_per_second) == 0: time.sleep(sleep_time) operator, key, func_name, params = next(deathstar_generator) future = styx.send_event( @@ -301,7 +312,7 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: if lps < 1: time.sleep(1 - lps) sec_end2 = timer() - print(f"Latency per second: {sec_end2 - sec_start}") + print(f"{second} | TPS: {current_tps} | Latency: {sec_end2 - sec_start:.3f}s") end = timer() print(f"Average latency per second: {(end - start) / seconds}") styx.close() diff --git a/demo/demo-deathstar-movie-review/pure_kafka_demo.py b/demo/demo-deathstar-movie-review/pure_kafka_demo.py index 506fca1..58017f5 100644 --- a/demo/demo-deathstar-movie-review/pure_kafka_demo.py +++ b/demo/demo-deathstar-movie-review/pure_kafka_demo.py @@ -31,6 +31,9 @@ from styx.client import SyncStyxClient from styx.common.local_state_backends import LocalStateBackend from styx.common.stateflow_graph import StateflowGraph + +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) +from load_generator import LoadSchedule from workload_data import charset, movie_titles SAVE_DIR: str = sys.argv[1] @@ -45,7 +48,8 @@ STYX_HOST: str = os.getenv("STYX_HOST", "localhost") STYX_PORT: int = int(os.getenv("STYX_PORT", "8886")) KAFKA_URL: str = os.getenv("KAFKA_URL", "localhost:9092") -kill_at = int(sys.argv[7]) if len(sys.argv) > 7 else -1 +load_config_path: str = sys.argv[7] +kill_at = int(sys.argv[8]) if len(sys.argv) > 8 else -1 g = StateflowGraph("deathstar_movie_review", operator_state_backend=LocalStateBackend.DICT, @@ -72,6 +76,12 @@ frontend_operator ) +load_schedule = LoadSchedule.from_config_file( + load_config_path, + target_tps=messages_per_second, + time=seconds +) + # ------------------------------------------------------------------------------------- # init_data helpers @@ -211,8 +221,9 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: subprocess.run(["docker", "kill", "styx-worker-1"], check=False) print("KILL -> styx-worker-1 done") sec_start = timer() - for i in range(messages_per_second): - if i % (messages_per_second // sleeps_per_second) == 0: + current_tps = load_schedule.get_tps(second) + for i in range(current_tps): + if i % (current_tps // sleeps_per_second) == 0: time.sleep(sleep_time) operator, key, func_name, params = next(deathstar_generator) future = styx.send_event(operator=operator, @@ -226,7 +237,7 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: if lps < 1: time.sleep(1 - lps) sec_end2 = timer() - print(f"Latency per second: {sec_end2 - sec_start}") + print(f"{second} | TPS: {current_tps} | Latency: {sec_end2 - sec_start:.3f}s") end = timer() print(f"Average latency per second: {(end - start) / seconds}") styx.close() diff --git a/demo/demo-migration-tpc-c/pure_kafka_demo.py b/demo/demo-migration-tpc-c/pure_kafka_demo.py index e29020e..5be2e10 100644 --- a/demo/demo-migration-tpc-c/pure_kafka_demo.py +++ b/demo/demo-migration-tpc-c/pure_kafka_demo.py @@ -35,6 +35,8 @@ from styx.common.local_state_backends import LocalStateBackend from styx.common.stateflow_graph import StateflowGraph from tqdm import tqdm +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) +from load_generator import LoadSchedule SAVE_DIR: str = sys.argv[1] threads = int(sys.argv[2]) @@ -65,6 +67,8 @@ enable_compression: bool = bool(strtobool(sys.argv[9])) use_composite_keys: bool = bool(strtobool(sys.argv[10])) use_fallback_cache: bool = bool(strtobool(sys.argv[11])) +load_config_path: str = sys.argv[12] + os.environ["ENABLE_COMPRESSION"] = str(enable_compression) os.environ["USE_COMPOSITE_KEYS"] = str(use_composite_keys) os.environ["USE_FALLBACK_CACHE"] = str(use_fallback_cache) @@ -97,7 +101,11 @@ order_operator, order_line_operator, stock_operator, warehouse_operator, new_order_txn_operator, customer_idx_operator, payment_txn_operator) - +load_schedule = LoadSchedule.from_config_file( + load_config_path, + target_tps=messages_per_second, + time=seconds +) def populate_warehouse(styx: SyncStyxClient): with open(os.path.join(script_path, "data/warehouse.csv")) as f: @@ -501,8 +509,9 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: start = timer() for cur_sec in range(seconds): sec_start = timer() - for i in range(messages_per_second): - if i % (messages_per_second // sleeps_per_second) == 0: + current_tps = load_schedule.get_tps(cur_sec) + for i in range(current_tps): + if i % (current_tps // sleeps_per_second) == 0: time.sleep(sleep_time) operator, key, func_name, params = next(tpc_c_generator) future = styx.send_event(operator=operator, @@ -516,7 +525,7 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: if lps < 1: time.sleep(1 - lps) sec_end2 = timer() - print(f"Latency per second: {sec_end2 - sec_start}") + print(f"{cur_sec} | TPS: {current_tps} | Latency: {sec_end2 - sec_start:.3f}s") if cur_sec == SECOND_TO_TAKE_MIGRATION: new_g = StateflowGraph("tpcc_benchmark", operator_state_backend=LocalStateBackend.DICT) #################################################################################################################### diff --git a/demo/demo-migration-ycsb/client.py b/demo/demo-migration-ycsb/client.py index 1937b08..a020f8d 100644 --- a/demo/demo-migration-ycsb/client.py +++ b/demo/demo-migration-ycsb/client.py @@ -23,6 +23,8 @@ from styx.common.stateflow_graph import StateflowGraph from tqdm import tqdm from ycsb import ycsb_operator +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) +from load_generator import LoadSchedule def ycsb_field(size=100, seed=0): @@ -43,6 +45,7 @@ def ycsb_field(size=100, seed=0): seconds = int(sys.argv[5]) SAVE_DIR: str = sys.argv[6] warmup_seconds: int = int(sys.argv[7]) +load_config_path: str = sys.argv[9] BATCH_SIZE = 100_000 @@ -58,6 +61,12 @@ def ycsb_field(size=100, seed=0): ycsb_operator.set_n_partitions(START_N_PARTITIONS) g.add_operators(ycsb_operator) +load_schedule = LoadSchedule.from_config_file( + load_config_path, + target_tps=messages_per_second, + time=seconds +) + def submit_graph(styx: SyncStyxClient): print(f"Partitions: {list(g.nodes.values())[0].n_partitions}") styx.submit_dataflow(g) @@ -133,8 +142,9 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: start = timer() for cur_sec in range(seconds): sec_start = timer() - for i in range(messages_per_second): - if i % (messages_per_second // sleeps_per_second) == 0: + current_tps = load_schedule.get_tps(cur_sec) + for i in range(current_tps): + if i % (current_tps // sleeps_per_second) == 0: time.sleep(sleep_time) operator, key, func_name = next(ycsb_generator) future = styx.send_event(operator=operator, @@ -147,7 +157,7 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: if lps < 1: time.sleep(1 - lps) sec_end2 = timer() - print(f"Latency per second: {sec_end2 - sec_start}") + print(f"{cur_sec} | TPS: {current_tps} | Latency: {sec_end2 - sec_start:.3f}s") if cur_sec == SECOND_TO_TAKE_MIGRATION and proc_num == 0: new_g = StateflowGraph("ycsb-benchmark", operator_state_backend=LocalStateBackend.DICT) ycsb_operator.set_n_partitions(END_N_PARTITIONS) @@ -187,7 +197,6 @@ def main(): results = p.map(benchmark_runner, range(threads)) results = {k: v for d in results for k, v in d.items()} - assert len(results) == messages_per_second * seconds * threads pd.DataFrame({"request_id": list(results.keys()), "timestamp": [res["timestamp"] for res in results.values()], diff --git a/demo/demo-tpc-c/pure_kafka_demo.py b/demo/demo-tpc-c/pure_kafka_demo.py index 21ccec3..d5f783f 100644 --- a/demo/demo-tpc-c/pure_kafka_demo.py +++ b/demo/demo-tpc-c/pure_kafka_demo.py @@ -38,6 +38,9 @@ from styx.common.local_state_backends import LocalStateBackend from styx.common.stateflow_graph import StateflowGraph +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) +from load_generator import LoadSchedule + random.seed(42) SAVE_DIR: str = sys.argv[1] @@ -69,7 +72,8 @@ os.environ["ENABLE_COMPRESSION"] = str(enable_compression) os.environ["USE_COMPOSITE_KEYS"] = str(use_composite_keys) os.environ["USE_FALLBACK_CACHE"] = str(use_fallback_cache) -kill_at = int(sys.argv[11]) if len(sys.argv) > 11 else -1 +load_config_path = sys.argv[11] +kill_at = int(sys.argv[12]) if len(sys.argv) > 12 else -1 customers_per_district: dict[tuple, list] = {} @@ -104,7 +108,11 @@ order_operator, order_line_operator, stock_operator, warehouse_operator, new_order_txn_operator, customer_idx_operator, payment_txn_operator) - +load_schedule = LoadSchedule.from_config_file( + load_config_path, + target_tps=messages_per_second, + time=seconds +) # ------------------------------------------------------------------------------------- # Cache helper @@ -419,8 +427,9 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: subprocess.run(["docker", "kill", "styx-worker-1"], check=False) print("KILL -> styx-worker-1 done") sec_start = timer() - for i in range(messages_per_second): - if i % (messages_per_second // sleeps_per_second) == 0: + current_tps = load_schedule.get_tps(second) + for i in range(current_tps): + if i % (current_tps // sleeps_per_second) == 0: time.sleep(sleep_time) operator, key, func_name, params = next(tpc_c_generator) future = styx.send_event(operator=operator, @@ -434,7 +443,7 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: if lps < 1: time.sleep(1 - lps) sec_end2 = timer() - print(f"Latency per second: {sec_end2 - sec_start}") + print(f"{second} | TPS: {current_tps} | Latency per second: {sec_end2 - sec_start}") end = timer() print(f"Average latency per second: {(end - start) / seconds}") styx.close() diff --git a/demo/demo-ycsb/client.py b/demo/demo-ycsb/client.py index d3a44f5..180e3de 100644 --- a/demo/demo-ycsb/client.py +++ b/demo/demo-ycsb/client.py @@ -17,6 +17,9 @@ from styx.common.local_state_backends import LocalStateBackend from styx.common.operator import Operator from styx.common.stateflow_graph import StateflowGraph + +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) +from load_generator import LoadSchedule from tqdm import tqdm from ycsb import ycsb_operator from zipfian_generator import ZipfGenerator @@ -42,7 +45,8 @@ SAVE_DIR: str = sys.argv[7] warmup_seconds: int = int(sys.argv[8]) run_with_validation = sys.argv[9].lower() == "true" -kill_at: int = int(sys.argv[10]) +load_config_path: str = sys.argv[10] +kill_at: int = int(sys.argv[11]) if len(sys.argv) > 11 else -1 #################################################################################################################### g = StateflowGraph("ycsb-benchmark", operator_state_backend=LocalStateBackend.DICT, @@ -50,6 +54,13 @@ ycsb_operator.set_n_partitions(N_PARTITIONS) g.add_operators(ycsb_operator) +load_schedule = LoadSchedule.from_config_file( + load_config_path, + target_tps=messages_per_second, + time=seconds +) + + def submit_graph(styx: SyncStyxClient): print(f"Partitions: {list(g.nodes.values())[0].n_partitions}") styx.submit_dataflow(g) @@ -107,8 +118,10 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: subprocess.run(["docker", "kill", "styx-worker-1"], check=False) print("KILL -> styx-worker-1 done") sec_start = timer() - step = max(1, messages_per_second // sleeps_per_second) - for i in range(messages_per_second): + + current_tps = load_schedule.get_tps(second) + step = max(1, current_tps // sleeps_per_second) + for i in range(current_tps): if i % step == 0: time.sleep(sleep_time) operator, key, func_name, params = next(ycsb_generator) @@ -123,7 +136,7 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: if lps < 1: time.sleep(1 - lps) sec_end2 = timer() - print(f"{second} | Latency per second: {sec_end2 - sec_start}") + print(f"{second} | TPS: {current_tps} | Latency per second: {sec_end2 - sec_start}") end = timer() print(f"Average latency per second: {(end - start) / seconds}") @@ -157,7 +170,6 @@ def main(): results = p.map(benchmark_runner, range(threads)) results = {k: v for d in results for k, v in d.items()} - assert len(results) == messages_per_second * seconds * threads if run_with_validation: # wait for system to stabilize diff --git a/demo/load_generator.py b/demo/load_generator.py new file mode 100644 index 0000000..968bdf7 --- /dev/null +++ b/demo/load_generator.py @@ -0,0 +1,192 @@ +import random +import math +from abc import ABC, abstractmethod + + +class LoadGenerator(ABC): + def __init__(self, time: int, max_threshold: int | None = None): + self.time = time + self.max_threshold = max_threshold + + @abstractmethod + def generate(self) -> list[int]: + pass + + +class RandomLoadGenerator(LoadGenerator): + def __init__(self, target_tps: int, time: int, magnitude: int, max_threshold: int | None = None): + super().__init__(time, max_threshold) + self.target_tps = target_tps + self.magnitude = magnitude + + def generate(self) -> list[int]: + values = [] + val = self.target_tps + for i in range(0, self.time): + val += random.randrange(-self.magnitude, self.magnitude) + if self.max_threshold is not None and val > self.max_threshold: + val = self.max_threshold + values.append(val) + return [abs(int(val)) for val in values] + + +class IncreaseLoadGenerator(LoadGenerator): + def __init__(self, target_tps: int, time: int, magnitude: int, max_threshold: int | None = None): + super().__init__(time, max_threshold) + self.target_tps = target_tps + self.magnitude = magnitude + + def generate(self) -> list[int]: + values = [] + val = self.target_tps + for i in range(0, self.time): + val += random.randrange(int(-self.magnitude * (1 / 30)), int(self.magnitude * (1 / 22))) + if self.max_threshold is not None and val > self.max_threshold: + val = self.max_threshold + values.append(val) + return [abs(int(val)) for val in values] + + +class DecreaseLoadGenerator(LoadGenerator): + def __init__(self, target_tps: int, time: int, magnitude: int, max_threshold: int | None = None): + super().__init__(time, max_threshold) + self.target_tps = target_tps + self.magnitude = magnitude + + def generate(self) -> list[int]: + values = [] + val = self.target_tps + for i in range(0, self.time): + val += random.randrange(int(-self.magnitude * (1 / 21)), int(self.magnitude * (1 / 28))) + if self.max_threshold is not None and val > self.max_threshold: + val = self.max_threshold + values.append(val) + return [abs(int(val)) for val in values] + + +class CosineLoadGenerator(LoadGenerator): + def __init__(self, target_tps: int, time: int, cosine_period: int, mean_input_rate: int, + max_divergence: int, max_noise: int, max_threshold: int | None = None): + super().__init__(time, max_threshold) + self.target_tps = target_tps + self.cosine_period = cosine_period + self.mean_input_rate = mean_input_rate + self.max_divergence = max_divergence + self.max_noise = max_noise + + def generate(self) -> list[int]: + values = [] + for i in range(0, self.time): + period = (2 * math.pi / self.cosine_period) + value = self.mean_input_rate + self.max_divergence * math.cos(period * i + math.pi) + value += random.random() * (2 * self.max_noise) - self.max_noise + if self.max_threshold is not None and value > self.max_threshold: + value = self.max_threshold + values.append(value) + return [abs(int(val)) for val in values] + + +class StepPatternLoadGenerator(LoadGenerator): + def __init__(self, target_tps: int, time: int, initial_round_length: int, regular_round_length: int, + round_rates: list[int], max_noise: int = 0, max_threshold: int | None = None): + super().__init__(time, max_threshold) + self.target_tps = target_tps + self.initial_round_length = initial_round_length + self.regular_round_length = regular_round_length + self.round_rates = round_rates + self.max_noise = max_noise + + def generate(self) -> list[int]: + values = [] + for i in range(0, self.time): + round_index = 0 + if i > self.initial_round_length: + round_index = 1 + (i - self.initial_round_length) // self.regular_round_length + + value = self.round_rates[round_index] if len(self.round_rates) > round_index else 0 + value += random.random() * (2 * self.max_noise) - self.max_noise + if self.max_threshold is not None and value > self.max_threshold: + value = self.max_threshold + values.append(max(0, value)) + return [abs(int(val)) for val in values] + + +class ConstantLoadGenerator(LoadGenerator): + def __init__(self, target_tps: int, time: int, max_threshold: int | None = None): + super().__init__(time, max_threshold) + self.target_tps = target_tps + + def generate(self) -> list[int]: + return [self.target_tps] * self.time + + +class LoadSchedule: + + GENERATORS: dict[str, type[LoadGenerator]] = { + "constant": ConstantLoadGenerator, + "increasing": IncreaseLoadGenerator, + "decreasing": DecreaseLoadGenerator, + "cosine": CosineLoadGenerator, + "step": StepPatternLoadGenerator, + "random": RandomLoadGenerator, + } + + def __init__(self, values: list[int], step_size: int): + """ + Args: + values: List of TPS values from a LoadGenerator + step_size: Duration in seconds for each TPS value + """ + self.values = values + self.step_size = step_size + + @classmethod + def from_generator(cls, generator_name: str, step_size: int, **kwargs) -> "LoadSchedule": + """ + Create a LoadSchedule from a generator name and parameters. + + Example: + LoadSchedule.from_generator("cosine", step_size=15, + target_tps=1000, time=20, cosine_period=10, + mean_input_rate=1000, max_divergence=500, max_noise=50) + """ + if generator_name not in cls.GENERATORS: + available = ", ".join(cls.GENERATORS.keys()) + raise ValueError(f"Unknown generator '{generator_name}'. Available: {available}") + generator = cls.GENERATORS[generator_name](**kwargs) + values = generator.generate() + return cls(values, step_size) + + @classmethod + def from_config_file(cls, config_path: str, target_tps: int, time: int) -> "LoadSchedule": + """Load schedule from a YAML/JSON config file.""" + import yaml + with open(config_path) as f: + config = yaml.safe_load(f) + + profile = config.pop("profile") + step_size = config.pop("step_size", 5) + + # Inject runtime values + config["target_tps"] = target_tps + config["time"] = time + + return cls.from_generator(profile, step_size, **config) + + def get_tps(self, elapsed_seconds: float) -> int: + index = int(elapsed_seconds // self.step_size) + if index < 0: + return self.values[0] + if index >= len(self.values): + return self.values[-1] + return self.values[index] + + def total_duration(self) -> int: + return len(self.values) * self.step_size + + def __iter__(self): + for i, tps in enumerate(self.values): + yield i * self.step_size, tps + + def __len__(self): + return len(self.values) diff --git a/demo/load_profiles/constant.yaml b/demo/load_profiles/constant.yaml new file mode 100644 index 0000000..a4ca1cb --- /dev/null +++ b/demo/load_profiles/constant.yaml @@ -0,0 +1,2 @@ +profile: constant +step_size: 5 \ No newline at end of file diff --git a/demo/load_profiles/cosine.yaml b/demo/load_profiles/cosine.yaml new file mode 100644 index 0000000..650ee93 --- /dev/null +++ b/demo/load_profiles/cosine.yaml @@ -0,0 +1,6 @@ +profile: cosine +step_size: 5 +cosine_period: 30 +mean_input_rate: 5000 +max_divergence: 1500 +max_noise: 50 \ No newline at end of file diff --git a/demo/load_profiles/decreasing.yaml b/demo/load_profiles/decreasing.yaml new file mode 100644 index 0000000..7e0c29c --- /dev/null +++ b/demo/load_profiles/decreasing.yaml @@ -0,0 +1,3 @@ +profile: decreasing +step_size: 5 +magnitude: 10000 \ No newline at end of file diff --git a/demo/load_profiles/increasing.yaml b/demo/load_profiles/increasing.yaml new file mode 100644 index 0000000..b821e74 --- /dev/null +++ b/demo/load_profiles/increasing.yaml @@ -0,0 +1,3 @@ +profile: increasing +step_size: 5 +magnitude: 10000 \ No newline at end of file diff --git a/demo/load_profiles/random.yaml b/demo/load_profiles/random.yaml new file mode 100644 index 0000000..5e055c2 --- /dev/null +++ b/demo/load_profiles/random.yaml @@ -0,0 +1,3 @@ +profile: random +step_size: 5 +magnitude: 5000 \ No newline at end of file diff --git a/demo/load_profiles/step.yaml b/demo/load_profiles/step.yaml new file mode 100644 index 0000000..b92b3e1 --- /dev/null +++ b/demo/load_profiles/step.yaml @@ -0,0 +1,5 @@ +profile: step +step_size: 5 +initial_round_length: 40 +regular_round_length: 60 +round_rates: [1000, 2000, 7000, 4000] \ No newline at end of file diff --git a/grafana/dashboards/styx.json b/grafana/dashboards/styx.json index dc27731..33337cf 100644 --- a/grafana/dashboards/styx.json +++ b/grafana/dashboards/styx.json @@ -1,752 +1,2216 @@ { - "annotations": { - "list": [ - { - "builtIn": 1, - "datasource": { - "type": "grafana", - "uid": "-- Grafana --" - }, - "enable": true, - "hide": true, - "iconColor": "rgba(0, 211, 255, 1)", - "name": "Annotations & Alerts", - "type": "dashboard" - } - ] - }, - "editable": true, - "fiscalYearStartMonth": 0, - "graphTooltip": 0, - "id": 9, - "links": [ - { - "asDropdown": false, - "icon": "external link", - "includeVars": false, - "keepTime": false, - "tags": [], - "targetBlank": false, - "title": "Worker Specific", - "tooltip": "", - "type": "dashboards", - "url": "" - } - ], - "panels": [ + "annotations": { + "list": [ { + "builtIn": 1, "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "type": "grafana", + "uid": "-- Grafana --" }, - "fieldConfig": { - "defaults": { - "max": 100, - "min": 0, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "red", - "value": null - }, - { - "color": "orange", - "value": 75 - }, - { - "color": "green", - "value": 100 - } - ] - }, - "unit": "percent" - }, - "overrides": [] - }, - "gridPos": { - "h": 4, - "w": 6, - "x": 0, - "y": 0 - }, - "id": 1, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "legend": { - "showLegend": false - }, - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "11.5.0-pre", - "targets": [ - { - "expr": "avg by(instance) (worker_cpu_usage_percent)", - "refId": "A" - } - ], - "title": "CPU Utilization (%)", - "type": "timeseries" + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" }, { "datasource": { "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "fieldConfig": { - "defaults": { - "min": 0, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "red", - "value": null - }, - { - "color": "orange", - "value": 75 - }, - { - "color": "green", - "value": 100 - } - ] - }, - "unit": "bytes" - }, - "overrides": [] - }, - "gridPos": { - "h": 4, - "w": 6, - "x": 6, - "y": 0 - }, - "id": 2, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "legend": { - "showLegend": false - }, - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "11.5.0-pre", - "targets": [ - { - "expr": "avg by(instance) (worker_memory_usage_mb) * 1000000", - "refId": "A" - } - ], - "title": "Memory Utilization (Mb)", - "type": "timeseries" + "enable": true, + "expr": "migration_start_time_ms", + "filter": { + "exclude": true, + "ids": [ + 18 + ] + }, + "hide": false, + "iconColor": "orange", + "name": "Migration start", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "textFormat": "", + "titleFormat": "Migration Start", + "useValueForTime": "on" }, { "datasource": { "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "fieldConfig": { - "defaults": { - "min": 0, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "red", - "value": null - }, - { - "color": "orange", - "value": 75 - }, - { - "color": "green", - "value": 100 - } - ] - }, - "unit": "bytes" - }, - "overrides": [] - }, - "gridPos": { - "h": 4, - "w": 6, - "x": 12, - "y": 0 - }, - "id": 3, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "legend": { - "showLegend": false - }, - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "11.5.0-pre", - "targets": [ - { - "expr": "sum(rate(worker_network_rx_kb[5s])) * 1000", - "refId": "A" - } - ], - "title": "Ingress Network (bytes/s)", - "type": "timeseries" + "enable": true, + "expr": "migration_end_time_ms", + "filter": { + "exclude": true, + "ids": [ + 18 + ] + }, + "hide": false, + "iconColor": "yellow", + "name": "Migration end", + "titleFormat": "Migration end", + "useValueForTime": "on" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 2, + "links": [ + { + "asDropdown": false, + "icon": "external link", + "includeVars": false, + "keepTime": false, + "tags": [], + "targetBlank": false, + "title": "Worker Specific", + "tooltip": "", + "type": "dashboards", + "url": "" + } + ], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "description": "The amount of CPU time consumed by each of the different transactional phases in ms", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" }, - "fieldConfig": { - "defaults": { - "min": 0, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "red", - "value": null - }, - { - "color": "orange", - "value": 75 - }, - { - "color": "green", - "value": 100 - } - ] - }, - "unit": "bytes" - }, - "overrides": [] - }, - "gridPos": { - "h": 4, - "w": 6, - "x": 18, - "y": 0 - }, - "id": 4, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "legend": { - "showLegend": false - }, - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "11.5.0-pre", - "targets": [ - { - "expr": "sum(rate(worker_network_tx_kb[5s])) * 1000", - "refId": "A" - } - ], - "title": "Egress Network (bytes/s)", - "type": "timeseries" + "overrides": [] }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "gridPos": { + "h": 6, + "w": 8, + "x": 0, + "y": 0 + }, + "id": 22, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true }, - "fieldConfig": { - "defaults": { - "min": 0, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "red", - "value": null - }, - { - "color": "orange", - "value": 75 - }, - { - "color": "green", - "value": 100 - } - ] - }, - "unit": "short" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 8, - "x": 0, - "y": 4 - }, - "id": 5, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "legend": { - "showLegend": false - }, - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "11.5.0-pre", - "targets": [ - { - "expr": "sum(rate(worker_epoch_throughput_tps[30s]))", - "refId": "A" - } - ], - "title": "TPS (committed)", - "description": "Throughput per epoch of the committed transactions", - "type": "timeseries" + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "rate(phase_cpu_ms_total[5s])", + "hide": false, + "legendFormat": "{{phase}} ({{exported_instance}})", + "range": true, + "refId": "A" + } + ], + "title": "CPU per phase (ms)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" }, - "fieldConfig": { - "defaults": { - "min": 0, - "max": 1000, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "orange", - "value": 250 - }, - { - "color": "red", - "value": 500 - } - ] + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 8, + "y": 0 + }, + "id": 23, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "rate(phase_net_rx_bytes_total[5s])", + "legendFormat": "{{phase}} ({{exported_instance}})", + "range": true, + "refId": "A" + } + ], + "title": "Ingress per phase", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" }, - "custom": { - "thresholdsStyle": { - "mode": "area" + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 16, + "y": 0 + }, + "id": 24, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "rate(phase_net_tx_bytes_total[5s])", + "legendFormat": "{{phase}} ({{exported_instance}})", + "range": true, + "refId": "A" + } + ], + "title": "Egress per phase", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "System-wide CPU utilization", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false }, - "unit": "ms" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 8, - "x": 8, - "y": 4 - }, - "id": 6, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "legend": { - "showLegend": false - }, - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "11.5.0-pre", - "targets": [ + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "max": 100, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 75 + }, + { + "color": "green", + "value": 100 + } + ] + }, + "unit": "percent" + }, + "overrides": [ { - "expr": "avg(worker_epoch_latency_ms)", - "refId": "A" + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "coordinator:8000" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] } - ], - "title": "Transaction Latency", - "description": "Total latency of commited transactions", - "type": "timeseries" + ] }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 6 + }, + "id": 1, + "interval": "15s", + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false }, - "fieldConfig": { - "defaults": { - "max": 100, - "min": 0, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "orange", - "value": 25 - }, - { - "color": "red", - "value": 50 - } - ] + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "wideLayout": true + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "avg by(instance) (worker_cpu_usage_percent > 0)", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "CPU Utilization (%)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" }, - "custom": { - "thresholdsStyle": { - "mode": "area" + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 75 + }, + { + "color": "green", + "value": 100 } - }, - "unit": "percent" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 8, - "x": 16, - "y": 4 - }, - "id": 7, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "legend": { - "showLegend": false - }, - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "11.5.0-pre", - "targets": [ - { - "expr": "avg(worker_abort_percent)", - "refId": "A" - } - ], - "title": "Abort Rate (%)", - "description": "Percentage of aborted transactions", - "type": "timeseries" + ] + }, + "unit": "bytes" + }, + "overrides": [] }, - { - "id": 8, - "type": "piechart", - "title": "Latency Breakdown", - "description": "A breakdown of the transaction latency by component.", - "gridPos": { - "h": 10, - "w": 8, - "x": 0, - "y": 10 - }, - "fieldConfig": { - "defaults": { - "custom": { - "hideFrom": { - "tooltip": false, - "viz": false, - "legend": false + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 6 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "wideLayout": true + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "avg by(instance) (worker_memory_usage_mb) * 1000000", + "range": true, + "refId": "A" + } + ], + "title": "Memory Utilization (Mb)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 75 + }, + { + "color": "green", + "value": 100 } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 12, + "y": 6 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "wideLayout": true + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "expr": "sum(rate(worker_network_rx_kb[5s])) * 1000", + "refId": "A" + } + ], + "title": "Ingress Network (bytes/s)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" }, - "color": { - "mode": "palette-classic" + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" }, - "mappings": [] + "thresholdsStyle": { + "mode": "off" + } }, - "overrides": [] + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 75 + }, + { + "color": "green", + "value": 100 + } + ] + }, + "unit": "bytes" }, - "pluginVersion": "11.5.0-pre", - "targets": [ - { - "refId": "A", - "editorMode": "code", - "expr": "avg(latency_breakdown) by (component)", - "legendFormat": "{{cpu}}", - "range": true + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 18, + "y": 6 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "wideLayout": true + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "expr": "sum(rate(worker_network_tx_kb[5s])) * 1000", + "refId": "A" + } + ], + "title": "Egress Network (bytes/s)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Rate of how many requests the workers consume from kafka (per second). This metric tries to approximate the input rate that is coming into styx, although it can be inaccurate when there is migration/slow epochs, as it will produce artificial spikes due to the workers consuming a lot more records in order to catch up.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 0, + "y": 10 + }, + "id": 25, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "sum(rate(input_rate_counter_total[15s]))", + "legendFormat": "Total Worker Consumption Rate", + "range": true, + "refId": "A" + } + ], + "title": "Consumption Rate", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Throughput per epoch of the committed transactions", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 75 + }, + { + "color": "green", + "value": 100 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 6, + "y": 10 + }, + "id": 5, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "wideLayout": true + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "sum(rate(epoch_total_transactions_total[15s]))", + "range": true, + "refId": "A" + } + ], + "title": "TPS (committed)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Backlog - number of input elements that have been consumed by styx but not yet processed (lag)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] } - ], - "datasource": { - "uid": "PBFA97CFB590B2093", - "type": "prometheus" - }, - "options": { - "reduceOptions": { - "values": false, - "calcs": [ - "lastNotNull" - ], - "fields": "" - }, - "pieType": "pie", - "tooltip": { - "mode": "single", - "sort": "none", - "hideZeros": false - }, - "legend": { - "showLegend": true, - "displayMode": "list", - "placement": "bottom" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 12, + "y": 10 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "disableTextWrap": false, + "editorMode": "code", + "expr": "sum(queue_backlog)", + "fullMetaSearch": false, + "includeNullMetadata": true, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Backlog (Lag)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Calculates the per-second average rate of the different types of commits and aborts for each epoch", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 18, + "y": 10 + }, + "id": 14, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" } }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "rate(epoch_committed_lock_free_total[15s]) > 0", + "legendFormat": "lock_free_commited ({{exported_instance}})", + "range": true, + "refId": "A" }, - "fieldConfig": { - "defaults": { - "min": 0, - "max": 1000, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "orange", - "value": 250 - }, - { - "color": "red", - "value": 500 - } - ] + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(epoch_committed_fallback_total[15s])> 0", + "hide": false, + "instant": false, + "legendFormat": "fallback_comitted ({{exported_instance}})", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(epoch_logic_aborts_total[15s]) > 0", + "hide": false, + "instant": false, + "legendFormat": "logic_aborts ({{exported_instance}})", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(epoch_concurrency_aborts_total[15s]) > 0", + "hide": false, + "instant": false, + "legendFormat": "concurrency_aborts ({{exported_instance}})", + "range": true, + "refId": "D" + } + ], + "title": "Average Commit & Abort Count", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Amount of active workers participating in the protocol", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false }, - "custom": { - "thresholdsStyle": { - "mode": "area" + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 } - }, - "unit": "ms" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 16, - "x": 8, - "y": 10 - }, - "id": 9, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "legend": { - "showLegend": false - }, - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "11.5.0-pre", - "targets": [ - { - "expr": "avg(worker_total_snapshotting_time_ms)", - "refId": "A" + ] } - ], - "title": "Snapshot Latency", - "description": "Total latency for taking a snapshot", - "type": "timeseries" + }, + "overrides": [] }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "gridPos": { + "h": 6, + "w": 6, + "x": 0, + "y": 15 + }, + "id": 26, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true }, - "fieldConfig": { - "defaults": { - "min": 0, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "orange", - "value": 1000 - }, - { - "color": "red", - "value": 1500 - } - ] + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "live_worker_count", + "legendFormat": "Worker count", + "range": true, + "refId": "A" + } + ], + "title": "Worker count", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Breakdown of the latency of different phases of the transactional protocol", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" }, - "custom": { - "drawStyle": "bars", - "barAlignment": 0, - "thresholdsStyle": { - "mode": "area" + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 } - }, - "unit": "" - }, - "overrides": [] - }, - "gridPos": { - "h": 4, - "w": 16, - "x": 8, - "y": 16 - }, - "id": 11, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "legend": { - "showLegend": false - }, - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "11.5.0-pre", - "targets": [ - { - "expr": "time_since_last_heartbeat", - "refId": "A", - "legendFormat": "{{exported_instance}}" + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 6, + "y": 15 + }, + "id": 13, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "avg(latency_breakdown > 0) by (component)", + "legendFormat": "{{component}}", + "range": true, + "refId": "A" + } + ], + "title": "Average Latency Breakdown", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "CPU Ratio - ratio of processing time to total epoch time. Shows what percentage of the time spent on the epoch was spent doing cpu intensive tasks\nIO Ratio - ratio of IO time to total epoch time. Shows what percentage of the time spent on the epoch was spent doing IO intensive tasks", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "transparent", + "value": null + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 12, + "y": 15 + }, + "id": 17, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "avg_over_time(worker_cpu_utilization[5s]) * 100", + "hide": false, + "instant": false, + "legendFormat": "CPU ({{exported_instance}})", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "avg_over_time(worker_io_utilization[5s]) * 100", + "hide": false, + "instant": false, + "legendFormat": "IO ({{exported_instance}})", + "range": true, + "refId": "C" + } + ], + "title": "Worker Utilisation", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Total latency of commited transactions", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 250 + }, + { + "color": "red", + "value": 500 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 18, + "y": 15 + }, + "id": 6, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "wideLayout": true + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "expr": "avg(worker_epoch_latency_ms)", + "refId": "A" + } + ], + "title": "Transaction Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 21 + }, + "id": 19, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "avg by (operator) (operator_latency_ms)", + "hide": false, + "legendFormat": "{{operator}}", + "range": true, + "refId": "A" + } + ], + "title": "Operator Execution time (ms)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Number of times a call goes through an operator during the epoch per worker", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] } - ], - "title": "Time since last heartbeat (ms)", - "description": "Time since the last heartbeat for each worker", - "type": "bargauge" - } - ], - "preload": false, - "refresh": "1s", - "schemaVersion": 40, - "tags": ["Styx", "General"], - "templating": { - "list": [] + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 8, + "y": 21 + }, + "id": 21, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "(rate(operator_call_count_total[15s])) > 0", + "legendFormat": "{{operator}} ({{exported_instance}})", + "range": true, + "refId": "A" + } + ], + "title": "Average Operator Call Count ", + "type": "timeseries" }, - "time": { - "from": "now-15m", - "to": "now" + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Total latency for taking a snapshot", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 250 + }, + { + "color": "red", + "value": 500 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 4, + "x": 16, + "y": 21 + }, + "id": 9, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "wideLayout": true + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "editorMode": "code", + "expr": "avg(worker_total_snapshotting_time_ms)", + "range": true, + "refId": "A" + } + ], + "title": "Snapshot Latency", + "type": "timeseries" }, - "timepicker": {}, - "timezone": "browser", - "title": "Styx System Overview", - "uid": "beckc0nxpeupsf", - "version": 4, - "weekStart": "" - } \ No newline at end of file + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Percentage of aborted transactions", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "mappings": [], + "max": 100, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 25 + }, + { + "color": "red", + "value": 50 + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 4, + "x": 20, + "y": 21 + }, + "id": 7, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "wideLayout": true + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "expr": "avg(worker_abort_percent)", + "refId": "A" + } + ], + "title": "Abort Rate (%)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Time since the last heartbeat for each worker", + "fieldConfig": { + "defaults": { + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "orange", + "value": 1000 + }, + { + "color": "red", + "value": 1500 + } + ] + }, + "unit": "" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 8, + "x": 16, + "y": 26 + }, + "id": 11, + "options": { + "colorMode": "value", + "displayMode": "gradient", + "graphMode": "area", + "justifyMode": "auto", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "maxVizHeight": 300, + "minVizHeight": 16, + "minVizWidth": 8, + "namePlacement": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "showUnfilled": true, + "sizing": "auto", + "textMode": "auto", + "valueMode": "color", + "wideLayout": true + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "expr": "time_since_last_heartbeat", + "legendFormat": "{{exported_instance}}", + "refId": "A" + } + ], + "title": "Time since last heartbeat (ms)", + "type": "bargauge" + } + ], + "preload": false, + "refresh": "5s", + "schemaVersion": 40, + "tags": [ + "Styx", + "General" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Styx System Overview", + "uid": "beckc0nxpeupsf", + "version": 1, + "weekStart": "" +} \ No newline at end of file diff --git a/scripts/run_experiment.sh b/scripts/run_experiment.sh index cd4607b..4807687 100755 --- a/scripts/run_experiment.sh +++ b/scripts/run_experiment.sh @@ -5,6 +5,7 @@ enable_compression=true use_composite_keys=true use_fallback_cache=true regenerate_tpcc_data=false +workload_profile="constant" workload_name=$1 input_rate=$2 @@ -16,12 +17,12 @@ total_time=$7 saving_dir=$8 warmup_seconds=$9 epoch_size=${10} - -[ -n "${11}" ] && styx_threads_per_worker=${11} -[ -n "${12}" ] && enable_compression=${12} -[ -n "${13}" ] && use_composite_keys=${13} -[ -n "${14}" ] && use_fallback_cache=${14} -[ -n "${15}" ] && regenerate_tpcc_data=${15} +[ -n "${11}" ] && workload_profile=${11} +[ -n "${12}" ] && styx_threads_per_worker=${12} +[ -n "${13}" ] && enable_compression=${13} +[ -n "${14}" ] && use_composite_keys=${14} +[ -n "${15}" ] && use_fallback_cache=${15} +[ -n "${16}" ] && regenerate_tpcc_data=${16} kill_at="-1" # This means that we are not going to kill any containers using this script # Deployment mode configuration (read from environment). @@ -47,8 +48,18 @@ echo "use_composite_keys: $use_composite_keys" echo "use_fallback_cache: $use_fallback_cache" echo "regenerate_tpcc_data: $regenerate_tpcc_data" echo "deploy_mode: $DEPLOY_MODE" +echo "workload_profile: $workload_profile" echo "===================================================" +case "$workload_profile" in + constant|increasing|decreasing|random|cosine|step) ;; + *) + echo "ERROR: Unknown workload profile: $workload_profile" + exit 1 + ;; +esac +load_config_path="demo/load_profiles/$workload_profile.yaml" + # Use kubefwd to forward all services in the namespace. # kubefwd adds /etc/hosts entries for service names and pod FQDNs, which is # required for Kafka: kubectl port-forward alone causes advertised-listener @@ -87,13 +98,13 @@ if [[ $workload_name == "ycsbt" ]]; then # To check if the state is correct within Styx, expensive to run together with large scale experiments, use for debug # values true | false run_with_validation=false - python demo/demo-ycsb/client.py "$client_threads" "$n_keys" "$n_part" "$zipf_const" "$input_rate" "$total_time" "$saving_dir" "$warmup_seconds" "$run_with_validation" "$kill_at" + python demo/demo-ycsb/client.py "$client_threads" "$n_keys" "$n_part" "$zipf_const" "$input_rate" "$total_time" "$saving_dir" "$warmup_seconds" "$run_with_validation" "$load_config_path" "$kill_at" elif [[ $workload_name == "dhr" ]]; then # Deathstar Hotel Reservation - python demo/demo-deathstar-hotel-reservation/pure_kafka_demo.py "$saving_dir" "$client_threads" "$n_part" "$input_rate" "$total_time" "$warmup_seconds" "$kill_at" + python demo/demo-deathstar-hotel-reservation/pure_kafka_demo.py "$saving_dir" "$client_threads" "$n_part" "$input_rate" "$total_time" "$warmup_seconds" "$load_config_path" "$kill_at" elif [[ $workload_name == "dmr" ]]; then # Deathstar Movie Review - python demo/demo-deathstar-movie-review/pure_kafka_demo.py "$saving_dir" "$client_threads" "$n_part" "$input_rate" "$total_time" "$warmup_seconds" "$kill_at" + python demo/demo-deathstar-movie-review/pure_kafka_demo.py "$saving_dir" "$client_threads" "$n_part" "$input_rate" "$total_time" "$warmup_seconds" "$load_config_path" "$kill_at" elif [[ $workload_name == "tpcc" ]]; then # TPC-C DATA_DIR="demo/demo-tpc-c/data_${n_keys}" @@ -123,7 +134,7 @@ elif [[ $workload_name == "tpcc" ]]; then python demo/demo-tpc-c/pure_kafka_demo.py \ "$saving_dir" "$client_threads" "$n_part" \ "$input_rate" "$total_time" "$warmup_seconds" \ - "$n_keys" "$enable_compression" "$use_composite_keys" "$use_fallback_cache" "$kill_at" + "$n_keys" "$enable_compression" "$use_composite_keys" "$use_fallback_cache" "$load_config_path" "$kill_at" else echo "Benchmark not supported!" fi @@ -137,5 +148,6 @@ if [[ "$DEPLOY_MODE" == "k8s-minikube" || "$DEPLOY_MODE" == "k8s-cluster" ]]; th fi bash scripts/uninstall_styx_cluster_with_helm.sh else - bash scripts/stop_styx_cluster.sh "$styx_threads_per_worker" + #bash scripts/stop_styx_cluster.sh "$styx_threads_per_worker" + docker compose stop coordinator worker fi diff --git a/scripts/run_migration_experiment.sh b/scripts/run_migration_experiment.sh index 22406d9..90d83af 100755 --- a/scripts/run_migration_experiment.sh +++ b/scripts/run_migration_experiment.sh @@ -10,6 +10,8 @@ enable_compression=true use_composite_keys=true use_fallback_cache=true regenerate_tpcc_data=false +workload_profile="constant" + # Read positional arguments input_rate=$1 @@ -22,13 +24,14 @@ warmup_seconds=$7 epoch_size=$8 workload_name=$9 n_keys=${10} -[ -n "${11:-}" ] && regenerate_tpcc_data=${11} +[ -n "${11:-}" ] && workload_profile=${11} +[ -n "${12:-}" ] && regenerate_tpcc_data=${12} # Optional overrides (minimal, but allows parity with start_experiment.sh style) -[ -n "${12:-}" ] && styx_threads_per_worker=${12} -[ -n "${13:-}" ] && enable_compression=${13} -[ -n "${14:-}" ] && use_composite_keys=${14} -[ -n "${15:-}" ] && use_fallback_cache=${15} +[ -n "${13:-}" ] && styx_threads_per_worker=${13} +[ -n "${14:-}" ] && enable_compression=${14} +[ -n "${15:-}" ] && use_composite_keys=${15} +[ -n "${16:-}" ] && use_fallback_cache=${16} # Determine the maximum number of partitions if (( start_n_part > end_n_part )); then @@ -54,8 +57,18 @@ echo "enable_compression: $enable_compression" echo "use_composite_keys: $use_composite_keys" echo "use_fallback_cache: $use_fallback_cache" echo "regenerate_tpcc_data: $regenerate_tpcc_data" +echo "workload_profile: $workload_profile" echo "============================================================" +case "$workload_profile" in + constant|increasing|decreasing|random|cosine|step) ;; + *) + echo "ERROR: Unknown workload profile: $workload_profile" + exit 1 + ;; +esac +load_config_path="demo/load_profiles/$workload_profile.yaml" + bash "$ROOT_DIR/scripts/start_styx_cluster.sh" \ "$start_n_part" "$epoch_size" "$styx_threads_per_worker" \ "$enable_compression" "$use_composite_keys" "$use_fallback_cache" @@ -67,7 +80,7 @@ if [[ "$workload_name" == "ycsb" ]]; then python "$ROOT_DIR/demo/demo-migration-ycsb/client.py" \ "$client_threads" "$start_n_part" "$end_n_part" \ - "$input_rate" "$total_time" "$saving_dir" "$warmup_seconds" "$n_keys" + "$input_rate" "$total_time" "$saving_dir" "$warmup_seconds" "$n_keys" "$load_config_path" elif [[ "$workload_name" == "tpcc" ]]; then @@ -98,7 +111,7 @@ elif [[ "$workload_name" == "tpcc" ]]; then python "$ROOT_DIR/demo/demo-migration-tpc-c/pure_kafka_demo.py" \ "$saving_dir" "$client_threads" "$start_n_part" "$end_n_part" \ "$input_rate" "$total_time" "$warmup_seconds" "$n_keys" \ - "$enable_compression" "$use_composite_keys" "$use_fallback_cache" + "$enable_compression" "$use_composite_keys" "$use_fallback_cache" "$load_config_path" else echo "Benchmark not supported: $workload_name" diff --git a/styx-package/styx/common/metrics.py b/styx-package/styx/common/metrics.py new file mode 100644 index 0000000..3f7dcf6 --- /dev/null +++ b/styx-package/styx/common/metrics.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass + + +@dataclass(frozen=True) +class WorkerEpochStats: + worker_id: int + epoch_throughput: float + epoch_latency: float + local_abort_rate: float + wal_time: float + func_time: float + chain_ack_time: float + sync_time: float + conflict_res_time: float + commit_time: float + fallback_time: float + snap_time: float + input_rate: float + queue_backlog: float + total_txns: int + committed_txns: int + logic_aborts: int + concurrency_aborts: int + committed_lock_free: int + committed_fallback: int + cpu_utilization: float + io_wait_utilization: float + operator_epoch_stats: list[tuple[str, int, float, float, int]] + phase_resources: dict[str, dict[str, int]] diff --git a/styx-package/styx/common/operator.py b/styx-package/styx/common/operator.py index 724ea6a..2db294b 100644 --- a/styx-package/styx/common/operator.py +++ b/styx-package/styx/common/operator.py @@ -1,5 +1,6 @@ import asyncio import os +import time from typing import TYPE_CHECKING from setuptools._distutils.util import strtobool @@ -132,6 +133,7 @@ async def run_function( ) params = (f, *tuple(params)) success: bool = True + start_time = time.time() if ack_payload is not None: # part of a chain (not root) ( @@ -178,6 +180,10 @@ async def run_function( elif resp is not None: resp: str | Exception self.__networking.add_response(t_id, resp) + end_time = time.time() + duration_ms = (end_time - start_time) * 1000 + if hasattr(protocol, "record_operator_call"): + protocol.record_operator_call(self.name, partition, function_name, duration_ms, success) return success async def _send_response_to_root( diff --git a/tests/e2e/test_e2e_deathstar_hotel_reservation.py b/tests/e2e/test_e2e_deathstar_hotel_reservation.py index 24b4904..ed0ac87 100644 --- a/tests/e2e/test_e2e_deathstar_hotel_reservation.py +++ b/tests/e2e/test_e2e_deathstar_hotel_reservation.py @@ -112,6 +112,7 @@ def _client_cmd(results_dir: Path, cluster: _ClusterParams, client: _ClientParam str(client.input_rate), str(client.total_time), str(client.warmup_seconds), + "../load_profiles/constant.yaml", str(client.kill_at), ] diff --git a/tests/e2e/test_e2e_deathstar_movie_review.py b/tests/e2e/test_e2e_deathstar_movie_review.py index e45899c..eb0215e 100644 --- a/tests/e2e/test_e2e_deathstar_movie_review.py +++ b/tests/e2e/test_e2e_deathstar_movie_review.py @@ -112,6 +112,7 @@ def _client_cmd(results_dir: Path, cluster: _ClusterParams, client: _ClientParam str(client.input_rate), str(client.total_time), str(client.warmup_seconds), + "../load_profiles/constant.yaml", str(client.kill_at), ] diff --git a/tests/e2e/test_e2e_migration_tpcc.py b/tests/e2e/test_e2e_migration_tpcc.py index 1d51fcb..ff0a0ed 100644 --- a/tests/e2e/test_e2e_migration_tpcc.py +++ b/tests/e2e/test_e2e_migration_tpcc.py @@ -133,6 +133,7 @@ def _client_cmd(results_dir: Path, cluster: _ClusterParams, client: _ClientParam cluster.enable_compression, cluster.use_composite_keys, cluster.use_fallback_cache, + "../load_profiles/constant.yaml", ] diff --git a/tests/e2e/test_e2e_migration_ycsb.py b/tests/e2e/test_e2e_migration_ycsb.py index d797293..2bdbdae 100644 --- a/tests/e2e/test_e2e_migration_ycsb.py +++ b/tests/e2e/test_e2e_migration_ycsb.py @@ -124,6 +124,7 @@ def _client_cmd(results_dir: Path, client: _ClientParams) -> list[str]: str(results_dir), str(client.warmup_seconds), str(client.n_entities), + "../load_profiles/constant.yaml", ] diff --git a/tests/e2e/test_e2e_tpcc.py b/tests/e2e/test_e2e_tpcc.py index ef20099..c93a7e8 100644 --- a/tests/e2e/test_e2e_tpcc.py +++ b/tests/e2e/test_e2e_tpcc.py @@ -117,7 +117,8 @@ def _client_cmd(results_dir: Path, cluster: _ClusterParams, client: _ClientParam # 8 enable_compression # 9 use_composite_keys # 10 use_fallback_cache - # 11 kill_at (optional; default -1) + # 11 load_config_path + # 12 kill_at (optional; default -1) return [ "python", "pure_kafka_demo.py", @@ -131,6 +132,7 @@ def _client_cmd(results_dir: Path, cluster: _ClusterParams, client: _ClientParam cluster.enable_compression, cluster.use_composite_keys, cluster.use_fallback_cache, + "../load_profiles/constant.yaml", str(client.kill_at), ] diff --git a/tests/e2e/test_e2e_ycsb.py b/tests/e2e/test_e2e_ycsb.py index 06d3da4..4dda1f9 100644 --- a/tests/e2e/test_e2e_ycsb.py +++ b/tests/e2e/test_e2e_ycsb.py @@ -112,7 +112,7 @@ def _stop_cmd(paths: _Paths, p: _ClusterParams) -> list[str]: def _client_cmd(results_dir: Path, cluster: _ClusterParams, client: _ClientParams) -> list[str]: - # client.py expects argv[10] = kill_at + # client.py expects argv[10] = load_config_path, argv[11] = kill_at return [ "python", "client.py", @@ -125,6 +125,7 @@ def _client_cmd(results_dir: Path, cluster: _ClusterParams, client: _ClientParam str(results_dir), str(client.warmup_seconds), client.run_with_validation, + "../load_profiles/constant.yaml", str(client.kill_at), ] diff --git a/worker/ingress/styx_kafka_ingress.py b/worker/ingress/styx_kafka_ingress.py index d7f753c..96bc9fe 100644 --- a/worker/ingress/styx_kafka_ingress.py +++ b/worker/ingress/styx_kafka_ingress.py @@ -48,6 +48,13 @@ def __init__( self.kafka_consumer: AIOKafkaConsumer | None = None self.kafka_ingress_task: asyncio.Task | None = None + self._ingress_epoch: int | None = None + self.epoch_stats: dict[str, int] = { + "consumed": 0, # Kafka ClientMsg records consumed + "sequenced": 0, # Calls enqueued locally + "forwarded_wrong_partition": 0, # Forwarded via TCP as WrongPartitionRequest + } + async def start( self, topic_partitions: list[TopicPartition], @@ -67,8 +74,18 @@ async def stop(self) -> None: await self.kafka_consumer.stop() def handle_message_from_kafka(self, msg: ConsumerRecord) -> None: + current_epoch = self.sequencer.epoch_counter + if current_epoch != self._ingress_epoch: + self.epoch_stats = { + "consumed": 0, + "sequenced": 0, + "forwarded_wrong_partition": 0, + } + self._ingress_epoch = current_epoch + message_type: int = self.networking.get_msg_type(msg.value) if message_type == MessageType.ClientMsg: + self.epoch_stats["consumed"] += 1 message = self.networking.decode_message(msg.value) operator_name, key, fun_name, params, partition = message run_func_payload: RunFuncPayload = RunFuncPayload( @@ -84,11 +101,13 @@ def handle_message_from_kafka(self, msg: ConsumerRecord) -> None: if key is None or self.state.exists(key, operator_name, partition): # Message received in the correct partition (Normal operation) self.sequencer.sequence(run_func_payload) + self.epoch_stats["sequenced"] += 1 elif ( true_partition := self.registered_operators[(operator_name, msg.partition)].which_partition(key) ) == partition: # Message received in the correct partition, but it was an insert operation (didn't exist in the state) self.sequencer.sequence(run_func_payload) + self.epoch_stats["sequenced"] += 1 else: # In flight message during migration, currently the state belongs to another partition dns = self.registered_operators[(operator_name, msg.partition)].dns @@ -106,6 +125,7 @@ def handle_message_from_kafka(self, msg: ConsumerRecord) -> None: kafka_ingress_partition=msg.partition, ) self.sequencer.sequence(run_func_payload) + self.epoch_stats["sequenced"] += 1 else: payload = ( msg.key, @@ -126,6 +146,7 @@ def handle_message_from_kafka(self, msg: ConsumerRecord) -> None: serializer=Serializer.MSGPACK, ), ) + self.epoch_stats["forwarded_wrong_partition"] += 1 else: logging.error(f"Invalid message type: {message_type} passed to KAFKA") diff --git a/worker/transactional_protocols/aria.py b/worker/transactional_protocols/aria.py index 414497f..4eaa735 100644 --- a/worker/transactional_protocols/aria.py +++ b/worker/transactional_protocols/aria.py @@ -1,8 +1,10 @@ import asyncio import contextlib +from dataclasses import astuple import os import time from timeit import default_timer as timer +from traceback import format_exc from typing import TYPE_CHECKING from msgspec import msgpack @@ -21,17 +23,21 @@ AriaConflictDetectionType, ) from worker.sequencer.sequencer import Sequencer +from worker.util.epoch_metrics_builder import EpochMetricsBuilder, EpochMetricsInput +from worker.util.phase_resource_tracker import PhaseResourceTracker if TYPE_CHECKING: from collections.abc import Awaitable, Callable from aiokafka import TopicPartition + from styx.common.metrics import WorkerEpochStats from styx.common.operator import Operator from styx.common.types import OperatorPartition from worker.operator_state.aria.in_memory_state import InMemoryOperatorState from worker.operator_state.stateless import Stateless + DISCOVERY_HOST: str = os.environ["DISCOVERY_HOST"] DISCOVERY_PORT: int = int(os.environ["DISCOVERY_PORT"]) @@ -194,6 +200,25 @@ def __init__( MessageType.AsyncMigration: self._handle_async_migration, } + self.cpu_work_ms: float = 0.0 # Time spent in actual function execution + + self.operator_metrics = {} + # Per-phase resource attribution (CPU/RSS/RX/TX deltas), aggregated per epoch. + self.phase_resource_tracker = PhaseResourceTracker() + + def record_operator_call( + self, operator_name: str, partition: int, function_name: str, duration_ms: float, success: bool + ) -> None: + """ + Record an operator call for metrics tracking. + """ + key = (operator_name, partition, function_name) + m = self.operator_metrics.setdefault(key, {"count": 0, "failures": 0, "sum_ms": 0.0}) + m["count"] += 1 + m["sum_ms"] += duration_ms + if not success: + m["failures"] += 1 + async def wait_stopped(self) -> None: await self.stopped.wait() @@ -218,11 +243,23 @@ async def stop(self) -> None: self.stopped.set() logging.warning(f"Aria protocol stopped at: {self.topic_partition_offsets}") + # Add clear error logging so that transactional protocol does not fail silently + def _task_exception_handler(self, task: asyncio.Task) -> None: + try: + task.result() + except asyncio.CancelledError: + pass + except Exception as e: + logging.error(f"Task {task.get_name()} crashed: {e}\n{format_exc()}") + def start(self) -> None: self.function_scheduler_task = asyncio.create_task(self.function_scheduler()) + self.function_scheduler_task.add_done_callback(self._task_exception_handler) self.communication_task = asyncio.create_task(self.communication_protocol()) + self.communication_task.add_done_callback(self._task_exception_handler) if self.migrating_state and USE_ASYNC_MIGRATION: self.migration_sender_task = asyncio.create_task(self._continuous_migration_sender()) + self.migration_sender_task.add_done_callback(self._task_exception_handler) logging.warning( f"Aria protocol started with operator partitions: {list(self.registered_operators.keys())}", ) @@ -573,60 +610,77 @@ async def function_scheduler(self) -> None: async def _process_epoch(self, sequence: list[SequencedItem]) -> None: epoch_start = timer() + self.currently_processing = True + self.phase_resource_tracker.reset_epoch() timings = await self._run_epoch_functions_and_chain(sequence) sync_time = 0.0 + logic_aborts_count = len(self.networking.logic_aborts_everywhere) sync_time += await self._sync_processing_done() conflict_resolution_start = timer() - # HERE WE KNOW ALL THE LOGIC ABORTS + self.phase_resource_tracker.begin("Conflict Resolution") self.local_state.remove_aborted_from_rw_sets( self.networking.logic_aborts_everywhere, ) concurrency_aborts = await self._compute_concurrency_aborts() + self.phase_resource_tracker.end("Conflict Resolution") conflict_resolution_end = timer() - local_abort_rate = (len(concurrency_aborts) / len(sequence)) if sequence else 0.0 - - # Notify peers that we are ready to commit sync_time += await self._sync_commit(sequence, concurrency_aborts) - # HERE WE KNOW ALL THE CONCURRENCY ABORTS commit_start = timer() + self.phase_resource_tracker.begin("Commit time") self._commit_and_prepare_responses(sequence) await self.send_delta_to_snapshotting_proc() + self.phase_resource_tracker.end("Commit time") commit_end = timer() fallback_start = timer() - abort_rate = await self._maybe_run_fallback() + self.phase_resource_tracker.begin("Fallback") + _, committed_fallback = await self._maybe_run_fallback() + self.phase_resource_tracker.end("Fallback") fallback_end = timer() self._advance_offsets(sequence) - self.sequencer.increment_epoch(self.max_t_counter, self.t_ids_to_reschedule) await self.wait_responses_to_be_sent.wait() self.cleanup_after_epoch() snap_start = timer() + self.phase_resource_tracker.begin("Async Snapshot") await self.take_snapshot() + self.phase_resource_tracker.end("Async Snapshot") snap_end = timer() epoch_end = timer() - await self._sync_cleanup( + # Calculate metrics using builder + metrics_input = EpochMetricsInput( + worker_id=self.id, sequence=sequence, - epoch_start=epoch_start, - epoch_end=epoch_end, - local_abort_rate=local_abort_rate, - abort_rate=abort_rate, + committed_lock_free=len(sequence) + - len({seq_i.t_id for seq_i in sequence if seq_i.t_id in self.concurrency_aborts_everywhere}), + committed_fallback=committed_fallback, + logic_aborts_count=logic_aborts_count, + concurrency_aborts_everywhere=self.concurrency_aborts_everywhere, timings=timings, - sync_time=sync_time, - conflict_ms=(conflict_resolution_end - conflict_resolution_start) * 1000, - commit_ms=(commit_end - commit_start) * 1000, - fallback_ms=(fallback_end - fallback_start) * 1000, - snap_ms=(snap_end - snap_start) * 1000, + phase_resources=self.phase_resource_tracker.export(), + fallback_time_ms=(fallback_end - fallback_start) * 1000, + conflict_resolution_time_ms=(conflict_resolution_end - conflict_resolution_start) * 1000, + commit_time_ms=(commit_end - commit_start) * 1000, + snap_time_ms=(snap_end - snap_start) * 1000, + sync_time_ms=sync_time, + epoch_latency_ms=max(round((epoch_end - epoch_start) * 1000, 4), 1), + input_rate=self.ingress.epoch_stats["consumed"], + queue_backlog=len(self.sequencer.distributed_log), + operator_metrics=self.operator_metrics, ) + worker_epoch_stats = EpochMetricsBuilder(metrics_input).build() + + await self._sync_cleanup(worker_epoch_stats) + self.operator_metrics.clear() async def _run_epoch_functions_and_chain( self, @@ -635,20 +689,26 @@ async def _run_epoch_functions_and_chain( if not sequence: return {"wal_ms": 0.0, "func_ms": 0.0, "chain_ms": 0.0} + self.phase_resource_tracker.begin("WAL") start_wal, end_wal = await self._write_to_wal(sequence) + self.phase_resource_tracker.end("WAL") + self.phase_resource_tracker.begin("1st Run") start_func = timer() await asyncio.gather( *[self.run_function(item.t_id, item.payload) for item in sequence], ) end_func = timer() + self.phase_resource_tracker.end("1st Run") # Wait for chains to finish + self.phase_resource_tracker.begin("Chain Acks") start_chain = timer() await asyncio.gather( *[ack.wait() for ack in self.networking.waited_ack_events.values()], ) end_chain = timer() + self.phase_resource_tracker.end("Chain Acks") return { "wal_ms": (end_wal - start_wal) * 1000, @@ -657,6 +717,7 @@ async def _run_epoch_functions_and_chain( } async def _sync_processing_done(self) -> float: + self.phase_resource_tracker.begin("SYNC") start = timer() await self.sync_workers( msg_type=MessageType.AriaProcessingDone, @@ -664,6 +725,7 @@ async def _sync_processing_done(self) -> float: serializer=Serializer.PICKLE, ) end = timer() + self.phase_resource_tracker.end("SYNC") logging.debug( f"{self.id} ||| logic_aborts_everywhere: {self.networking.logic_aborts_everywhere}", @@ -713,6 +775,7 @@ async def _sync_commit( concurrency_aborts: set[int], ) -> float: start = timer() + self.phase_resource_tracker.begin("SYNC") await self.sync_workers( msg_type=MessageType.AriaCommit, message=( @@ -723,6 +786,7 @@ async def _sync_commit( ), serializer=Serializer.PICKLE, ) + self.phase_resource_tracker.end("SYNC") end = timer() return end - start @@ -753,17 +817,21 @@ def _commit_and_prepare_responses(self, sequence: list[SequencedItem]) -> None: f"{len(self.concurrency_aborts_everywhere)} / {self.total_processed_seq_size}", ) - async def _maybe_run_fallback(self) -> float: + async def _maybe_run_fallback(self) -> tuple[float, int]: abort_rate: float = ( len(self.concurrency_aborts_everywhere) / self.total_processed_seq_size if self.total_processed_seq_size else 0.0 ) + committed_fallback = 0 if abort_rate > FALLBACK_STRATEGY_PERCENTAGE: logging.debug( f"{self.id} ||| Epoch: {self.sequencer.epoch_counter} " f"Abort percentage: {int(abort_rate * 100)}% initiating fallback strategy...", ) + # Transactions to commit in fallback = concurrency aborts minus logic aborts + local_aborted_t_ids = self.sequencer.get_aborted_sequence(self.t_ids_to_reschedule) + committed_fallback = len(local_aborted_t_ids) await self.run_fallback_strategy() await self.send_delta_to_snapshotting_proc() @@ -771,7 +839,7 @@ async def _maybe_run_fallback(self) -> float: # Keep rescheduled t_ids (rw-set changed during fallback) for the next epoch self.t_ids_to_reschedule = self.fallback_rescheduled_t_ids.copy() self.fallback_rescheduled_t_ids.clear() - return abort_rate + return abort_rate, committed_fallback def _advance_offsets(self, sequence: list[SequencedItem]) -> None: for item in sequence: @@ -789,46 +857,11 @@ def _advance_offsets(self, sequence: list[SequencedItem]) -> None: async def _sync_cleanup( self, - *, - sequence: list[SequencedItem], - epoch_start: float, - epoch_end: float, - local_abort_rate: float, - abort_rate: float, - timings: dict[str, float], - sync_time: float, - conflict_ms: float, - commit_ms: float, - fallback_ms: float, - snap_ms: float, + worker_epoch_stats: WorkerEpochStats, ) -> None: - epoch_latency = max(round((epoch_end - epoch_start) * 1000, 4), 1) - epoch_throughput = ((len(sequence) - len(self.concurrency_aborts_everywhere)) * 1000) // epoch_latency # TPS - - logging.debug( - f"{self.id} ||| Epoch: {self.sequencer.epoch_counter - 1} done in " - f"{epoch_latency}ms " - f"global logic aborts: {len(self.networking.logic_aborts_everywhere)} " - f"concurrency aborts for next epoch: {len(self.concurrency_aborts_everywhere)} " - f"abort rate: {abort_rate}", - ) - await self.sync_workers( msg_type=MessageType.SyncCleanup, - message=( - self.id, - epoch_throughput, - epoch_latency, - local_abort_rate, - round(timings["wal_ms"], 4), - round(timings["func_ms"], 4), - round(timings["chain_ms"], 4), - round(sync_time * 1000, 4), - round(conflict_ms, 4), - round(commit_ms, 4), - round(fallback_ms, 4), - round(snap_ms, 4), - ), + message=astuple(worker_epoch_stats), serializer=Serializer.MSGPACK, ) diff --git a/worker/util/epoch_metrics_builder.py b/worker/util/epoch_metrics_builder.py new file mode 100644 index 0000000..b947975 --- /dev/null +++ b/worker/util/epoch_metrics_builder.py @@ -0,0 +1,119 @@ +from dataclasses import dataclass + +from styx.common.metrics import WorkerEpochStats + + +@dataclass +class EpochMetricsInput: + worker_id: int + sequence: list + committed_lock_free: int + committed_fallback: int + logic_aborts_count: int + concurrency_aborts_everywhere: set[int] + timings: dict[str, float] # func_ms, chain_ms, wal_ms + phase_resources: dict + + fallback_time_ms: float + conflict_resolution_time_ms: float + commit_time_ms: float + snap_time_ms: float + sync_time_ms: float + epoch_latency_ms: float + + input_rate: int + queue_backlog: int + operator_metrics: dict + + +class EpochMetricsBuilder: + """Builds WorkerEpochStats from epoch processing data.""" + + def __init__(self, inputs: EpochMetricsInput) -> None: + self.inputs = inputs + self._precalculate() + + def _precalculate(self) -> None: + """Precalculate commonly used values.""" + self.total_txns = len(self.inputs.sequence) + self.committed_txns = self.inputs.committed_lock_free + self.inputs.committed_fallback + self.concurrency_aborts_count = len( + {seq_i.t_id for seq_i in self.inputs.sequence if seq_i.t_id in self.inputs.concurrency_aborts_everywhere} + ) + + def _build_operator_stats(self) -> list[tuple[str, int, float, float, int]]: + """Aggregate and build per-operator epoch statistics.""" + operator_agg: dict[tuple[str, int], dict[str, float | int]] = {} + + for (op_name, partition, _func_name), m in self.inputs.operator_metrics.items(): + key = (op_name, partition) + agg = operator_agg.setdefault(key, {"calls": 0, "sum_ms": 0.0}) + agg["calls"] += m["count"] + agg["sum_ms"] += m["sum_ms"] + + operator_epoch_stats: list[tuple[str, int, float, float, int]] = [] + epoch_seconds = max(self.inputs.epoch_latency_ms / 1000.0, 1e-6) + + for (op_name, partition), agg in operator_agg.items(): + call_count = int(agg["calls"]) + if call_count == 0: + continue + total_latency_ms = float(agg["sum_ms"]) + avg_latency_ms = total_latency_ms / call_count + tps = call_count / epoch_seconds + operator_epoch_stats.append((op_name, partition, tps, avg_latency_ms, call_count)) + + return operator_epoch_stats + + def _calculate_utilization(self) -> tuple[float, float]: + """Calculate CPU and IO wait utilization.""" + func_time = self.inputs.timings["func_ms"] + conflict_resolution_time = self.inputs.conflict_resolution_time_ms + commit_time = self.inputs.commit_time_ms + fallback_time = self.inputs.fallback_time_ms + + cpu_work_ms = func_time + conflict_resolution_time + commit_time + fallback_time + + chain_time = self.inputs.timings["chain_ms"] + wal_time = self.inputs.timings["wal_ms"] + snap_time = self.inputs.snap_time_ms + io_wait_time_ms = chain_time + wal_time + snap_time + self.inputs.sync_time_ms + + cpu_utilization = (cpu_work_ms / self.inputs.epoch_latency_ms) if self.inputs.epoch_latency_ms > 0 else 0.0 + io_wait_utilization = ( + (io_wait_time_ms / self.inputs.epoch_latency_ms) if self.inputs.epoch_latency_ms > 0 else 0.0 + ) + + return cpu_utilization, io_wait_utilization + + def build(self) -> WorkerEpochStats: + operator_epoch_stats = self._build_operator_stats() + cpu_util, io_util = self._calculate_utilization() + local_abort_rate = (self.concurrency_aborts_count / self.total_txns) if self.total_txns else 0.0 + + return WorkerEpochStats( + worker_id=self.inputs.worker_id, + epoch_throughput=(self.committed_txns * 1000) // self.inputs.epoch_latency_ms, + epoch_latency=self.inputs.epoch_latency_ms, + local_abort_rate=local_abort_rate, + wal_time=round(self.inputs.timings["wal_ms"], 4), + func_time=round(self.inputs.timings["func_ms"], 4), + chain_ack_time=round(self.inputs.timings["chain_ms"], 4), + sync_time=self.inputs.sync_time_ms, + conflict_res_time=self.inputs.conflict_resolution_time_ms, + commit_time=self.inputs.commit_time_ms, + fallback_time=self.inputs.fallback_time_ms, + snap_time=self.inputs.snap_time_ms, + input_rate=self.inputs.input_rate, + queue_backlog=self.inputs.queue_backlog, + total_txns=self.total_txns, + committed_txns=self.committed_txns, + logic_aborts=self.inputs.logic_aborts_count, + concurrency_aborts=self.concurrency_aborts_count, + committed_lock_free=self.inputs.committed_lock_free, + committed_fallback=self.inputs.committed_fallback, + cpu_utilization=cpu_util, + io_wait_utilization=io_util, + operator_epoch_stats=operator_epoch_stats, + phase_resources=self.inputs.phase_resources, + ) diff --git a/worker/util/phase_resource_tracker.py b/worker/util/phase_resource_tracker.py new file mode 100644 index 0000000..9a29981 --- /dev/null +++ b/worker/util/phase_resource_tracker.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass +import os +from pathlib import Path +import time + +import psutil + + +@dataclass(frozen=True) +class ResourceSnapshot: + cpu_ns: int + rss_bytes: int + rx_bytes: int + tx_bytes: int + + +def _read_net_bytes() -> tuple[int, int]: + """ + Reads cumulative RX/TX bytes from the current network namespace. + + Note: this is interface-level accounting (namespace/container-level), not truly per-process. + It's still useful for attributing traffic deltas to phases within a worker process. + """ + try: + with Path.open("/proc/self/net/dev", encoding="utf-8") as f: + lines = f.readlines()[2:] # skip headers + except FileNotFoundError: + return 0, 0 + + rx, tx = 0, 0 + min_parts = 10 + for line in lines: + parts = line.split() + # Format: iface: rx_bytes rx_packets ... tx_bytes tx_packets ... + if len(parts) < min_parts: + continue + rx += int(parts[1]) + tx += int(parts[9]) + return rx, tx + + +class PhaseResourceTracker: + """ + Tracks per-phase resource deltas within a single process. + + - CPU: process CPU time (ns) via time.process_time_ns() + - Memory: RSS max observed during the phase (bytes) + - Network: cumulative RX/TX bytes for the current net namespace (delta attributed to phase) + """ + + def __init__(self, pid: int | None = None) -> None: + self._pid = pid if pid is not None else os.getpid() + self._proc = psutil.Process(self._pid) + self._starts: dict[str, ResourceSnapshot] = {} + self._cpu_ns_total: dict[str, int] = defaultdict(int) + self._rx_bytes_total: dict[str, int] = defaultdict(int) + self._tx_bytes_total: dict[str, int] = defaultdict(int) + self._rss_max_bytes: dict[str, int] = defaultdict(int) + + def reset_epoch(self) -> None: + self._starts.clear() + self._cpu_ns_total.clear() + self._rx_bytes_total.clear() + self._tx_bytes_total.clear() + self._rss_max_bytes.clear() + + def _snapshot(self) -> ResourceSnapshot: + cpu_ns = time.process_time_ns() + rss_bytes = int(self._proc.memory_info().rss) + rx_bytes, tx_bytes = _read_net_bytes() + return ResourceSnapshot(cpu_ns=cpu_ns, rss_bytes=rss_bytes, rx_bytes=rx_bytes, tx_bytes=tx_bytes) + + def begin(self, phase: str) -> None: + snap = self._snapshot() + self._starts[phase] = snap + self._rss_max_bytes[phase] = max(self._rss_max_bytes.get(phase, 0), snap.rss_bytes) + + def end(self, phase: str) -> None: + start = self._starts.pop(phase, None) + if start is None: + return + end = self._snapshot() + self._cpu_ns_total[phase] += max(0, end.cpu_ns - start.cpu_ns) + self._rx_bytes_total[phase] += max(0, end.rx_bytes - start.rx_bytes) + self._tx_bytes_total[phase] += max(0, end.tx_bytes - start.tx_bytes) + self._rss_max_bytes[phase] = max(self._rss_max_bytes.get(phase, 0), end.rss_bytes) + + def export(self) -> dict[str, dict[str, int]]: + return { + "cpu_ns": dict(self._cpu_ns_total), + "rx_bytes": dict(self._rx_bytes_total), + "tx_bytes": dict(self._tx_bytes_total), + "rss_max_bytes": dict(self._rss_max_bytes), + }