From 79d3db3208a9de13d4f4909a46c7d151393a773b Mon Sep 17 00:00:00 2001 From: amol Date: Mon, 17 Nov 2025 12:52:22 -0600 Subject: [PATCH 1/4] Added gRPC interface definition for data cache --- proto/controls/service/data-cache/cache.proto | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 proto/controls/service/data-cache/cache.proto diff --git a/proto/controls/service/data-cache/cache.proto b/proto/controls/service/data-cache/cache.proto new file mode 100644 index 0000000..bc0e47e --- /dev/null +++ b/proto/controls/service/data-cache/cache.proto @@ -0,0 +1,51 @@ +syntax = "proto3"; + +package services.cache; + +service KafkaService { + // Create a topic with partitions and replication factor + rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse); + + // Send a single message to a topic + rpc Produce(ProduceRequest) returns (ProduceResponse); + + // Stream messages from a topic (server streaming) + rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); +} + +message CreateTopicRequest { + string topic = 1; + int32 partitions = 2; + int16 replication_factor = 3; +} + +message CreateTopicResponse { + bool success = 1; + string message = 2; +} + +message ProduceRequest { + string topic = 1; + string key = 2; // optional + bytes value = 3; +} + +message ProduceResponse { + bool success = 1; + string message = 2; + int64 offset = 3; +} + +message ConsumeRequest { + string topic = 1; + string group_id = 2; + int32 timeout_seconds = 3; // optional, stop after secs (0 = no timeout) +} + +message ConsumeResponse { + string key = 1; + bytes value = 2; + int64 offset = 3; + int32 partition = 4; +} + From 87b194e49794b4a713756e581749eacfbfdfe101 Mon Sep 17 00:00:00 2001 From: amol Date: Mon, 17 Nov 2025 13:24:11 -0600 Subject: [PATCH 2/4] Changed from int16 to int32 --- proto/controls/service/data-cache/cache.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto/controls/service/data-cache/cache.proto b/proto/controls/service/data-cache/cache.proto index bc0e47e..5d739ba 100644 --- a/proto/controls/service/data-cache/cache.proto +++ b/proto/controls/service/data-cache/cache.proto @@ -16,7 +16,7 @@ service KafkaService { message CreateTopicRequest { string topic = 1; int32 partitions = 2; - int16 replication_factor = 3; + int32 replication_factor = 3; } message CreateTopicResponse { From 441cc13ffd9056bf6e8fa84a1c2e7bc387c76ec3 Mon Sep 17 00:00:00 2001 From: amol Date: Wed, 19 Nov 2025 09:14:19 -0600 Subject: [PATCH 3/4] Added capbility to talk to multiple topics with single gRPC call --- proto/controls/service/data-cache/cache.proto | 73 +++++++++++++------ 1 file changed, 50 insertions(+), 23 deletions(-) diff --git a/proto/controls/service/data-cache/cache.proto b/proto/controls/service/data-cache/cache.proto index 5d739ba..233db3c 100644 --- a/proto/controls/service/data-cache/cache.proto +++ b/proto/controls/service/data-cache/cache.proto @@ -6,46 +6,73 @@ service KafkaService { // Create a topic with partitions and replication factor rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse); + // Create many topics with identical partitions and replication factor + rpc CreateNtopics(CreateNtopicsRequest) returns (CreateTopicResponse); + // Send a single message to a topic - rpc Produce(ProduceRequest) returns (ProduceResponse); + rpc ProduceTopic(ProduceTopicRequest) returns (ProduceResponse); + + // Send a single message to many topic + rpc ProduceNtopics(ProduceNtopicsRequest) returns (ProduceResponse); + + // Get message from a topic + rpc ConsumeTopic(ConsumeTopicRequest) returns (ConsumeResponse); - // Stream messages from a topic (server streaming) - rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); + // Get messages from many topics + rpc ConsumeNtopics(ConsumeNtopicsRequest) returns (ConsumeResponse); } message CreateTopicRequest { - string topic = 1; - int32 partitions = 2; - int32 replication_factor = 3; + string topic = 1; // topic name + int32 partitions = 2; // Each topic can be devided into multiple partitions + int32 replication_factor = 3; // replication for high availibility +} + +message CreateNtopicsRequest { + string topics = 1; // topic names separed with ',' e.g. topic1, topic2, topic3 + int32 partitions = 2; // Each topic can be devided into multiple partitions + int32 replication_factor = 3; // replication for high availibility } message CreateTopicResponse { - bool success = 1; - string message = 2; + bool success = 1; // success or failure + string message = 2; // response (error message if failure) } -message ProduceRequest { - string topic = 1; - string key = 2; // optional - bytes value = 3; +message ProduceTopicRequest { + string topic = 1; // topic name + string key = 2; // optional + bytes value = 3; // message +} + +message ProduceNtopicsRequest { + string topics = 1; // list of topics + string key = 2; // optional + bytes value = 3; // message } message ProduceResponse { - bool success = 1; - string message = 2; - int64 offset = 3; + bool success = 1; // success or failure + string message = 2; // response offset / error message if failure + //int64 offset = 3; // offset --> return the offset for that topic +} + +message ConsumeTopicRequest { + string topic = 1; // topic name + //string group_id = 2; + int32 timeout_seconds = 2; // optional, stop after secs (0 = no timeout) } -message ConsumeRequest { - string topic = 1; - string group_id = 2; - int32 timeout_seconds = 3; // optional, stop after secs (0 = no timeout) +message ConsumeNtopicsRequest { + string topics = 1; // list of topics + //string group_id = 2; + int32 timeout_seconds = 2; // optional, stop after secs (0 = no timeout) } message ConsumeResponse { - string key = 1; - bytes value = 2; - int64 offset = 3; - int32 partition = 4; + string keys = 1; // associated key or keys + bytes values = 2; // response + string offsets = 3; // offset or many offsets + string partitions = 4; // partition or many partitions } From 7145c9e6f735353ff1ec9ef332dc34c6299c8837 Mon Sep 17 00:00:00 2001 From: amolfnal Date: Mon, 19 Jan 2026 00:31:22 -0600 Subject: [PATCH 4/4] Updating and adding additional functionality --- .../data_cache.proto} | 51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) rename proto/controls/service/{data-cache/cache.proto => data_cache/data_cache.proto} (53%) diff --git a/proto/controls/service/data-cache/cache.proto b/proto/controls/service/data_cache/data_cache.proto similarity index 53% rename from proto/controls/service/data-cache/cache.proto rename to proto/controls/service/data_cache/data_cache.proto index 233db3c..7cf6eb4 100644 --- a/proto/controls/service/data-cache/cache.proto +++ b/proto/controls/service/data_cache/data_cache.proto @@ -1,75 +1,72 @@ syntax = "proto3"; -package services.cache; +package services.data_cache; -service KafkaService { +service DataCacheService { // Create a topic with partitions and replication factor - rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse); + rpc CreateMQ(CreateMQRequest) returns (CreateMQResponse); // Create many topics with identical partitions and replication factor - rpc CreateNtopics(CreateNtopicsRequest) returns (CreateTopicResponse); + rpc CreateNMQs(CreateMQsRequest) returns (CreateMQResponse); // Send a single message to a topic - rpc ProduceTopic(ProduceTopicRequest) returns (ProduceResponse); + rpc PushMQ(PushMQRequest) returns (ProducersResponse); // Send a single message to many topic - rpc ProduceNtopics(ProduceNtopicsRequest) returns (ProduceResponse); + rpc PushMQs(PushMQsRequest) returns (ProducersResponse); // Get message from a topic - rpc ConsumeTopic(ConsumeTopicRequest) returns (ConsumeResponse); + rpc PullMQ(PullMQRequest) returns (ConsumersResponse); // Get messages from many topics - rpc ConsumeNtopics(ConsumeNtopicsRequest) returns (ConsumeResponse); + rpc PullMQs(PullMQsRequest) returns (ConsumersResponse); } -message CreateTopicRequest { - string topic = 1; // topic name +message CreateMQRequest { + string mq_name = 1; // topic name int32 partitions = 2; // Each topic can be devided into multiple partitions int32 replication_factor = 3; // replication for high availibility } -message CreateNtopicsRequest { - string topics = 1; // topic names separed with ',' e.g. topic1, topic2, topic3 +message CreateMQsRequest { + string mq_names = 1; // topic names separed with ',' e.g. topic1, topic2, topic3 int32 partitions = 2; // Each topic can be devided into multiple partitions int32 replication_factor = 3; // replication for high availibility } -message CreateTopicResponse { +message CreateMQResponse { bool success = 1; // success or failure string message = 2; // response (error message if failure) } -message ProduceTopicRequest { - string topic = 1; // topic name +message PushMQRequest { + string mq_name = 1; // topic name string key = 2; // optional bytes value = 3; // message } -message ProduceNtopicsRequest { - string topics = 1; // list of topics +message PushMQsRequest { + string mq_names = 1; // list of topics string key = 2; // optional bytes value = 3; // message } -message ProduceResponse { +message ProducersResponse { bool success = 1; // success or failure - string message = 2; // response offset / error message if failure - //int64 offset = 3; // offset --> return the offset for that topic + string messages_with_offsets = 2; // response offset / error message if failure } -message ConsumeTopicRequest { - string topic = 1; // topic name - //string group_id = 2; +message PullMQRequest { + string mq_name = 1; // topic name int32 timeout_seconds = 2; // optional, stop after secs (0 = no timeout) } -message ConsumeNtopicsRequest { - string topics = 1; // list of topics - //string group_id = 2; +message PullMQsRequest { + string mq_names = 1; // list of topics int32 timeout_seconds = 2; // optional, stop after secs (0 = no timeout) } -message ConsumeResponse { +message ConsumersResponse { string keys = 1; // associated key or keys bytes values = 2; // response string offsets = 3; // offset or many offsets