Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ grafana-enterprise-11.5.1.darwin-amd64.tar.gz
!grafana/dashboards/styx.json
!grafana/dashboards/server_specific.json
!grafana/*
ycsb_dataset_*
!charts/grafana/files/grafana/dashboards/server_specific.json
!charts/grafana/files/grafana/dashboards/styx.json
!charts/grafana/*
Expand Down
196 changes: 168 additions & 28 deletions coordinator/coordinator_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import boto3
import botocore
from coordinator_metadata import Coordinator
from prometheus_client import Gauge, start_http_server
from prometheus_client import Counter, Gauge, start_http_server
from styx.common.logging import logging
from styx.common.message_types import MessageType
from styx.common.metrics import WorkerEpochStats
from styx.common.protocols import Protocols
from styx.common.serialization import Serializer
from styx.common.tcp_networking import MessagingMode, NetworkingManager
Expand Down Expand Up @@ -119,6 +120,7 @@ def __init__(self) -> None:
self.recovery_state: RecoveryState = RecoveryState.IDLE

self.metrics_server = start_http_server(8000)
self.live_worker_count_gauge = Gauge("live_worker_count", "Number of live workers registered with coordinator")
self.cpu_usage_gauge = Gauge(
"worker_cpu_usage_percent",
"CPU usage percentage",
Expand Down Expand Up @@ -170,6 +172,77 @@ def __init__(self) -> None:
["instance"],
)

self.queue_backlog_gauge = Gauge("queue_backlog", "Backlog in the worker queue", ["instance"])

self.input_rate_counter = Counter("input_rate_counter", "Input rate", ["instance"])
# Transaction count metrics
self.epoch_total_txns_counter = Counter(
"epoch_total_transactions", "Total transactions processed (cumulative)", ["instance"]
)
self.epoch_committed_txns_counter = Counter(
"epoch_committed_transactions", "Committed transactions (cumulative)", ["instance"]
)
self.epoch_logic_aborts_counter = Counter(
"epoch_logic_aborts", "Logic/global aborts (cumulative)", ["instance"]
)
self.epoch_concurrency_aborts_counter = Counter(
"epoch_concurrency_aborts", "Concurrency aborts (cumulative)", ["instance"]
)
self.epoch_committed_lock_free_counter = Counter(
"epoch_committed_lock_free", "Transactions committed in lock-free phase (cumulative)", ["instance"]
)
self.epoch_committed_fallback_counter = Counter(
"epoch_committed_fallback", "Transactions committed in fallback phase (cumulative)", ["instance"]
)
self.cpu_utilization_ratio_gauge = Gauge(
"worker_cpu_utilization", "Ratio of CPU work in the epoch", ["instance"]
)
self.io_utilization_ratio_gauge = Gauge(
"worker_io_utilization", "Ratio of IO wait time in the epoch", ["instance"]
)
# Operator-level performance metrics
self.operator_tps_counter = Counter(
"operator_tps",
"Transactions per second per operator partition (cumulative)",
["instance", "operator", "partition"],
)
self.operator_call_count_counter = Counter(
"operator_call_count",
"Number of calls to an operator partition (cumulative)",
["instance", "operator", "partition"],
)
self.operator_latency_gauge = Gauge(
"operator_latency_ms",
"Average operator call latency in ms for this epoch",
["instance", "operator", "partition"],
)

# Used for annotations in the grafana dashboard
self.migration_start_time_gauge = Gauge("migration_start_time_ms", "Timestamp when the migration started", [])
self.migration_end_time_gauge = Gauge("migration_end_time_ms", "Timestamp when the migration completed", [])

# Phase-attributed resource metrics (aggregated per epoch in the worker, scraped at coordinator).
self.phase_cpu_ms_total = Counter(
"phase_cpu_ms_total",
"Process CPU time attributed to a transactional protocol phase (ms, cumulative)",
["instance", "phase"],
)
self.phase_net_rx_bytes_total = Counter(
"phase_net_rx_bytes_total",
"Network RX bytes attributed to a transactional protocol phase (bytes, cumulative)",
["instance", "phase"],
)
self.phase_net_tx_bytes_total = Counter(
"phase_net_tx_bytes_total",
"Network TX bytes attributed to a transactional protocol phase (bytes, cumulative)",
["instance", "phase"],
)
self.phase_rss_max_mb = Gauge(
"phase_rss_max_mb",
"Max RSS observed during a transactional protocol phase within the last reported epoch (MB)",
["instance", "phase"],
)

self.migration_in_progress: bool = False

self.networking_locks: dict[MessageType, asyncio.Lock] = {
Expand Down Expand Up @@ -284,6 +357,8 @@ async def _start_migration(self, graph: StateflowGraph) -> None:
await self.stop_snapshotting()

logging.warning(f"MIGRATION | START {graph}")
start_time = time.time_ns()
self.migration_start_time_gauge.set(start_time / 1_000_000)
await self.coordinator.update_stateflow_graph(graph)

n_workers = len(self.coordinator.worker_pool.get_participating_workers())
Expand Down Expand Up @@ -377,6 +452,7 @@ async def _handle_register_worker(
logging.warning(
f"Worker registered {worker_ip}:{worker_port} with id {worker_id}",
)
self.live_worker_count_gauge.set(len(self.coordinator.worker_pool.get_live_workers()))

async def _track_reregistered_worker(self, worker_id: int) -> None:
async with self.recovery_lock:
Expand Down Expand Up @@ -565,9 +641,21 @@ async def _handle_sync_cleanup(self, data: bytes) -> None:
commit_time,
fallback_time,
snap_time,
input_rate,
queue_backlog,
total_txns,
committed_txns,
logic_aborts,
concurrency_aborts,
committed_lock_free,
committed_fallback,
cpu_utilization,
io_wait_utilization,
operator_epoch_stats,
phase_resources,
) = self.protocol_networking.decode_message(data)

self._record_epoch_metrics(
worker_epoch_stats = WorkerEpochStats(
worker_id=worker_id,
epoch_throughput=epoch_throughput,
epoch_latency=epoch_latency,
Expand All @@ -580,6 +668,22 @@ async def _handle_sync_cleanup(self, data: bytes) -> None:
commit_time=commit_time,
fallback_time=fallback_time,
snap_time=snap_time,
input_rate=input_rate,
queue_backlog=queue_backlog,
total_txns=total_txns,
committed_txns=committed_txns,
logic_aborts=logic_aborts,
concurrency_aborts=concurrency_aborts,
committed_lock_free=committed_lock_free,
committed_fallback=committed_fallback,
cpu_utilization=cpu_utilization,
io_wait_utilization=io_wait_utilization,
operator_epoch_stats=operator_epoch_stats,
phase_resources=phase_resources,
)

self._record_epoch_metrics(
worker_epoch_stats,
)

sync_complete: bool = self.aria_metadata.set_empty_sync_done(worker_id)
Expand All @@ -595,32 +699,66 @@ async def _handle_sync_cleanup(self, data: bytes) -> None:

def _record_epoch_metrics(
self,
*,
worker_id: str,
epoch_throughput: float,
epoch_latency: float,
local_abort_rate: float,
wal_time: float,
func_time: float,
chain_ack_time: float,
sync_time: float,
conflict_res_time: float,
commit_time: float,
fallback_time: float,
snap_time: float,
worker_epoch_stats: WorkerEpochStats,
) -> None:
self.epoch_throughput_gauge.labels(instance=worker_id).set(epoch_throughput)
self.epoch_latency_gauge.labels(instance=worker_id).set(epoch_latency)
self.epoch_abort_gauge.labels(instance=worker_id).set(local_abort_rate)

self.latency_breakdown_gauge.labels(instance=worker_id, component="WAL").set(wal_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="1st Run").set(func_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Chain Acks").set(chain_ack_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="SYNC").set(sync_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Conflict Resolution").set(conflict_res_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Commit time").set(commit_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Fallback").set(fallback_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Async Snapshot").set(snap_time)
worker_id = worker_epoch_stats.worker_id

self.epoch_throughput_gauge.labels(instance=worker_id).set(worker_epoch_stats.epoch_throughput)
self.epoch_latency_gauge.labels(instance=worker_id).set(worker_epoch_stats.epoch_latency)
self.epoch_abort_gauge.labels(instance=worker_id).set(worker_epoch_stats.local_abort_rate)

self.latency_breakdown_gauge.labels(instance=worker_id, component="WAL").set(worker_epoch_stats.wal_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="1st Run").set(worker_epoch_stats.func_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Chain Acks").set(
worker_epoch_stats.chain_ack_time
)
self.latency_breakdown_gauge.labels(instance=worker_id, component="SYNC").set(worker_epoch_stats.sync_time)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Conflict Resolution").set(
worker_epoch_stats.conflict_res_time
)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Commit time").set(
worker_epoch_stats.commit_time
)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Fallback").set(
worker_epoch_stats.fallback_time
)
self.latency_breakdown_gauge.labels(instance=worker_id, component="Async Snapshot").set(
worker_epoch_stats.snap_time
)

self.input_rate_counter.labels(instance=worker_id).inc(worker_epoch_stats.input_rate)
self.queue_backlog_gauge.labels(instance=worker_id).set(worker_epoch_stats.queue_backlog)

# Transaction count metrics
self.epoch_total_txns_counter.labels(instance=worker_id).inc(worker_epoch_stats.total_txns)
self.epoch_committed_txns_counter.labels(instance=worker_id).inc(worker_epoch_stats.committed_txns)
self.epoch_logic_aborts_counter.labels(instance=worker_id).inc(worker_epoch_stats.logic_aborts)
self.epoch_concurrency_aborts_counter.labels(instance=worker_id).inc(worker_epoch_stats.concurrency_aborts)
self.epoch_committed_lock_free_counter.labels(instance=worker_id).inc(worker_epoch_stats.committed_lock_free)
self.epoch_committed_fallback_counter.labels(instance=worker_id).inc(worker_epoch_stats.committed_fallback)

self.cpu_utilization_ratio_gauge.labels(instance=worker_id).set(worker_epoch_stats.cpu_utilization)
self.io_utilization_ratio_gauge.labels(instance=worker_id).set(worker_epoch_stats.io_wait_utilization)

# Operator-level metrics for this worker and epoch
for op_name, partition, tps, avg_latency_ms, call_count in worker_epoch_stats.operator_epoch_stats:
labels = {"instance": worker_id, "operator": op_name, "partition": str(partition)}
self.operator_tps_counter.labels(**labels).inc(tps)
self.operator_call_count_counter.labels(**labels).inc(call_count)
self.operator_latency_gauge.labels(**labels).set(avg_latency_ms)

for phase, v in worker_epoch_stats.phase_resources.get("cpu_ns", {}).items():
self.phase_cpu_ms_total.labels(instance=worker_id, phase=phase).inc(float(v) / 1e6)
logging.warning(f" Phase {phase} CPU: {float(v) / 1e6} ms")
for phase, v in worker_epoch_stats.phase_resources.get("rx_bytes", {}).items():
self.phase_net_rx_bytes_total.labels(instance=worker_id, phase=phase).inc(float(v))
logging.warning(f" Phase {phase} RX: {float(v)} bytes")
for phase, v in worker_epoch_stats.phase_resources.get("tx_bytes", {}).items():
self.phase_net_tx_bytes_total.labels(instance=worker_id, phase=phase).inc(float(v))
logging.warning(f" Phase {phase} TX: {float(v)} bytes")
for phase, v in worker_epoch_stats.phase_resources.get("rss_max_bytes", {}).items():
self.phase_rss_max_mb.labels(instance=worker_id, phase=phase).set(float(v) / (1024 * 1024))
logging.warning(f" Phase {phase} RSS Max: {float(v) / (1024 * 1024)} MB")

async def _handle_deterministic_reordering(self, data: bytes) -> None:
mt = MessageType.DeterministicReordering
Expand Down Expand Up @@ -662,8 +800,10 @@ async def _handle_migration_done(self, _: bytes) -> None:
if not sync_complete:
return

end_time = time.time_ns()
self.migration_end_time_gauge.set(end_time / 1_000_000)
logging.warning(
f"MIGRATION_FINISHED at time: {time.time_ns() // 1_000_000}",
f"MIGRATION_FINISHED at time: {end_time // 1_000_000}",
)
await self.migration_metadata.cleanup(mt)
self.migration_in_progress = False
Expand Down
9 changes: 9 additions & 0 deletions coordinator/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ def __init__(self) -> None:
self.operator_partition_to_worker: dict[OperatorPartition, int] = {}
self.orphaned_operator_assignments: dict[OperatorPartition, Operator] = {}

def get_live_workers(self) -> list[Worker]:
    """Return every worker in the pool that has not been tombstoned.

    Iterates the internal priority queue entries (priority, count, worker)
    and keeps only entries whose worker slot does not hold the tombstone
    sentinel used to mark removed workers.
    """
    return [entry for _, _, entry in self._queue if entry != self._tombstone]

def register_worker(
self,
worker_ip: str,
Expand Down
19 changes: 15 additions & 4 deletions demo/demo-deathstar-hotel-reservation/pure_kafka_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
from styx.common.local_state_backends import LocalStateBackend
from styx.common.stateflow_graph import StateflowGraph

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from load_generator import LoadSchedule

SAVE_DIR: str = sys.argv[1]
threads = int(sys.argv[2])
barrier = multiprocessing.Barrier(threads)
Expand All @@ -40,7 +43,8 @@
STYX_HOST: str = os.getenv("STYX_HOST", "localhost")
STYX_PORT: int = int(os.getenv("STYX_PORT", "8886"))
KAFKA_URL: str = os.getenv("KAFKA_URL", "localhost:9092")
kill_at = int(sys.argv[7]) if len(sys.argv) > 7 else -1
load_config_path = sys.argv[7]
kill_at = int(sys.argv[8]) if len(sys.argv) > 8 else -1

g = StateflowGraph("deathstar_hotel_reservations",
operator_state_backend=LocalStateBackend.DICT,
Expand All @@ -65,6 +69,12 @@
user_operator,
)

load_schedule = LoadSchedule.from_config_file(
load_config_path,
target_tps=messages_per_second,
time=seconds
)


def styx_hash(styx: SyncStyxClient, key, op) -> int:
# IMPORTANT: op is the Operator object; Styx knows how to map it.
Expand Down Expand Up @@ -284,8 +294,9 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]:
subprocess.run(["docker", "kill", "styx-worker-1"], check=False)
print("KILL -> styx-worker-1 done")
sec_start = timer()
for i in range(messages_per_second):
if i % (messages_per_second // sleeps_per_second) == 0:
current_tps = load_schedule.get_tps(second)
for i in range(current_tps):
if i % (current_tps // sleeps_per_second) == 0:
time.sleep(sleep_time)
operator, key, func_name, params = next(deathstar_generator)
future = styx.send_event(
Expand All @@ -301,7 +312,7 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]:
if lps < 1:
time.sleep(1 - lps)
sec_end2 = timer()
print(f"Latency per second: {sec_end2 - sec_start}")
print(f"{second} | TPS: {current_tps} | Latency: {sec_end2 - sec_start:.3f}s")
end = timer()
print(f"Average latency per second: {(end - start) / seconds}")
styx.close()
Expand Down
19 changes: 15 additions & 4 deletions demo/demo-deathstar-movie-review/pure_kafka_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
from styx.client import SyncStyxClient
from styx.common.local_state_backends import LocalStateBackend
from styx.common.stateflow_graph import StateflowGraph

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from load_generator import LoadSchedule
from workload_data import charset, movie_titles

SAVE_DIR: str = sys.argv[1]
Expand All @@ -45,7 +48,8 @@
STYX_HOST: str = os.getenv("STYX_HOST", "localhost")
STYX_PORT: int = int(os.getenv("STYX_PORT", "8886"))
KAFKA_URL: str = os.getenv("KAFKA_URL", "localhost:9092")
kill_at = int(sys.argv[7]) if len(sys.argv) > 7 else -1
load_config_path: str = sys.argv[7]
kill_at = int(sys.argv[8]) if len(sys.argv) > 8 else -1

g = StateflowGraph("deathstar_movie_review",
operator_state_backend=LocalStateBackend.DICT,
Expand All @@ -72,6 +76,12 @@
frontend_operator
)

load_schedule = LoadSchedule.from_config_file(
load_config_path,
target_tps=messages_per_second,
time=seconds
)


# -------------------------------------------------------------------------------------
# init_data helpers
Expand Down Expand Up @@ -211,8 +221,9 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]:
subprocess.run(["docker", "kill", "styx-worker-1"], check=False)
print("KILL -> styx-worker-1 done")
sec_start = timer()
for i in range(messages_per_second):
if i % (messages_per_second // sleeps_per_second) == 0:
current_tps = load_schedule.get_tps(second)
for i in range(current_tps):
if i % (current_tps // sleeps_per_second) == 0:
time.sleep(sleep_time)
operator, key, func_name, params = next(deathstar_generator)
future = styx.send_event(operator=operator,
Expand All @@ -226,7 +237,7 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]:
if lps < 1:
time.sleep(1 - lps)
sec_end2 = timer()
print(f"Latency per second: {sec_end2 - sec_start}")
print(f"{second} | TPS: {current_tps} | Latency: {sec_end2 - sec_start:.3f}s")
end = timer()
print(f"Average latency per second: {(end - start) / seconds}")
styx.close()
Expand Down
Loading
Loading