Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/protocol/data/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
add_subdirectory(memcache)
add_subdirectory(ping)
add_subdirectory(resp)
add_subdirectory(resp_tw)
9 changes: 9 additions & 0 deletions src/protocol/data/resp_tw/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

set(SOURCE
token.c
request.c
response.c
compose.c
parse.c)

add_library(protocol_resp_tw ${SOURCE})
8 changes: 8 additions & 0 deletions src/protocol/data/resp_tw/cmd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once

#include <stdint.h>

/* Common macros for defining a command */

/* Allow unlimited optional parameters */
#define OPT_UNLIMITED UINT32_MAX
41 changes: 41 additions & 0 deletions src/protocol/data/resp_tw/cmd_bitmap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

/**
* create: create a bitmap of certain size, all bits reset unless value given
* BitMap.create KEY size
* Note: if size is not a multiple of the internal allocation unit (e.g. byte),
* it will be rounded up internally
* TODO: how to transfer value w/o being misrepresented due to endianness?
* until we figure that out we shouldn't allow user to initialize w/ value
*
* delete: delete a bitmap
* BitMap.delete KEY
*
* get: get value of a column in a bitmap
* BitMap.get KEY columnId
*
* set: set value of a column in a bitmap
* BitMap.set KEY columnId val
*/

/* TODO:
* - variable-width columns. this PR will implement only 1-bit columns
* - metadata: this will allow simple customization such as softTTL, timestamp
* or other info. This is the same idea behind memcached's `flag' field, but
* it's better to make it optional instead of allocating a fixed sized region
* for all commands and all data types.
*/

/* type string #arg #opt */
#define REQ_BITMAP(ACTION) \
ACTION( REQ_BITMAP_CREATE, "BitMap.create", 3, 0 )\
ACTION( REQ_BITMAP_DELETE, "BitMap.delete", 2, 0 )\
ACTION( REQ_BITMAP_GET, "BitMap.get", 3, 0 )\
ACTION( REQ_BITMAP_SET, "BitMap.set", 4, 0 )

typedef enum bitmap_elem {
BITMAP_VERB = 0,
BITMAP_KEY = 1,
BITMAP_COL = 2,
BITMAP_VAL = 3,
} bitmap_elem_e;
24 changes: 24 additions & 0 deletions src/protocol/data/resp_tw/cmd_hash.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include "cmd.h"

/* type string # of args */
#define REQ_HASH(ACTION) \
ACTION( REQ_HDEL, "hdel", 3, OPT_UNLIMITED )\
ACTION( REQ_HDELALL, "hdelall", 2, 0 )\
ACTION( REQ_HEXISTS, "hexists", 3, 0 )\
ACTION( REQ_HGET, "hget", 3, 0 )\
ACTION( REQ_HGETALL, "hgetall", 2, 0 )\
ACTION( REQ_HINCRBY, "hincrby", 4, 0 )\
ACTION( REQ_HINCRBYFLOAT, "hincrbyfloat", 4, 0 )\
ACTION( REQ_HKEYS, "hkeys", 2, 0 )\
ACTION( REQ_HLEN, "hlen", 2, 0 )\
ACTION( REQ_HMGET, "hmget", 3, OPT_UNLIMITED )\
ACTION( REQ_HMSET, "hmset", 4, OPT_UNLIMITED )\
ACTION( REQ_HSET, "hset", 4, 0 )\
ACTION( REQ_HSETNX, "hsetnx", 4, 0 )\
ACTION( REQ_HSTRLEN, "hstrlen", 3, 0 )\
ACTION( REQ_HVALS, "hvals", 2, 0 )\
ACTION( REQ_HSCAN, "hscan", 3, 0 )

/* "hlen KEY" == "*2\r\n$4\r\nhlen\r\n$3\r\nKEY\r\n" */
50 changes: 50 additions & 0 deletions src/protocol/data/resp_tw/cmd_list.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#pragma once

#include "cmd.h"

/**
* create: create an empty list
* List.create KEY
*
* delete: delete a list or a particular value from list
* List.delete KEY [VALUE [COUNT]]
*
* trim: trimming a list
* List.trim KEY INDEX COUNT
*
* len: return number of entries in list
* List.len KEY
*
* find: find entry in list
* List.find KEY VALUE
*
* get: get entry/entries at an index
* List.get KEY [INDEX [COUNT]]
*
* insert: insert entry at an index
* List.insert KEY VALUE INDEX
*
* push: pushing entry/entries at the end
* List.push KEY VALUE [VALUE ...]
*/


