Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions proto/controls/service/data_cache/data_cache.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
syntax = "proto3";

package services.data_cache;

service DataCacheService {
// Create a topic with partitions and replication factor
rpc CreateMQ(CreateMQRequest) returns (CreateMQResponse);

// Create many topics with identical partitions and replication factor
rpc CreateNMQs(CreateMQsRequest) returns (CreateMQResponse);

// Send a single message to a topic
rpc PushMQ(PushMQRequest) returns (ProducersResponse);

// Send a single message to many topic
rpc PushMQs(PushMQsRequest) returns (ProducersResponse);

// Get message from a topic
rpc PullMQ(PullMQRequest) returns (ConsumersResponse);

// Get messages from many topics
rpc PullMQs(PullMQsRequest) returns (ConsumersResponse);
}

message CreateMQRequest {
string mq_name = 1; // topic name
int32 partitions = 2; // Each topic can be devided into multiple partitions
int32 replication_factor = 3; // replication for high availibility
}

message CreateMQsRequest {
string mq_names = 1; // topic names separed with ',' e.g. topic1, topic2, topic3
int32 partitions = 2; // Each topic can be devided into multiple partitions
int32 replication_factor = 3; // replication for high availibility
}

message CreateMQResponse {
bool success = 1; // success or failure
string message = 2; // response (error message if failure)
}

message PushMQRequest {
string mq_name = 1; // topic name
string key = 2; // optional
bytes value = 3; // message
}

message PushMQsRequest {
string mq_names = 1; // list of topics

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use repeated here, so clients can provide a list of names.

string key = 2; // optional
bytes value = 3; // message
}

message ProducersResponse {
bool success = 1; // success or failure
string messages_with_offsets = 2; // response offset / error message if failure
}

message PullMQRequest {
string mq_name = 1; // topic name
int32 timeout_seconds = 2; // optional, stop after secs (0 = no timeout)
}

message PullMQsRequest {
string mq_names = 1; // list of topics
int32 timeout_seconds = 2; // optional, stop after secs (0 = no timeout)
}
Comment on lines +59 to +67

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be nice to have an option to specify a starting offset. If the connection to the gRPC server drops, I don't want to get all the messages again when I reconnect.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this more, I'm seeing that the ConsumersResponse object has the partitions and offsets, and those are a little confusing to me. Is the intention that each message will correspond to an index in the lists? So index 1 in the keys field will line up with index 1 in the values, offsets, and partitions fields?

If the above is true, may I suggest a much clearer option? Use this structure instead:

message DataMessage { // Or some other name that makes sense to you, doesn't need to be DataMessage necessarily
    string key = 1;
    bytes value = 2;
    uint64 offset = 3;
    uint32 partition = 4;
}
message ConsumersResponse {
    repeated DataMessage messages = 1;
}

@jacob-curley-fnal jacob-curley-fnal Feb 5, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But now I have a further question: are clients expected to keep track of the partitions that messages come from? If the network drops my connection to the server, am I going to have to specify the latest offset for all the partitions I know about, if I want to avoid pulling down messages I've already seen? And is this exposing too much of Kafka's guts to the clients? As a client, I'd like to see topics as a singular stream, and not need to care about partitions.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I want to keep it generalized so that when/if kafka will get replaced then again we don't need to replace the PullMQRequest. However, I like the idea of offset. Because irrespective of any MQ (Redis, Kafka and RabbitMQ) offset can be generalized and get the message with offset from the stream. Let me update the code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this gets to the heart of the problem. I think we're hiding Kafka at too high of a level.

By hiding it inside a singleton service like this, we have N clients using 1 Kafka connection, with new clients joining the stream at arbitrary times. The service now needs to track which client has seen what messages, or the client has to track offsets (and then the service will need to map the client's offset to the order in which messages came back from the different partitions; partition 0 and partition 1 can both have a message at offset 5, but they'll be different messages). This is a boatload of complexity being distributed into the control system, just to avoid using Kafka's out-of-the-box, performant, enterprise-scale solution (groups)!

If, instead, we hide Kafka inside a library that each client depends on, and each client is therefore responsible for its own connection to Kafka, then we can use Kafka's group mechanism to let it handle reconnecting from the correct position for us. The business logic of the clients still won't know they're talking to Kafka, as that detail is hidden in the library. We can now have our cake and eat it too. Yes, this does add a little more cost to converting to a different platform than Kafka, but we'd just update the details in the library and push it to all the dependent applications. Automated tools like Dependabot will make this very easy.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me be clear once again, we are not going to support native calls to the any central service (including Kafka).


message ConsumersResponse {
string keys = 1; // associated key or keys

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use repeated here, so clients can parse the list of keys

bytes values = 2; // response

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use repeated here, so clients can parse the list of messages.

string offsets = 3; // offset or many offsets

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

repeated int32 or repeated int64 would be better here

string partitions = 4; // partition or many partitions

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use repeated for multiple objects returned

}