/* type string #arg #opt */
#define REQ_LIST(ACTION) \
ACTION( REQ_LIST_CREATE, "List.create", 2, 0 )\
ACTION( REQ_LIST_DELETE, "List.delete", 2, 2 )\
ACTION( REQ_LIST_LEN, "List.len", 2, 0 )\
ACTION( REQ_LIST_FIND, "List.find", 3, 0 )\
ACTION( REQ_LIST_GET, "List.get", 2, 2 )\
ACTION( REQ_LIST_INSERT, "List.insert", 4, 0 )\
ACTION( REQ_LIST_PUSH, "List.push", 3, OPT_UNLIMITED )\
ACTION( REQ_LIST_TRIM, "List.trim", 4, 0 )

typedef enum list_elem {
LIST_VERB = 0,
LIST_KEY = 1,
LIST_VAL = 2,
LIST_IDX = 2,
LIST_VIDX = 3, /* when a value is also present */
LIST_CNT = 3,
} list_elem_e;
7 changes: 7 additions & 0 deletions src/protocol/data/resp_tw/cmd_misc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#pragma once

/* type string #arg#opt */
#define REQ_MISC(ACTION) \
ACTION( REQ_FLUSHALL, "flushall", 1, 0 )\
ACTION( REQ_PING, "ping", 1, 1 )\
ACTION( REQ_QUIT, "quit", 1, 0 )
54 changes: 54 additions & 0 deletions src/protocol/data/resp_tw/cmd_sarray.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include "cmd.h"

/**
* create: create an empty array or integer width ESIZE
* SArray.create KEY ESIZE [WATERMARK_L] [WATERMARK_H]
*
* delete: delete an array
* SArray.delete KEY
*
* len: return number of entries in array
* SArray.len KEY
*
* find: find (rank of an value) in array
* SArray.find KEY VALUE
*
* get: get entry/entries at an index
* SArray.get KEY [INDEX [COUNT]]
*
* insert: insert value
* SArray.insert KEY VALUE [VALUE ...]
*
* remove: remove a particular value from array
* SArray.remove KEY VALUE [VALUE ...]
*
* truncate: truncate a array
* SArray.truncate KEY COUNT
*
*/


/* type string #arg #opt */
#define REQ_SARRAY(ACTION) \
ACTION( REQ_SARRAY_CREATE, "SArray.create", 3, 2 )\
ACTION( REQ_SARRAY_DELETE, "SArray.delete", 2, 0 )\
ACTION( REQ_SARRAY_LEN, "SArray.len", 2, 0 )\
ACTION( REQ_SARRAY_FIND, "SArray.find", 3, 0 )\
ACTION( REQ_SARRAY_GET, "SArray.get", 2, 2 )\
ACTION( REQ_SARRAY_INSERT, "SArray.insert", 3, OPT_UNLIMITED )\
ACTION( REQ_SARRAY_REMOVE, "SArray.remove", 3, OPT_UNLIMITED )\
ACTION( REQ_SARRAY_TRUNCATE, "SArray.truncate", 3, 0 )

typedef enum sarray_elem {
SARRAY_VERB = 0,
SARRAY_KEY = 1,
SARRAY_ESIZE = 2,
SARRAY_VAL = 2,
SARRAY_IDX = 2,
SARRAY_CNT = 2,
SARRAY_ICNT = 3, /* when an index is also present */
SARRAY_WML = 3, /* watermark (low) */
SARRAY_WMH = 4, /* watermark (high) */
} sarray_elem_e;
27 changes: 27 additions & 0 deletions src/protocol/data/resp_tw/cmd_zset.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "cmd.h"

/* type string # of args */
#define REQ_ZSET(ACTION) \
ACTION( REQ_ZADD, "zadd", 4, OPT_UNLIMITED )\
ACTION( REQ_ZINCRBY, "zincrby", 4, 0 )\
ACTION( REQ_ZREM, "zrem", 3, OPT_UNLIMITED )\
ACTION( REQ_ZREMRANGEBYSCORE, "zremrangebyscore", 4, 0 )\
ACTION( REQ_ZREMRANGEBYRANK, "zremrangebyrank", 4, 0 )\
ACTION( REQ_ZREMRANGEBYLEX, "zremrangebylex", 4, 0 )\
ACTION( REQ_ZUNIONSTORE, "zunionstore", 4, OPT_UNLIMITED )\
ACTION( REQ_ZINTERSTORE, "zinterstore", 4, OPT_UNLIMITED )\
ACTION( REQ_ZRANGE, "zrange", 4, OPT_UNLIMITED )\
ACTION( REQ_ZRANGEBYSCORE, "zrangebyscore", 4, OPT_UNLIMITED )\
ACTION( REQ_ZREVRANGEBYSCORE, "zrevrangebyscore", 4, OPT_UNLIMITED )\
ACTION( REQ_ZRANGEBYLEX, "zrangebylex", 4, OPT_UNLIMITED )\
ACTION( REQ_ZREVRANGEBYLEX, "zrevrangebylex", 4, OPT_UNLIMITED )\
ACTION( REQ_ZCOUNT, "zcount", 4, 0 )\
ACTION( REQ_ZLEXCOUNT, "zlexcount", 4, 0 )\
ACTION( REQ_ZREVRANGE, "zrevrange", 4, OPT_UNLIMITED )\
ACTION( REQ_ZCARD, "zcard", 2, 0 )\
ACTION( REQ_ZSCORE, "zscore", 3, 0 )\
ACTION( REQ_ZRANK, "zrank", 3, 0 )\
ACTION( REQ_ZREVRANK, "zrevrank", 3, 0 )\
ACTION( REQ_ZSCAN, "zscan", 3, OPT_UNLIMITED )
157 changes: 157 additions & 0 deletions src/protocol/data/resp_tw/compose.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#include "compose.h"

#include "request.h"
#include "response.h"
#include "token.h"

#include <cc_debug.h>
#include <cc_print.h>

#define COMPOSE_MODULE_NAME "protocol::resp_tw::compose"

static bool compose_init = false;
static compose_req_metrics_st *compose_req_metrics = NULL;
static compose_rsp_metrics_st *compose_rsp_metrics = NULL;

void
compose_setup(compose_req_metrics_st *req, compose_rsp_metrics_st *rsp)
{
log_info("set up the %s module", COMPOSE_MODULE_NAME);

if (compose_init) {
log_warn("%s has already been set up, overwrite", COMPOSE_MODULE_NAME);
}

compose_req_metrics = req;
compose_rsp_metrics = rsp;

compose_init = true;
}

void
compose_teardown(void)
{
log_info("tear down the %s module", COMPOSE_MODULE_NAME);

if (!compose_init) {
log_warn("%s has never been set up", COMPOSE_MODULE_NAME);
}

compose_req_metrics = NULL;
compose_rsp_metrics = NULL;
compose_init = false;
}

int
compose_req(struct buf **buf, struct request *req)
{
int n = compose_array_header(buf, req->token->nelem);
if (n < 0) {
return n;
}

for (size_t i = 0; i < req->token->nelem; i++) {
int ret = compose_element(buf, array_get(req->token, i));
if (ret < 0) {
return ret;
}
n += ret;
}

return n;
}

static inline bool
is_aggregate(element_type_e type) {
switch (type) {
case ELEM_ARRAY:
case ELEM_ATTRIBUTES:
case ELEM_MAP:
case ELEM_SET:
case ELEM_PUSH_DATA:
return true;
default:
return false;
}
}

static inline int
compose_aggregate_header(struct buf **buf, struct response *rsp)
{
switch (rsp->type) {
case ELEM_ARRAY:
return compose_array_header(buf, rsp->token->nelem);
case ELEM_MAP:
return compose_map_header(buf, rsp->token->nelem);
case ELEM_SET:
return compose_set_header(buf, rsp->token->nelem);
case ELEM_PUSH_DATA:
return compose_push_data_header(buf, rsp->token->nelem);
case ELEM_ATTRIBUTES:
/* Attributes must be before another value */
return COMPOSE_ENOTSUPPORTED;
default:
NOT_REACHED();
return COMPOSE_EOTHER;
}
}

static inline int
compose_attrs(struct buf **buf, struct array *attrs)
{
int n = compose_attribute_header(buf, attrs->nelem);
if (n < 0) {
return n;
}

for (size_t i = 0; i < attrs->nelem; ++i) {
struct attribute_entry *entry = array_get(attrs, i);

int ret = compose_element(buf, &entry->key);
if (ret < 0) {
return ret;
}
n += ret;

ret = compose_element(buf, &entry->val);
if (ret < 0) {
return ret;
}
n += ret;
}

return n;
}

int
compose_rsp(struct buf **buf, struct response *rsp)
{
int n = 0;

if (rsp->attrs->nelem > 0) {
int ret = compose_attrs(buf, rsp->attrs);
if (ret < 0) {
return ret;
}
n += ret;
}

if (is_aggregate(rsp->type)) {
int ret = compose_aggregate_header(buf, rsp);
if (ret < 0) {
return ret;
}
n += ret;
}

for (size_t i = 0; i < rsp->token->nelem; ++i) {
int ret = compose_element(buf, array_get(rsp->token, i));
if (ret < 0) {
return ret;
}

n += ret;
}

return n;
}
Loading