diff --git a/src/protocol/data/CMakeLists.txt b/src/protocol/data/CMakeLists.txt index af72a52b1..3b698ace7 100644 --- a/src/protocol/data/CMakeLists.txt +++ b/src/protocol/data/CMakeLists.txt @@ -1,3 +1,4 @@ add_subdirectory(memcache) add_subdirectory(ping) add_subdirectory(resp) +add_subdirectory(resp_tw) diff --git a/src/protocol/data/resp_tw/CMakeLists.txt b/src/protocol/data/resp_tw/CMakeLists.txt new file mode 100644 index 000000000..c30aa3173 --- /dev/null +++ b/src/protocol/data/resp_tw/CMakeLists.txt @@ -0,0 +1,9 @@ + +set(SOURCE + token.c + request.c + response.c + compose.c + parse.c) + +add_library(protocol_resp_tw ${SOURCE}) diff --git a/src/protocol/data/resp_tw/cmd.h b/src/protocol/data/resp_tw/cmd.h new file mode 100644 index 000000000..2eaf21013 --- /dev/null +++ b/src/protocol/data/resp_tw/cmd.h @@ -0,0 +1,8 @@ +#pragma once + +#include + +/* Common macros for defining a command */ + +/* Allow unlimited optional parameters */ +#define OPT_UNLIMITED UINT32_MAX diff --git a/src/protocol/data/resp_tw/cmd_bitmap.h b/src/protocol/data/resp_tw/cmd_bitmap.h new file mode 100644 index 000000000..36c343c1c --- /dev/null +++ b/src/protocol/data/resp_tw/cmd_bitmap.h @@ -0,0 +1,41 @@ +#pragma once + +/** + * create: create a bitmap of certain size, all bits reset unless value given + * BitMap.create KEY size + * Note: if size is not a multiple of the internal allocation unit (e.g. byte), + * it will be rounded up internally + * TODO: how to transfer value w/o being misrepresented due to endianness? + * until we figure that out we shouldn't allow user to initialize w/ value + * + * delete: delete a bitmap + * BitMap.delete KEY + * + * get: get value of a column in a bitmap + * BitMap.get KEY columnId + * + * set: set value of a column in a bitmap + * BitMap.set KEY columnId val + */ + +/* TODO: + * - variable-width columns. this PR will implement only 1-bit columns + * - metadata: this will allow simple customization such as softTTL, timestamp + * or other info. This is the same idea behind memcached's `flag' field, but + * it's better to make it optional instead of allocating a fixed sized region + * for all commands and all data types. + */ + +/* type string #arg #opt */ +#define REQ_BITMAP(ACTION) \ + ACTION( REQ_BITMAP_CREATE, "BitMap.create", 3, 0 )\ + ACTION( REQ_BITMAP_DELETE, "BitMap.delete", 2, 0 )\ + ACTION( REQ_BITMAP_GET, "BitMap.get", 3, 0 )\ + ACTION( REQ_BITMAP_SET, "BitMap.set", 4, 0 ) + +typedef enum bitmap_elem { + BITMAP_VERB = 0, + BITMAP_KEY = 1, + BITMAP_COL = 2, + BITMAP_VAL = 3, +} bitmap_elem_e; diff --git a/src/protocol/data/resp_tw/cmd_hash.h b/src/protocol/data/resp_tw/cmd_hash.h new file mode 100644 index 000000000..122f19e39 --- /dev/null +++ b/src/protocol/data/resp_tw/cmd_hash.h @@ -0,0 +1,24 @@ +#pragma once + +#include "cmd.h" + +/* type string # of args */ +#define REQ_HASH(ACTION) \ + ACTION( REQ_HDEL, "hdel", 3, OPT_UNLIMITED )\ + ACTION( REQ_HDELALL, "hdelall", 2, 0 )\ + ACTION( REQ_HEXISTS, "hexists", 3, 0 )\ + ACTION( REQ_HGET, "hget", 3, 0 )\ + ACTION( REQ_HGETALL, "hgetall", 2, 0 )\ + ACTION( REQ_HINCRBY, "hincrby", 4, 0 )\ + ACTION( REQ_HINCRBYFLOAT, "hincrbyfloat", 4, 0 )\ + ACTION( REQ_HKEYS, "hkeys", 2, 0 )\ + ACTION( REQ_HLEN, "hlen", 2, 0 )\ + ACTION( REQ_HMGET, "hmget", 3, OPT_UNLIMITED )\ + ACTION( REQ_HMSET, "hmset", 4, OPT_UNLIMITED )\ + ACTION( REQ_HSET, "hset", 4, 0 )\ + ACTION( REQ_HSETNX, "hsetnx", 4, 0 )\ + ACTION( REQ_HSTRLEN, "hstrlen", 3, 0 )\ + ACTION( REQ_HVALS, "hvals", 2, 0 )\ + ACTION( REQ_HSCAN, "hscan", 3, 0 ) + +/* "hlen KEY" == "*2\r\n$4\r\nhlen\r\n$3\r\nKEY\r\n" */ diff --git a/src/protocol/data/resp_tw/cmd_list.h b/src/protocol/data/resp_tw/cmd_list.h new file mode 100644 index 000000000..7341cf4c7 --- /dev/null +++ b/src/protocol/data/resp_tw/cmd_list.h @@ -0,0 +1,50 @@ +#pragma once + +#include "cmd.h" + +/** + * create: create an empty list + * List.create KEY + * + * delete: delete a list or a particular value from list + * List.delete KEY [VALUE [COUNT]] + * + * trim: trimming a list + * List.trim KEY INDEX COUNT + * + * len: return number of entries in list + * List.len KEY + * + * find: find entry in list + * List.find KEY VALUE + * + * get: get entry/entries at an index + * List.get KEY [INDEX [COUNT]] + * + * insert: insert entry at an index + * List.insert KEY VALUE INDEX + * + * push: pushing entry/entries at the end + * List.push KEY VALUE [VALUE ...] + */ + + +/* type string #arg #opt */ +#define REQ_LIST(ACTION) \ + ACTION( REQ_LIST_CREATE, "List.create", 2, 0 )\ + ACTION( REQ_LIST_DELETE, "List.delete", 2, 2 )\ + ACTION( REQ_LIST_LEN, "List.len", 2, 0 )\ + ACTION( REQ_LIST_FIND, "List.find", 3, 0 )\ + ACTION( REQ_LIST_GET, "List.get", 2, 2 )\ + ACTION( REQ_LIST_INSERT, "List.insert", 4, 0 )\ + ACTION( REQ_LIST_PUSH, "List.push", 3, OPT_UNLIMITED )\ + ACTION( REQ_LIST_TRIM, "List.trim", 4, 0 ) + +typedef enum list_elem { + LIST_VERB = 0, + LIST_KEY = 1, + LIST_VAL = 2, + LIST_IDX = 2, + LIST_VIDX = 3, /* when a value is also present */ + LIST_CNT = 3, +} list_elem_e; diff --git a/src/protocol/data/resp_tw/cmd_misc.h b/src/protocol/data/resp_tw/cmd_misc.h new file mode 100644 index 000000000..1bff28c1b --- /dev/null +++ b/src/protocol/data/resp_tw/cmd_misc.h @@ -0,0 +1,7 @@ +#pragma once + +/* type string #arg#opt */ +#define REQ_MISC(ACTION) \ + ACTION( REQ_FLUSHALL, "flushall", 1, 0 )\ + ACTION( REQ_PING, "ping", 1, 1 )\ + ACTION( REQ_QUIT, "quit", 1, 0 ) diff --git a/src/protocol/data/resp_tw/cmd_sarray.h b/src/protocol/data/resp_tw/cmd_sarray.h new file mode 100644 index 000000000..4f301ceee --- /dev/null +++ b/src/protocol/data/resp_tw/cmd_sarray.h @@ -0,0 +1,54 @@ +#pragma once + +#include "cmd.h" + +/** + * create: create an empty array or integer width ESIZE + * SArray.create KEY ESIZE [WATERMARK_L] [WATERMARK_H] + * + * delete: delete an array + * SArray.delete KEY + * + * len: return number of entries in array + * SArray.len KEY + * + * find: find (rank of an value) in array + * SArray.find KEY VALUE + * + * get: get entry/entries at an index + * SArray.get KEY [INDEX [COUNT]] + * + * insert: insert value + * SArray.insert KEY VALUE [VALUE ...] + * + * remove: remove a particular value from array + * SArray.remove KEY VALUE [VALUE ...] + * + * truncate: truncate a array + * SArray.truncate KEY COUNT + * + */ + + +/* type string #arg #opt */ +#define REQ_SARRAY(ACTION) \ + ACTION( REQ_SARRAY_CREATE, "SArray.create", 3, 2 )\ + ACTION( REQ_SARRAY_DELETE, "SArray.delete", 2, 0 )\ + ACTION( REQ_SARRAY_LEN, "SArray.len", 2, 0 )\ + ACTION( REQ_SARRAY_FIND, "SArray.find", 3, 0 )\ + ACTION( REQ_SARRAY_GET, "SArray.get", 2, 2 )\ + ACTION( REQ_SARRAY_INSERT, "SArray.insert", 3, OPT_UNLIMITED )\ + ACTION( REQ_SARRAY_REMOVE, "SArray.remove", 3, OPT_UNLIMITED )\ + ACTION( REQ_SARRAY_TRUNCATE, "SArray.truncate", 3, 0 ) + +typedef enum sarray_elem { + SARRAY_VERB = 0, + SARRAY_KEY = 1, + SARRAY_ESIZE = 2, + SARRAY_VAL = 2, + SARRAY_IDX = 2, + SARRAY_CNT = 2, + SARRAY_ICNT = 3, /* when an index is also present */ + SARRAY_WML = 3, /* watermark (low) */ + SARRAY_WMH = 4, /* watermark (high) */ +} sarray_elem_e; diff --git a/src/protocol/data/resp_tw/cmd_zset.h b/src/protocol/data/resp_tw/cmd_zset.h new file mode 100644 index 000000000..ed58e3648 --- /dev/null +++ b/src/protocol/data/resp_tw/cmd_zset.h @@ -0,0 +1,27 @@ +#pragma once + +#include "cmd.h" + +/* type string # of args */ +#define REQ_ZSET(ACTION) \ + ACTION( REQ_ZADD, "zadd", 4, OPT_UNLIMITED )\ + ACTION( REQ_ZINCRBY, "zincrby", 4, 0 )\ + ACTION( REQ_ZREM, "zrem", 3, OPT_UNLIMITED )\ + ACTION( REQ_ZREMRANGEBYSCORE, "zremrangebyscore", 4, 0 )\ + ACTION( REQ_ZREMRANGEBYRANK, "zremrangebyrank", 4, 0 )\ + ACTION( REQ_ZREMRANGEBYLEX, "zremrangebylex", 4, 0 )\ + ACTION( REQ_ZUNIONSTORE, "zunionstore", 4, OPT_UNLIMITED )\ + ACTION( REQ_ZINTERSTORE, "zinterstore", 4, OPT_UNLIMITED )\ + ACTION( REQ_ZRANGE, "zrange", 4, OPT_UNLIMITED )\ + ACTION( REQ_ZRANGEBYSCORE, "zrangebyscore", 4, OPT_UNLIMITED )\ + ACTION( REQ_ZREVRANGEBYSCORE, "zrevrangebyscore", 4, OPT_UNLIMITED )\ + ACTION( REQ_ZRANGEBYLEX, "zrangebylex", 4, OPT_UNLIMITED )\ + ACTION( REQ_ZREVRANGEBYLEX, "zrevrangebylex", 4, OPT_UNLIMITED )\ + ACTION( REQ_ZCOUNT, "zcount", 4, 0 )\ + ACTION( REQ_ZLEXCOUNT, "zlexcount", 4, 0 )\ + ACTION( REQ_ZREVRANGE, "zrevrange", 4, OPT_UNLIMITED )\ + ACTION( REQ_ZCARD, "zcard", 2, 0 )\ + ACTION( REQ_ZSCORE, "zscore", 3, 0 )\ + ACTION( REQ_ZRANK, "zrank", 3, 0 )\ + ACTION( REQ_ZREVRANK, "zrevrank", 3, 0 )\ + ACTION( REQ_ZSCAN, "zscan", 3, OPT_UNLIMITED ) diff --git a/src/protocol/data/resp_tw/compose.c b/src/protocol/data/resp_tw/compose.c new file mode 100644 index 000000000..78e2b8c85 --- /dev/null +++ b/src/protocol/data/resp_tw/compose.c @@ -0,0 +1,157 @@ +#include "compose.h" + +#include "request.h" +#include "response.h" +#include "token.h" + +#include +#include + +#define COMPOSE_MODULE_NAME "protocol::resp_tw::compose" + +static bool compose_init = false; +static compose_req_metrics_st *compose_req_metrics = NULL; +static compose_rsp_metrics_st *compose_rsp_metrics = NULL; + +void +compose_setup(compose_req_metrics_st *req, compose_rsp_metrics_st *rsp) +{ + log_info("set up the %s module", COMPOSE_MODULE_NAME); + + if (compose_init) { + log_warn("%s has already been set up, overwrite", COMPOSE_MODULE_NAME); + } + + compose_req_metrics = req; + compose_rsp_metrics = rsp; + + compose_init = true; +} + +void +compose_teardown(void) +{ + log_info("tear down the %s module", COMPOSE_MODULE_NAME); + + if (!compose_init) { + log_warn("%s has never been set up", COMPOSE_MODULE_NAME); + } + + compose_req_metrics = NULL; + compose_rsp_metrics = NULL; + compose_init = false; +} + +int +compose_req(struct buf **buf, struct request *req) +{ + int n = compose_array_header(buf, req->token->nelem); + if (n < 0) { + return n; + } + + for (size_t i = 0; i < req->token->nelem; i++) { + int ret = compose_element(buf, array_get(req->token, i)); + if (ret < 0) { + return ret; + } + n += ret; + } + + return n; +} + +static inline bool +is_aggregate(element_type_e type) { + switch (type) { + case ELEM_ARRAY: + case ELEM_ATTRIBUTES: + case ELEM_MAP: + case ELEM_SET: + case ELEM_PUSH_DATA: + return true; + default: + return false; + } +} + +static inline int +compose_aggregate_header(struct buf **buf, struct response *rsp) +{ + switch (rsp->type) { + case ELEM_ARRAY: + return compose_array_header(buf, rsp->token->nelem); + case ELEM_MAP: + return compose_map_header(buf, rsp->token->nelem); + case ELEM_SET: + return compose_set_header(buf, rsp->token->nelem); + case ELEM_PUSH_DATA: + return compose_push_data_header(buf, rsp->token->nelem); + case ELEM_ATTRIBUTES: + /* Attributes must be before another value */ + return COMPOSE_ENOTSUPPORTED; + default: + NOT_REACHED(); + return COMPOSE_EOTHER; + } +} + +static inline int +compose_attrs(struct buf **buf, struct array *attrs) +{ + int n = compose_attribute_header(buf, attrs->nelem); + if (n < 0) { + return n; + } + + for (size_t i = 0; i < attrs->nelem; ++i) { + struct attribute_entry *entry = array_get(attrs, i); + + int ret = compose_element(buf, &entry->key); + if (ret < 0) { + return ret; + } + n += ret; + + ret = compose_element(buf, &entry->val); + if (ret < 0) { + return ret; + } + n += ret; + } + + return n; +} + +int +compose_rsp(struct buf **buf, struct response *rsp) +{ + int n = 0; + + if (rsp->attrs->nelem > 0) { + int ret = compose_attrs(buf, rsp->attrs); + if (ret < 0) { + return ret; + } + n += ret; + } + + if (is_aggregate(rsp->type)) { + int ret = compose_aggregate_header(buf, rsp); + if (ret < 0) { + return ret; + } + n += ret; + } + + for (size_t i = 0; i < rsp->token->nelem; ++i) { + int ret = compose_element(buf, array_get(rsp->token, i)); + if (ret < 0) { + return ret; + } + + n += ret; + } + + return n; +} \ No newline at end of file diff --git a/src/protocol/data/resp_tw/compose.h b/src/protocol/data/resp_tw/compose.h new file mode 100644 index 000000000..c71e206da --- /dev/null +++ b/src/protocol/data/resp_tw/compose.h @@ -0,0 +1,39 @@ +#pragma once + +#include "token.h" + +#include +#include + +#include + +/* name Type description */ +#define COMPOSE_REQ_METRIC(ACTION) \ + ACTION( request_compose, METRIC_COUNTER, "# requests composed" )\ + ACTION( request_compose_ex, METRIC_COUNTER, "# composing error" ) + +/* name Type description */ +#define COMPOSE_RSP_METRIC(ACTION) \ + ACTION( response_compose, METRIC_COUNTER, "# responses composed" )\ + ACTION( response_compose_ex, METRIC_COUNTER, "# rsp composing error") + +typedef struct { + COMPOSE_REQ_METRIC(METRIC_DECLARE) +} compose_req_metrics_st; + +typedef struct { + COMPOSE_RSP_METRIC(METRIC_DECLARE) +} compose_rsp_metrics_st; + + +struct request; +struct response; + +void compose_setup(compose_req_metrics_st *req, compose_rsp_metrics_st *rsp); +void compose_teardown(void); + +/* returns the number of bytes written out to buf. + * if the return value is negative, it can be interpreted as compose_rstatus + */ +int compose_req(struct buf **buf, struct request *req); +int compose_rsp(struct buf **buf, struct response *rsp); diff --git a/src/protocol/data/resp_tw/parse.c b/src/protocol/data/resp_tw/parse.c new file mode 100644 index 000000000..5bd0d4fd9 --- /dev/null +++ b/src/protocol/data/resp_tw/parse.c @@ -0,0 +1,279 @@ +#include "parse.h" + +#include "request.h" +#include "response.h" +#include "token.h" + +#include +#include +#include +#include + +#include + +#define PARSE_MODULE_NAME "protocol::resp::parse" + +static bool parse_init = false; +static parse_req_metrics_st *parse_req_metrics = NULL; +static parse_rsp_metrics_st *parse_rsp_metrics = NULL; + +void +parse_setup(parse_req_metrics_st *req, parse_rsp_metrics_st *rsp) +{ + log_info("set up the %s module", PARSE_MODULE_NAME); + + if (parse_init) { + log_warn("%s has already been setup, overwrite", PARSE_MODULE_NAME); + } + + parse_req_metrics = req; + parse_rsp_metrics = rsp; + parse_init = true; +} + +void +parse_teardown(void) +{ + log_info("tear down the %s module", PARSE_MODULE_NAME); + + if (!parse_init) { + log_warn("%s has never been setup", PARSE_MODULE_NAME); + } + + parse_req_metrics = NULL; + parse_rsp_metrics = NULL; + parse_init = false; +} + +static parse_rstatus_e +_parse_cmd(struct request *req) +{ + cmd_type_e type; + struct command cmd; + struct element *el; + int narg; + + ASSERT(req != NULL); + + /* check verb */ + type = REQ_UNKNOWN; + el = array_first(req->token); + + ASSERT (el->type == ELEM_BLOB_STR); + while (++type < REQ_SENTINEL && + bstring_compare(&command_table[type].bstr, &el->bstr) != 0) {} + if (type == REQ_SENTINEL) { + log_warn("unrecognized command detected: %.*s", el->bstr.len, + el->bstr.data); + return PARSE_EINVALID; + } + req->type = type; + + /* check narg */ + cmd = command_table[type]; + narg = req->token->nelem; + if (narg < cmd.narg || narg > (cmd.narg + cmd.nopt)) { + log_warn("wrong # of arguments for '%.*s': %d+[%d] expected, %d given", + cmd.bstr.len, cmd.bstr.data, cmd.narg, cmd.nopt, narg); + return PARSE_EINVALID; + } + + return PARSE_OK; +} + + +parse_rstatus_e +parse_req(struct request *req, struct buf *buf) +{ + parse_rstatus_e status = PARSE_OK; + char *old_rpos = buf->rpos; + uint64_t nelem; + struct element *el; + + log_verb("parsing buf %p into req %p", buf, req); + + if (buf_rsize(buf) == 0) { + return PARSE_EUNFIN; + } + + /* get number of elements in the array */ + if (!token_is_array(buf)) { + log_debug("parse req failed: not an array"); + return PARSE_EINVALID; + } + status = token_array_nelem(&nelem, buf); + if (status != PARSE_OK) { + log_verb("getting array size returns status %d", status); + buf->rpos = old_rpos; + return status; + } else { + log_verb("array size is %"PRId64, nelem); + } + + if (nelem < 1 || nelem > req->token->nalloc) { + log_debug("parse req: invalid array size, %d not in [1, %"PRIu32"]", + nelem, req->token->nalloc); + return PARSE_EINVALID; + } + + + /* parse elements */ + while (nelem > 0) { + if (buf_rsize(buf) == 0) { + buf->rpos = old_rpos; + return PARSE_EUNFIN; + } + el = array_push(req->token); + status = parse_element(el, buf); + log_verb("parse element returned status %d", status); + if (status != PARSE_OK) { + request_reset(req); + buf->rpos = old_rpos; + return status; + } + nelem--; + } + + status = _parse_cmd(req); + log_verb("parse command returned status %d", status); + if (status != PARSE_OK) { + buf->rpos = old_rpos; + return status; + } + + return PARSE_OK; +} + +static inline parse_rstatus_e +parse_attrs(struct array *attrs, struct buf *buf) +{ + uint64_t nelem; + + parse_rstatus_e status = token_attribute_nelem(&nelem, buf); + if (status != PARSE_OK) { + return status; + } + if (nelem > attrs->nalloc) { + log_warn("attributes map contained %" PRIu64 " > %" PRIu32 " elements", nelem, + attrs->nalloc); + return PARSE_EOVERSIZE; + } + + log_verb("parsing attributes with %" PRIu64 " key-value pair%s", nelem, + nelem == 1 ? "" : "s"); + + for (uint64_t i = 0; i < nelem; ++i) { + struct attribute_entry entry; + + status = parse_element(&entry.key, buf); + if (status != PARSE_OK) { + return status; + } + + status = parse_element(&entry.val, buf); + if (status != PARSE_OK) { + return status; + } + + void *place = array_push(attrs); + memcpy(place, &entry, sizeof(entry)); + } + + return PARSE_OK; +} + +static inline parse_rstatus_e +token_is_aggregate(struct buf* buf) { + return token_is_array(buf) + || token_is_map(buf) + || token_is_set(buf) + || token_is_push_data(buf); +} + +static inline parse_rstatus_e +parse_aggregate_nelem(uint64_t *nelem, struct buf *buf, element_type_e *type) +{ + /* Note: don't allow attributes since an attribute by itself isn't valid */ + + if (token_is_array(buf)) { + *type = ELEM_ARRAY; + log_verb("parsing array header"); + return token_array_nelem(nelem, buf); + } + if (token_is_map(buf)) { + *type = ELEM_MAP; + log_verb("parsing map header"); + return token_map_nelem(nelem, buf); + } + if (token_is_set(buf)) { + *type = ELEM_SET; + log_verb("parsing set header"); + return token_set_nelem(nelem, buf); + } + if (token_is_push_data(buf)) { + *type = ELEM_PUSH_DATA; + log_verb("parsing push data header"); + return token_push_data_nelem(nelem, buf); + } + + return PARSE_EINVALID; +} + +parse_rstatus_e +parse_rsp(struct response *rsp, struct buf *buf) +{ + char *old_rpos = buf->rpos; + parse_rstatus_e status; + + response_reset(rsp); + + if (buf_rsize(buf) == 0) { + return PARSE_EUNFIN; + } + + if (token_is_attribute(buf)) { + status = parse_attrs(rsp->attrs, buf); + if (status != PARSE_OK) { + buf->rpos = old_rpos; + return status; + } + } else { + /* no attributes is represented by an empty attrs array */ + } + + uint64_t nelem = 1; + if (token_is_aggregate(buf)) { + status = parse_aggregate_nelem(&nelem, buf, &rsp->type); + if (status != PARSE_OK) { + buf->rpos = old_rpos; + return status; + } + + if (nelem > rsp->token->nalloc) { + log_debug("parse rsp: invalid # of eleents, %d > %" PRIu32, nelem, rsp->token->nalloc); + buf->rpos = old_rpos; + return status; + } + + log_verb("parsing aggregate structure with %" PRIu64 " elements", nelem); + } + + /* parse elements */ + for (uint64_t i = 0; i < nelem; ++i) { + struct element *el = array_push(rsp->token); + status = parse_element(el, buf); + + if (status != PARSE_OK) { + log_verb("parse element returned status %d", status); + response_reset(rsp); + buf->rpos = old_rpos; + return status; + } + + if (rsp->type == ELEM_UNKNOWN) { + rsp->type = el->type; + } + } + + return PARSE_OK; +} diff --git a/src/protocol/data/resp_tw/parse.h b/src/protocol/data/resp_tw/parse.h new file mode 100644 index 000000000..720020fa2 --- /dev/null +++ b/src/protocol/data/resp_tw/parse.h @@ -0,0 +1,46 @@ +#pragma once + +#include "request.h" +#include "response.h" +#include "token.h" + +#include +#include + +#include + +/* Note(yao): the prefix cmd_ is mostly to be compatible with Twemcache metric + * names. + * On the other hand, the choice of putting request in front of parse instead of + * the other way around in `request_parse' is to allow users to easily query all + * metrics related to requests , similar for responses. + */ +/* name type description */ +#define PARSE_REQ_METRIC(ACTION) \ + ACTION( request_parse, METRIC_COUNTER, "# requests parsed" )\ + ACTION( request_parse_ex, METRIC_COUNTER, "# parsing error" ) + +/* name type description */ +#define PARSE_RSP_METRIC(ACTION) \ + ACTION( response_parse, METRIC_COUNTER, "# responses parsed" )\ + ACTION( response_parse_ex, METRIC_COUNTER, "# rsp parsing error" )\ + +typedef struct { + PARSE_REQ_METRIC(METRIC_DECLARE) +} parse_req_metrics_st; + +typedef struct { + PARSE_RSP_METRIC(METRIC_DECLARE) +} parse_rsp_metrics_st; + +void parse_setup(parse_req_metrics_st *req, parse_rsp_metrics_st *rsp); +void parse_teardown(void); + +static inline bool +key_valid(struct bstring *key) +{ + return (key->len > 0 && key->len <= KEY_MAXLEN); +} + +parse_rstatus_e parse_req(struct request *req, struct buf *buf); +parse_rstatus_e parse_rsp(struct response *rsp, struct buf *buf); diff --git a/src/protocol/data/resp_tw/request.c b/src/protocol/data/resp_tw/request.c new file mode 100644 index 000000000..31cefdc4d --- /dev/null +++ b/src/protocol/data/resp_tw/request.c @@ -0,0 +1,229 @@ +#include "request.h" + +#include "token.h" + +#include +#include +#include + +#define REQUEST_MODULE_NAME "protocol::resp-tw::request" + +static bool request_init = false; +static request_metrics_st *request_metrics = NULL; + +#define CMD_INIT(_type, _str, _narg, _nopt) {\ + .type = _type, \ + .bstr = { sizeof(_str) - 1, (_str) }, \ + .narg = _narg, \ + .nopt = _nopt }, +struct command command_table[REQ_SENTINEL] = { + { .type = REQ_UNKNOWN, .bstr = { 0, NULL }, .narg = 0, .nopt = 0 }, + REQ_BITMAP(CMD_INIT) + REQ_HASH(CMD_INIT) + REQ_LIST(CMD_INIT) + REQ_SARRAY(CMD_INIT) + REQ_ZSET(CMD_INIT) + REQ_MISC(CMD_INIT) +}; +#undef CMD_INIT + +static size_t ntoken = REQ_NTOKEN; +FREEPOOL(req_pool, reqq, request); +static struct req_pool reqp; +static bool reqp_init = false; + +void +request_reset(struct request *req) +{ + ASSERT(req != NULL); + + STAILQ_NEXT(req, next) = NULL; + req->free = false; + + req->noreply = 0; + req->serror = 0; + req->cerror = 0; + + req->type = REQ_UNKNOWN; + req->token->nelem = 0; +} + +struct request * +request_create(void) +{ + rstatus_i status; + struct request *req = cc_alloc(sizeof(struct request)); + + if (req == NULL) { + return NULL; + } + + status = array_create(&req->token, ntoken, sizeof(struct element)); + if (status != CC_OK) { + cc_free(req); + return NULL; + } + request_reset(req); + + INCR(request_metrics, request_create); + INCR(request_metrics, request_curr); + + return req; +} + +static struct request * +_request_create(void) +{ + struct request *req = request_create(); + + if (req != NULL) { + INCR(request_metrics, request_free); + } + + return req; +} + +void +request_destroy(struct request **request) +{ + struct request *req = *request; + ASSERT(req != NULL); + + INCR(request_metrics, request_destroy); + DECR(request_metrics, request_curr); + + array_destroy(&req->token); + cc_free(req); + + *request = NULL; +} + +static void +_request_destroy(struct request **request) +{ + request_destroy(request); + DECR(request_metrics, request_free); +} + +static void +request_pool_destroy(void) +{ + struct request *req, *treq; + + if (!reqp_init) { + log_warn("request pool was never created, ignore"); + } + + log_info("destroying request pool: free %" PRIu32, reqp.nfree); + + FREEPOOL_DESTROY(req, treq, &reqp, next, _request_destroy); + reqp_init = false; +} + +static void +request_pool_create(uint32_t max) +{ + struct request *req; + + if (reqp_init) { + log_warn("request pool has already been created, re-creating"); + + request_pool_destroy(); + } + + log_info("creating request pool: max %" PRIu32, max); + + FREEPOOL_CREATE(&reqp, max); + reqp_init = true; + + FREEPOOL_PREALLOC(req, &reqp, max, next, _request_create); + if (reqp.nfree < max) { + log_crit("cannot preallocate request pool, OOM. abort"); + exit(EXIT_FAILURE); + } +} + +struct request * +request_borrow(void) +{ + struct request *req; + + FREEPOOL_BORROW(req, &reqp, next, _request_create); + if (req == NULL) { + /* TODO(sean): What is the equivalent code in resp trying to do here? */ + log_debug("borrow req failed: OOM"); + + return NULL; + } + request_reset(req); + + DECR(request_metrics, request_free); + INCR(request_metrics, request_borrow); + log_vverb("borrowing request %p", req); + + return req; +} + +void +request_return(struct request **request) +{ + struct request *req = *request; + + if (req == NULL) { + return; + } + + INCR(request_metrics, request_free); + INCR(request_metrics, request_return); + log_vverb("return req %p"); + + req->free = true; + FREEPOOL_RETURN(req, &reqp, next); + + *request = NULL; +} + +void +request_setup(request_options_st *options, request_metrics_st *metrics) +{ + uint32_t max = REQ_POOLSIZE; + + log_info("set up the %s module", REQUEST_MODULE_NAME); + + if (request_init) { + log_warn("%s has already been setup, overwrite", REQUEST_MODULE_NAME); + } + + request_metrics = metrics; + + if (options != NULL) { + ntoken = option_uint(&options->request_ntoken); + + for (size_t i = 1; i < REQ_SENTINEL; ++i) { /* update nopt based on ntoken */ + if (command_table[i].nopt == OPT_UNLIMITED) { + command_table[i].nopt = ntoken - command_table[i].narg; + } + } + + max = option_uint(&options->request_poolsize); + } + + request_pool_create(max); + request_init = true; +} + +void +request_teardown(void) +{ + log_info("tear down the %s module", REQUEST_MODULE_NAME); + + if (!request_init) { + log_warn("%s has never been set up", REQUEST_MODULE_NAME); + } + + ntoken = REQ_NTOKEN; + request_pool_destroy(); + request_metrics = NULL; + + request_init = false; +} diff --git a/src/protocol/data/resp_tw/request.h b/src/protocol/data/resp_tw/request.h new file mode 100644 index 000000000..1f14d502d --- /dev/null +++ b/src/protocol/data/resp_tw/request.h @@ -0,0 +1,95 @@ +#pragma once + +#include "cmd_bitmap.h" +#include "cmd_hash.h" +#include "cmd_list.h" +#include "cmd_misc.h" +#include "cmd_sarray.h" +#include "cmd_zset.h" +#include "token.h" + +#include +#include +#include +#include +#include + +#include + +#define REQ_NTOKEN 127 /* # tokens in a command */ +#define KEY_MAXLEN 255 +#define REQ_POOLSIZE 0 + +/* name type default description */ +#define REQUEST_OPTION(ACTION) \ + ACTION( request_ntoken, OPTION_TYPE_UINT, REQ_NTOKEN, "# tokens in req" )\ + ACTION( request_poolsize, OPTION_TYPE_UINT, REQ_POOLSIZE, "request pool size") + +typedef struct { + REQUEST_OPTION(OPTION_DECLARE) +} request_options_st; + +/* name type description */ +#define REQUEST_METRIC(ACTION) \ + ACTION( request_curr, METRIC_GAUGE, "# req created" )\ + ACTION( request_free, METRIC_GAUGE, "# free req in pool" )\ + ACTION( request_borrow, METRIC_COUNTER, "# reqs borrowed" )\ + ACTION( request_return, METRIC_COUNTER, "# reqs returned" )\ + ACTION( request_create, METRIC_COUNTER, "# reqs created" )\ + ACTION( request_destroy, METRIC_COUNTER, "# reqs destroyed" ) + +typedef struct { + REQUEST_METRIC(METRIC_DECLARE) +} request_metrics_st; + +#define GET_TYPE(_type, _str, narg, nopt) _type, +typedef enum cmd_type { + REQ_UNKNOWN, + REQ_BITMAP(GET_TYPE) + REQ_HASH(GET_TYPE) + REQ_LIST(GET_TYPE) + REQ_SARRAY(GET_TYPE) + REQ_ZSET(GET_TYPE) + REQ_MISC(GET_TYPE) + REQ_SENTINEL +} cmd_type_e; +#undef GET_TYPE + +/* + * Note: though redis supports unbounded number of variables in some commands, + * implementation cannot operate with performance guarantee when this number + * gets too big. It also introduces uncertainty around resources. Therefore, we + * are limiting it to REQ_NTOKEN minus the # required args. For each command, if + * the # of optional arguments is declared as -1, (req_ntoken - narg) will be + * used to enforce argument limits. + */ +struct command { + cmd_type_e type; + struct bstring bstr; + uint32_t narg; /* number of required arguments, including verb */ + uint32_t nopt; /* number of optional arguments */ +}; + +extern struct command command_table[REQ_SENTINEL]; + +struct request { + STAILQ_ENTRY(request) next; /* allow request pooling/chaining */ + bool free; + + bool noreply; /* skip response */ + bool serror; /* server error */ + bool cerror; /* client error */ + + cmd_type_e type; + struct array *token; /* member type: `struct element' */ +}; + +void request_setup(request_options_st *options, request_metrics_st *metrics); +void request_teardown(void); + +struct request *request_create(void); +void request_destroy(struct request **req); +void request_reset(struct request *req); + +struct request *request_borrow(void); +void request_return(struct request **req); \ No newline at end of file diff --git a/src/protocol/data/resp_tw/response.c b/src/protocol/data/resp_tw/response.c new file mode 100644 index 000000000..4c244c4a3 --- /dev/null +++ b/src/protocol/data/resp_tw/response.c @@ -0,0 +1,219 @@ +#include "response.h" + +#include "token.h" + +#include +#include +#include + +#define RESPONSE_MODULE_NAME "protocol::resp-tw::response" + +static bool response_init = false; +static response_metrics_st *response_metrics = NULL; + +static size_t ntoken = RSP_NTOKEN; +FREEPOOL(rsp_pool, rspq, response); +static struct rsp_pool rspp; +static bool rspp_init = false; + +void +response_reset(struct response *rsp) +{ + ASSERT(rsp != NULL); + + STAILQ_NEXT(rsp, next) = NULL; + rsp->free = false; + + rsp->serror = false; + + rsp->type = ELEM_UNKNOWN; + rsp->token->nelem = 0; + rsp->attrs->nelem = 0; +} + +struct response * +response_create(void) +{ + rstatus_i status; + struct response *rsp = cc_alloc(sizeof(struct response)); + + if (rsp == NULL) { + return NULL; + } + + status = array_create(&rsp->token, ntoken, sizeof(struct element)); + if (status != CC_OK) { + cc_free(rsp); + return NULL; + } + + status = array_create(&rsp->attrs, ntoken/2, sizeof(struct attribute_entry)); + if (status != CC_OK) { + cc_free(rsp); + array_destroy(&rsp->token); + return NULL; + } + + response_reset(rsp); + + INCR(response_metrics, response_create); + INCR(response_metrics, response_curr); + + return rsp; +} + +static struct response * +_response_create(void) +{ + struct response *rsp = response_create(); + + if (rsp != NULL) { + INCR(response_metrics, response_free); + } + + return rsp; +} + +void +response_destroy(struct response **response) +{ + struct response *rsp = *response; + ASSERT(rsp != NULL); + + INCR(response_metrics, response_destroy); + DECR(response_metrics, response_curr); + array_destroy(&rsp->token); + array_destroy(&rsp->attrs); + cc_free(rsp); + *response = NULL; +} + +static void +_response_destroy(struct response **response) +{ + response_destroy(response); + DECR(response_metrics, response_free); +} + +static void +response_pool_destroy(void) +{ + struct response *rsp, *trsp; + + if (rspp_init) { + log_info("destroying response pool: free %"PRIu32, rspp.nfree); + + FREEPOOL_DESTROY(rsp, trsp, &rspp, next, _response_destroy); + rspp_init = false; + } else { + log_warn("response pool was never created, ignore"); + } +} + + +static void +response_pool_create(uint32_t max) +{ + struct response *rsp; + + if (rspp_init) { + log_warn("response pool has already been created, re-creating"); + + response_pool_destroy(); + } + + log_info("creating response pool: max %"PRIu32, max); + + FREEPOOL_CREATE(&rspp, max); + rspp_init = true; + + FREEPOOL_PREALLOC(rsp, &rspp, max, next, _response_create); + if (rspp.nfree < max) { + log_crit("cannot preallocate response pool, OOM. abort"); + exit(EXIT_FAILURE); + } +} + +struct response * +response_borrow(void) +{ + struct response *rsp; + + FREEPOOL_BORROW(rsp, &rspp, next, _response_create); + if (rsp == NULL) { + log_debug("borrow rsp failed: OOM %d"); + + return NULL; + } + response_reset(rsp); + + DECR(response_metrics, response_free); + INCR(response_metrics, response_borrow); + log_vverb("borrowing rsp %p", rsp); + + return rsp; +} + +/* + * Return a single response object + */ +void +response_return(struct response **response) +{ + ASSERT(response != NULL); + + struct response *rsp = *response; + + if (rsp == NULL) { + return; + } + + INCR(response_metrics, response_free); + INCR(response_metrics, response_return); + log_vverb("return rsp %p", rsp); + + rsp->free = true; + FREEPOOL_RETURN(rsp, &rspp, next); + + *response = NULL; +} + +void +response_setup(response_options_st *options, response_metrics_st *metrics) +{ + uint32_t max = RSP_POOLSIZE; + + log_info("set up the %s module", RESPONSE_MODULE_NAME); + + if (response_init) { + log_warn("%s has already been setup, overwrite", RESPONSE_MODULE_NAME); + } + + response_metrics = metrics; + + if (options != NULL) { + ntoken = option_uint(&options->response_ntoken); + max = option_uint(&options->response_poolsize); + } + + response_pool_create(max); + + response_init = true; +} + +void +response_teardown(void) +{ + log_info("tear down the %s module", RESPONSE_MODULE_NAME); + + if (!response_init) { + log_warn("%s has never been setup", RESPONSE_MODULE_NAME); + } + + ntoken = RSP_NTOKEN; + response_pool_destroy(); + response_metrics = NULL; + + response_init = false; +} + diff --git a/src/protocol/data/resp_tw/response.h b/src/protocol/data/resp_tw/response.h new file mode 100644 index 000000000..5f67c503c --- /dev/null +++ b/src/protocol/data/resp_tw/response.h @@ -0,0 +1,86 @@ +#pragma once + +#include "token.h" + +#include +#include +#include +#include +#include +#include +#include + +#define RSP_NTOKEN 127 /* # tokens in a response */ +#define RSP_POOLSIZE 0 + +/* name type default description */ +#define RESPONSE_OPTION(ACTION) \ + ACTION( response_ntoken, OPTION_TYPE_UINT, RSP_NTOKEN, "# tokens in response" )\ + ACTION( response_poolsize, OPTION_TYPE_UINT, RSP_POOLSIZE, "response pool size" ) + +typedef struct { + RESPONSE_OPTION(OPTION_DECLARE) +} response_options_st; + +/* name type description */ +#define RESPONSE_METRIC(ACTION) \ + ACTION( response_curr, METRIC_GAUGE, "# rsp created" )\ + ACTION( response_free, METRIC_GAUGE, "# free rsp in pool" )\ + ACTION( response_borrow, METRIC_COUNTER, "# rsps borrowed" )\ + ACTION( response_return, METRIC_COUNTER, "# rsps returned" )\ + ACTION( response_create, METRIC_COUNTER, "# rsps created" )\ + ACTION( response_destroy, METRIC_COUNTER, "# rsps destroyed" ) + +typedef struct { + RESPONSE_METRIC(METRIC_DECLARE) +} response_metrics_st; + +/** + * Note: there are some semi special values here: + * - a dummy entry RSP_UNKNOWN so we can use it as the initial type value; + * - a RSP_NUMERIC type that doesn't have a corresponding message body. + */ +#define RSP_OK "OK" +#define RSP_NOTFOUND "NOT_FOUND" +#define RSP_PONG "PONG" +#define RSP_EXIST "EXIST" /* key already exists and op is non-overwriting */ +#define RSP_NOOP "NOOP" /* key unmodified */ + +#define RSP_ERR_ARG "Err invalid argument" +#define RSP_ERR_NOSUPPORT "Err command not supported" +#define RSP_ERR_OUTOFRANGE "Err index out of range" +#define RSP_ERR_SERVER "Err unspecified server failure" +#define RSP_ERR_STORAGE "Err storage failure" +#define RSP_ERR_TYPE "Err type mismatch" + +/* + * NOTE(yao): we store fields as location in rbuf, this assumes the data will + * not be overwritten prematurely. + * Whether this is a reasonable design decision eventually remains to be seen. + */ + +struct attribute_entry { + struct element key; + struct element val; +}; + +struct response { + STAILQ_ENTRY(response) next; /* allow response pooling/chaining */ + bool free; + + bool serror; /* server error */ + + element_type_e type; /* anything that can have >1 token can be represented as an array */ + struct array *attrs; /* member type: `struct attribute_entry` */ + struct array *token; /* member type: `struct element' */ +}; + +void response_setup(response_options_st *options, response_metrics_st *metrics); +void response_teardown(void); + +struct response *response_create(void); +void response_destroy(struct response **rsp); +void response_reset(struct response *rsp); + +struct response *response_borrow(void); +void response_return(struct response **rsp); diff --git a/src/protocol/data/resp_tw/token.c b/src/protocol/data/resp_tw/token.c new file mode 100644 index 000000000..a34503f34 --- /dev/null +++ b/src/protocol/data/resp_tw/token.c @@ -0,0 +1,868 @@ +#include "token.h" + +#include "request.h" +#include "response.h" + +#include +#include +#include +#include +#include + +#include +#include +#include + +#define STR_MAXLEN 255 /* max length for simple string or error */ +#define BULK_MAXLEN (512 * MiB) +#define ARRAY_MAXLEN (64 * MiB) +#define BIGNUM_MAXLEN STR_MAXLEN + +static inline compose_rstatus_e +_check_buf_size(struct buf **buf, uint32_t n) +{ + while (n > buf_wsize(*buf)) { + if (dbuf_double(buf) != CC_OK) { + log_debug("failed to write %u bytes to buf %p: insufficient " + "buffer space", n, *buf); + + return COMPOSE_ENOMEM; + } + } + + return COMPOSE_OK; +} + +static inline parse_rstatus_e +_try_match_inner(const char* match, size_t match_len, struct buf *buf) { + if (buf_rsize(buf) < match_len) { + return PARSE_EUNFIN; + } + + for (size_t i = 0; i < match_len; ++i) { + if (buf->rpos[i] != match[i]) { + return PARSE_EINVALID; + } + } + + buf->rpos += match_len; + + return PARSE_OK; +} + +/*================================================================= + * REDS3 Parsing Functions + *================================================================= + */ + +/* + * Attempts to match the given sequence of characters in the input. + * Does not match the final nul character. + * + * Note that match must resolve to a string literal. + * + * Return values: + * - PARSE_EINVALID if the string does not match + * - PARSE_EUNFIN if there is not enough buffer to match the string + * - PARSE_OK otherwise + * * This function updates buf->rpos only when it returns PARSE_OK. + */ +#define _try_match(match, buf) _try_match_inner((match), sizeof(match) - 1, buf) + +static parse_rstatus_e +_read_crlf(struct buf *buf) +{ + /* + * Note: buf->rpos is updated in this function, the caller is responsible + * for resetting the pointer if necessary. + */ + + if (buf_rsize(buf) >= CRLF_LEN) { + if (!line_end(buf)) { + log_warn("invalid character encountered, expecting CRLF: %c%c", + buf->rpos[0], buf->rpos[1]); + + return PARSE_EINVALID; + } + + buf->rpos += CRLF_LEN; + return PARSE_OK; + } + + return PARSE_EUNFIN; +} + +static parse_rstatus_e +_read_str(struct bstring *str, struct buf *buf) +{ + /* + * Note: buf->rpos is updated in this function, the caller is responsible + * for resetting the pointer if necessary. + */ + + str->len = 0; + str->data = buf->rpos; + /* + * Note: according to @antirez, simple strings are not supposed to be empty. + * However, there's no particular harm allowing a null simple string, so + * we allow it in this function + */ + for (; buf_rsize(buf) > 0; buf->rpos++) { + if (line_end(buf)) { + buf->rpos += CRLF_LEN; + log_vverb("simple string detected at %p, length %"PRIu32, buf->rpos, str->len); + + return PARSE_OK; + } + if (++str->len > STR_MAXLEN) { + log_warn("simple string max length (%d) exceeded", STR_MAXLEN); + + return PARSE_EOVERSIZE; + } + } + + return PARSE_EUNFIN; +} + +static parse_rstatus_e +_read_int(int64_t *num, struct buf *buf, int64_t min, int64_t max) +{ + /* + * Note: buf->rpos is updated in this function, the caller is responsible + * for resetting the pointer if necessary. + */ + + size_t len = 0; + int64_t sign = 1; + + if (*buf->rpos == '-') { + sign = -1; + buf->rpos++; + } + + *num = 0; + for (; buf_rsize(buf) > 0; buf->rpos++) { + if (isdigit(*buf->rpos)) { + if (*num < min / 10 || *num > max / 10) { + /* TODO(yao): catch the few numbers that will still overflow */ + log_warn("ill formatted token: integer out of bounds"); + + return PARSE_EOVERSIZE; + } + + len++; + *num = *num * 10ULL + sign * (*buf->rpos - '0'); + } else { + if (len == 0 || *buf->rpos != CR) { + log_warn("invalid character encountered: %c", *buf->rpos); + + return PARSE_EINVALID; + } + if (line_end(buf)) { + if (*num < min || *num > max) { + log_warn("ill formatted token: integer out of bounds"); + + return PARSE_EOVERSIZE; + } + + buf->rpos += CRLF_LEN; + log_vverb("parsed integer, value %" PRIi64, *num); + + return PARSE_OK; + } else { + return PARSE_EUNFIN; + } + } + } + + return PARSE_EUNFIN; +} + +static parse_rstatus_e +_read_uint(uint64_t *num, struct buf *buf, uint64_t max) +{ + /* + * Note: buf->rpos is updated in this function, the caller is responsible + * for resetting the pointer if necessary. + */ + + size_t len = 0; + + *num = 0; + for (; buf_rsize(buf) > 0; buf->rpos++) { + if (isdigit(*buf->rpos)) { + uint64_t digit = (*buf->rpos - '0'); + + if (*num > max / 10) { + log_warn("ill formatted token: integer out of bounds"); + + return PARSE_EOVERSIZE; + } + + len++; + *num = *num * 10ULL + digit; + } else { + if (len == 0 || *buf->rpos != CR) { + log_warn("invalid character encountered: %c", *buf->rpos); + + return PARSE_EINVALID; + } + if (line_end(buf)) { + if (*num > max) { + /* Note: This ensures that we are actually in bounds */ + log_warn("ill formatted token: integer out of bounds"); + + return PARSE_EOVERSIZE; + } + + buf->rpos += CRLF_LEN; + log_vverb("parsed integer, value %" PRIu64, *num); + + return PARSE_OK; + } + + return PARSE_EUNFIN; + } + } + + return PARSE_EUNFIN; +} + +static parse_rstatus_e +_read_blob(struct bstring *str, struct buf *buf) +{ + /* + * Note: buf->rpos is updated in this function, the caller is responsible + * for resetting the pointer if necessary. + */ + + parse_rstatus_e status; + uint64_t len; + + bstring_init(str); + status = _read_uint(&len, buf, BULK_MAXLEN); + if (status != PARSE_OK) { + return status; + } + + if (buf_rsize(buf) >= len + CRLF_LEN) { + /* have enough bytes for the whole payload plus CRLF */ + str->len = len; + str->data = buf->rpos; + buf->rpos += str->len; + + if (line_end(buf)) { + buf->rpos += CRLF_LEN; + log_vverb("bulk string detected at %p, length %" PRIu32, buf->rpos, len); + + return PARSE_OK; + } + + if (*buf->rpos == CR) { + return PARSE_EUNFIN; + } + + log_warn("invalid character encountered, expecting CRLF: %c%c", + *buf->rpos, *(buf->rpos + 1)); + + return PARSE_EINVALID; + } + + return PARSE_EUNFIN; +} + +static parse_rstatus_e +_read_nil(struct buf *buf) +{ + /* + * Note: buf->rpos is updated in this function, the caller is responsible + * for resetting the pointer if necessary. + */ + + /* Note: all this does is validate the CRLF */ + char* old_rpos = buf->rpos; + parse_rstatus_e status = _read_crlf(buf); + + if (status == PARSE_OK) { + log_vverb("nil detected at %p", old_rpos); + } + + return status; +} + +static parse_rstatus_e +_read_bool(bool *val, struct buf *buf) +{ + /* + * Note: buf->rpos is updated in this function, the caller is responsible + * for resetting the pointer if necessary. + */ + + if (buf_rsize(buf) < CRLF_LEN + 1) { + return PARSE_EUNFIN; + } + + switch (*buf->rpos) { + case 't': + *val = true; + break; + case 'f': + *val = false; + break; + default: + log_warn("invalid character encountered, expected t or f: %c", *buf->rpos); + return PARSE_EINVALID; + } + + buf->rpos++; + parse_rstatus_e status = _read_crlf(buf); + + if (status == PARSE_OK) { + log_vverb("parsed boolean, value %c", *val ? 't' : 'f'); + } + + return status; +} + +/* Parses a double according to the reds3 specification. + * + * According to the REDS3 spec, a double can be any of + * - A number (e.g. '10') + * - A number with a decimal point in the middle (e.g. '0.121' or + * '1241.1' but not '.5') + * - inf + * Any of these forms can be preceded by a minus sign ('-') for a negative + * number. + * + * The specification does not detail what should happen when the number + * would underflow or overflow, so this implementation makes the following + * choices: + * - If the value is too large to be represented as a double (overflow) + * then we error out with PARSE_EINVALID + * - If the value is too small to be represented as a double (underflow) + * then we round to 0 + * + * Beyond that, as we use strtod for parsing any specifics such as whether + * '-0.0' is stored as a signed 0 is up to the C stdlib implementation. + * + * Note: buf->rpos is updated in this function, the caller is responsible + * for resetting the pointer if necessary. + */ +static parse_rstatus_e +_read_double(double *val, struct buf *buf) +{ + parse_rstatus_e status = 0; + size_t len = 0; + char* start = buf->rpos; + + /* Check for all different literals that REDS3 supports. + * These are inf and -inf + */ + + status = _try_match("inf\r\n", buf); + if (status == PARSE_EUNFIN) { + return PARSE_EUNFIN; + } else if (status == PARSE_OK) { + *val = 1.0 / 0.0; + + log_vverb("parsed double, value inf"); + return PARSE_OK; + } + + status = _try_match("-inf\r\n", buf); + if (status == PARSE_EUNFIN) { + return PARSE_EUNFIN; + } else if (status == PARSE_OK) { + *val = -1.0 / 0.0; + + log_vverb("parsed double, value -inf"); + return PARSE_OK; + } + + for (; buf_rsize(buf) > 0; ++buf->rpos, ++len) { + if (*buf->rpos == CR) { + break; + } + + if (!isdigit(*buf->rpos) && !(*buf->rpos == '.') && !(*buf->rpos == '-')) { + log_warn("invalid character encountered: %c", *buf->rpos); + return PARSE_EINVALID; + } + } + + /* Need to ensure that there is a character present after the + * number otherwise strtod could read beyond the buffer. + */ + if (buf_rsize(buf) == 0) { + return PARSE_EUNFIN; + } else if (len == 0) { + log_warn("ill formatted token: empty double"); + return PARSE_EEMPTY; + } + + /* According to the spec a double of the form '.102' is invalid */ + if (*start == '.') { + log_warn("ill formatted token: double starting with '.'"); + return PARSE_EINVALID; + } + + char* end; + errno = 0; + *val = strtod(start, &end); + + if (errno == ERANGE) { + /* TODO(sean): Should large doubles be rounded to infinity? */ + if (*val == HUGE_VAL || *val == -HUGE_VAL) { + log_warn("ill formatted token: double was out of range"); + return PARSE_EINVALID; + } + if (*val == 0.0) { + /* + * This implementation assumes that doubles which are too + * small can safely be flushed to 0. + */ + } + } + + log_vverb("pased double, value was %f", *val); + return PARSE_OK; +} + +/* Parse a big integer according to the REDS3 specification. + * + * Note: buf->rpos is updated in this function, the caller is responsible + * for resetting the pointer if necessary. + */ +static parse_rstatus_e +_read_big_number(struct bstring *str, struct buf *buf) { + bstring_init(str); + + str->len = 0; + str->data = buf->rpos; + + for (; buf_rsize(buf) > 0; buf->rpos++) { + if (line_end(buf)) { + buf->rpos += CRLF_LEN; + log_vverb("big number detected at %p, length %" PRIu32, buf->rpos, str->len); + + return PARSE_OK; + } + + if (!isdigit(*buf->rpos)) { + log_warn("big number contained invalid character: %c", *buf->rpos); + return PARSE_EINVALID; + } + + if (++str->len > BIGNUM_MAXLEN) { + log_warn("big number max length (%d) exceeded", BIGNUM_MAXLEN); + return PARSE_EOVERSIZE; + } + } + + return PARSE_EUNFIN; +} + +/* Parse a single value. This does not handle any composite + * types such as arrays, sets, maps, push data, or + * associated data. + */ +parse_rstatus_e +parse_element(struct element *el, struct buf *buf) +{ + char *p; + parse_rstatus_e status; + + log_verb("detecting the next element %p in buf %p", el, buf); + + if (buf_rsize(buf) == 0) { + return PARSE_EUNFIN; + } + + p = buf->rpos++; + switch (*p) { + case '+': + /* simple string */ + el->type = ELEM_STR; + status = _read_str(&el->bstr, buf); + break; + + case '-': + /* simple error */ + el->type = ELEM_ERR; + status = _read_str(&el->bstr, buf); + break; + + case '$': + /* blob string */ + el->type = ELEM_BLOB_STR; + status = _read_blob(&el->bstr, buf); + break; + + case '!': + /* blob error */ + el->type = ELEM_BLOB_ERR; + status = _read_blob(&el->bstr, buf); + break; + + case '=': + /* verbatim string */ + el->type = ELEM_VERBATIM_STR; + status = _read_blob(&el->bstr, buf); + + /* Verbatim strings are like bulk strings with the extra + * requirement that they start with 3 bytes that identify + * the type followed by a colon. + */ + if (!(el->bstr.len > 4 && el->bstr.data[3] == ':')) { + log_warn("invalid verbatim string, did not start with type marker"); + status = PARSE_EINVALID; + } + + break; + + case ':': + /* number */ + el->type = ELEM_NUMBER; + status = _read_int(&el->num, buf, INT64_MIN, INT64_MAX); + break; + + case ',': + log_warn("found unsupported double in message"); + return PARSE_ENOTSUPPORTED; + /* double */ + el->type = ELEM_DOUBLE; + status = _read_double(&el->double_, buf); + break; + + case '(': + log_warn("found unsupported big number in message"); + return PARSE_ENOTSUPPORTED; + /* big number */ + el->type = ELEM_BIG_NUMBER; + status = _read_big_number(&el->bstr, buf); + break; + + case '_': + /* nil */ + el->type = ELEM_NIL; + status = _read_nil(buf); + break; + + case '#': + /* bool */ + el->type = ELEM_BOOL; + status = _read_bool(&el->boolean, buf); + break; + + default: + log_warn("'%c' is not a valid single-element type header", *p); + return PARSE_EINVALID; + } + + if (status != PARSE_OK) { /* rewind */ + buf->rpos = p; + } + + return status; +} + +static inline parse_rstatus_e +_token_generic_nelem(uint64_t *nelem, struct buf *buf) { + + char *pos = buf->rpos++; + parse_rstatus_e status = _read_uint(nelem, buf, ARRAY_MAXLEN); + if (status == PARSE_EUNFIN) { + buf->rpos = pos; + } + + return status; +} + +parse_rstatus_e +token_array_nelem(uint64_t *nelem, struct buf *buf) +{ + ASSERT(nelem != NULL && buf != NULL); + ASSERT(token_is_array(buf)); + + return _token_generic_nelem(nelem, buf); +} +parse_rstatus_e +token_map_nelem(uint64_t *nelem, struct buf *buf) +{ + ASSERT(nelem != NULL && buf != NULL); + ASSERT(token_is_map(buf)); + + parse_rstatus_e status = _token_generic_nelem(nelem, buf); + if (status == PARSE_OK) { + /* need to read both keys and values */ + *nelem *= 2; + } + + return status; +} +parse_rstatus_e +token_set_nelem(uint64_t *nelem, struct buf *buf) +{ + ASSERT(nelem != NULL && buf != NULL); + ASSERT(token_is_set(buf)); + + return _token_generic_nelem(nelem, buf); +} +parse_rstatus_e +token_attribute_nelem(uint64_t *nelem, struct buf *buf) +{ + ASSERT(nelem != NULL && buf != NULL); + ASSERT(token_is_attribute(buf)); + + return _token_generic_nelem(nelem, buf); +} +parse_rstatus_e +token_push_data_nelem(uint64_t *nelem, struct buf *buf) +{ + ASSERT(nelem != NULL && buf != NULL); + ASSERT(token_is_push_data(buf)); + + return _token_generic_nelem(nelem, buf); +} + +/*================================================================= + * Composite Type Identification Functions + *================================================================= + */ + +bool +token_is_array(struct buf *buf) +{ + ASSERT(buf != NULL); + + return buf_rsize(buf) > 0 && *buf->rpos == '*'; +} + +bool +token_is_map(struct buf *buf) +{ + ASSERT(buf != NULL); + + return buf_rsize(buf) > 0 && *buf->rpos == '%'; +} + +bool +token_is_set(struct buf *buf) +{ + ASSERT(buf != NULL); + + return buf_rsize(buf) > 0 && *buf->rpos == '~'; +} + +bool +token_is_attribute(struct buf *buf) +{ + ASSERT(buf != NULL); + + return buf_rsize(buf) > 0 && *buf->rpos == '|'; +} + +bool +token_is_push_data(struct buf *buf) +{ + ASSERT(buf != NULL); + + return buf_rsize(buf) > 0 && *buf->rpos == '>'; +} + +/*================================================================= + * REDS3 Protocol Composition Functions + *================================================================= + */ + +#define _write_lit(buf, lit) buf_write(buf, lit, sizeof(lit) - 1) + +static inline size_t +_write_uint(struct buf *buf, uint64_t val) +{ + size_t n = cc_print_uint64_unsafe(buf->wpos, val); + buf->wpos += n; + return n; +} + +static inline size_t +_write_int(struct buf *buf, int64_t val) +{ + size_t n = cc_print_int64_unsafe(buf->wpos, val); + buf->wpos += n; + return n; +} + +static inline size_t +_write_bstr(struct buf *buf, struct bstring *bstr) +{ + return buf_write(buf, bstr->data, bstr->len); +} + +static inline size_t +_write_bool(struct buf *buf, bool val) +{ + if (val) { + return _write_lit(buf, "t"); + } else { + return _write_lit(buf, "f"); + } +} + +static inline size_t +_write_blob(struct buf *buf, struct bstring *bstr) +{ + size_t n = 0; + n += _write_uint(buf, bstr->len); + n += _write_lit(buf, CRLF); + n += _write_bstr(buf, bstr); + return n; +} + +static inline int +_compose_header_generic(struct buf **buf, uint64_t nelem, char val) +{ + struct buf *b; + size_t n = 1 + CRLF_LEN + CC_UINT64_MAXLEN; + + if (_check_buf_size(buf, n) != COMPOSE_OK) { + return COMPOSE_ENOMEM; + } + + b = *buf; + *b->wpos++ = val; + + int len = 1; + len += _write_int(b, nelem); + len += _write_lit(b, CRLF); + return len; +} + +int +compose_array_header(struct buf **buf, uint64_t nelem) +{ + return _compose_header_generic(buf, nelem, '*'); +} +int +compose_map_header(struct buf **buf, uint64_t nelem) +{ + if (nelem % 2 != 0) { + log_warn("tried to create a map with an odd number of elements (%" PRIu64 " elements)", + nelem); + return COMPOSE_EINVALID; + } + + return _compose_header_generic(buf, nelem/2, '%'); +} +int +compose_set_header(struct buf **buf, uint64_t nelem) +{ + return _compose_header_generic(buf, nelem, '~'); +} +int +compose_attribute_header(struct buf **buf, uint64_t nelem) +{ + return _compose_header_generic(buf, nelem, '|'); +} +int +compose_push_data_header(struct buf **buf, uint64_t nelem) +{ + return _compose_header_generic(buf, nelem, '>'); +} + +int +compose_element(struct buf **buf, struct element *el) +{ + size_t n = 1 + CRLF_LEN; + struct buf *b; + + ASSERT(el->type > 0); + + /* estimate size (overestimages the size for anything that serializes an integer) */ + switch (el->type) { + case ELEM_STR: + case ELEM_ERR: + n += el->bstr.len; + break; + + case ELEM_NUMBER: + n += CC_UINT64_MAXLEN; + break; + + case ELEM_BLOB_STR: + case ELEM_BLOB_ERR: + case ELEM_VERBATIM_STR: + n += el->bstr.len + CC_UINT64_MAXLEN + CRLF_LEN; + + case ELEM_NIL: + break; + + case ELEM_BOOL: + n += 1; + break; + + case ELEM_DOUBLE: + case ELEM_BIG_NUMBER: + return COMPOSE_ENOTSUPPORTED; + + default: + return COMPOSE_EINVALID; + } + + if (_check_buf_size(buf, n) != COMPOSE_OK) { + return COMPOSE_ENOMEM; + } + + b = *buf; + log_verb("write element %p in buf %p", el, b); + + switch (el->type) { + case ELEM_STR: + n = _write_lit(b, "+"); + n += _write_bstr(b, &el->bstr); + break; + + case ELEM_ERR: + n = _write_lit(b, "-"); + n += _write_bstr(b, &el->bstr); + break; + + case ELEM_BLOB_STR: + n = _write_lit(b, "$"); + n += _write_blob(b, &el->bstr); + break; + + case ELEM_BLOB_ERR: + n = _write_lit(b, "!"); + n += _write_blob(b, &el->bstr); + break; + + case ELEM_VERBATIM_STR: + n = _write_lit(b, "="); + n += _write_blob(b, &el->bstr); + break; + + case ELEM_NUMBER: + n = _write_lit(b, ":"); + n += _write_int(b, el->num); + break; + + case ELEM_NIL: + n = _write_lit(b, "_"); + break; + + case ELEM_BOOL: + n = _write_lit(b, "#"); + n += _write_bool(b, el->boolean); + break; + + default: + NOT_REACHED(); + } + + n += _write_lit(b, CRLF); + /* If n > INT_MAX then the conversion here would cause UB */ + ASSERT(n < INT_MAX); + + return n; +} diff --git a/src/protocol/data/resp_tw/token.h b/src/protocol/data/resp_tw/token.h new file mode 100644 index 000000000..17bf6d6dc --- /dev/null +++ b/src/protocol/data/resp_tw/token.h @@ -0,0 +1,127 @@ +#pragma once + +/* + * This file handles serialization and deserialization for the + * RESP3 format. + */ + +/* + * Functions that deal with tokens in RESP3. + * + * In RESP3 the type of a value is decided by its leading character. + * - Blob strings start with '$' + * - Simple strings start with '+' + * - Simple errors start with '-' + * - Numbers start with ':' + * - Nil starts with '_' + * - Doubles start with ',' + * - Booleans start with '#' + * - Blob errors start with '!' + * - Verbatim strings start with '=' + * - Big numbers start with '(' + * - Arrays start with '*' + * - Maps start with '%' + * - Sets start with '~' + * - Attributes start with '|' + * - Push data starts with '>' + * + */ + +#include + +#include +#include +#include + +/* Note: most of these enums are identical to the ones in the resp folder */ + +typedef enum parse_rstatus { + PARSE_OK = 0, + PARSE_EUNFIN = -1, + PARSE_EEMPTY = -2, + PARSE_EOVERSIZE = -3, + PARSE_EINVALID = -4, + PARSE_EOTHER = -5, + PARSE_ENOTSUPPORTED = -6, +} parse_rstatus_e; + +typedef enum compose_rstatus { + COMPOSE_OK = 0, + COMPOSE_EUNFIN = -1, + COMPOSE_ENOMEM = -2, + COMPOSE_EINVALID = -3, + COMPOSE_EOTHER = -4, + COMPOSE_ENOTSUPPORTED = -5 +} compose_rstatus_e; + +/* array, map, set, attributes, and push data are not basic element types */ +typedef enum element_type { + ELEM_UNKNOWN = 0, + ELEM_STR = 1, + ELEM_ERR = 2, + ELEM_BLOB_STR = 3, + ELEM_BLOB_ERR = 4, + ELEM_NUMBER = 5, + ELEM_DOUBLE = 6, /* Note: currently unsupported */ + ELEM_BOOL = 7, + ELEM_VERBATIM_STR = 8, + ELEM_BIG_NUMBER = 9, /* Note: currently unsupported */ + ELEM_NIL = 10, + ELEM_ARRAY = 11, + ELEM_MAP = 12, + ELEM_SET = 13, + ELEM_ATTRIBUTES = 14, + ELEM_PUSH_DATA = 15, +} element_type_e; + +struct element { + element_type_e type; + union { + struct bstring bstr; + int64_t num; + double double_; + bool boolean; + }; +}; + +static inline bool +is_crlf(struct buf *buf) +{ + ASSERT(buf_rsize(buf) >= CRLF_LEN); + + return (*buf->rpos == CR && *(buf->rpos + 1) == LF); +} + +static inline bool +line_end(struct buf *buf) +{ + return (buf_rsize(buf) >= CRLF_LEN && is_crlf(buf)); +} + +bool token_is_array(struct buf *buf); +bool token_is_map(struct buf *buf); +bool token_is_set(struct buf *buf); +bool token_is_attribute(struct buf *buf); +bool token_is_push_data(struct buf *buf); + +parse_rstatus_e token_array_nelem(uint64_t *nelem, struct buf *buf); +parse_rstatus_e token_set_nelem(uint64_t *nelem, struct buf *buf); +parse_rstatus_e token_map_nelem(uint64_t *nelem, struct buf *buf); +parse_rstatus_e token_attribute_nelem(uint64_t *nelem, struct buf *buf); +parse_rstatus_e token_push_data_nelem(uint64_t *nelem, struct buf *buf); + +parse_rstatus_e parse_element(struct element *el, struct buf *buf); + +/* Write a type out to the buffer, returns the number of + * bytes written. Negative values are parse_rstatus_e error + * codes. + */ +int compose_array_header(struct buf **buf, uint64_t nelem); +int compose_map_header(struct buf **buf, uint64_t nelem); +int compose_set_header(struct buf **buf, uint64_t nelem); +int compose_attribute_header(struct buf **buf, uint64_t nelem); +int compose_push_data_header(struct buf **buf, uint64_t nelem); + +int compose_element(struct buf **buf, struct element *el); + + diff --git a/src/protocol/data/resp_tw_include.h b/src/protocol/data/resp_tw_include.h new file mode 100644 index 000000000..bccff5a5c --- /dev/null +++ b/src/protocol/data/resp_tw_include.h @@ -0,0 +1,6 @@ + +#include "resp_tw/parse.h" +#include "resp_tw/token.h" +#include "resp_tw/request.h" +#include "resp_tw/response.h" +#include "resp_tw/compose.h" diff --git a/test/protocol/data/CMakeLists.txt b/test/protocol/data/CMakeLists.txt index 227350e12..8abace392 100644 --- a/test/protocol/data/CMakeLists.txt +++ b/test/protocol/data/CMakeLists.txt @@ -1,2 +1,3 @@ add_subdirectory(memcache) add_subdirectory(resp) +add_subdirectory(resp_tw) diff --git a/test/protocol/data/resp_tw/CMakeLists.txt b/test/protocol/data/resp_tw/CMakeLists.txt new file mode 100644 index 000000000..8a77186c9 --- /dev/null +++ b/test/protocol/data/resp_tw/CMakeLists.txt @@ -0,0 +1,10 @@ +set(suite resp_tw) +set(test_name check_${suite}) + +set(source check_${suite}.c) + +add_executable(${test_name} ${source}) +target_link_libraries(${test_name} protocol_${suite}) +target_link_libraries(${test_name} ccommon-static ${CHECK_LIBRARIES} pthread m) + +add_test(${test_name} ${test_name}) \ No newline at end of file diff --git a/test/protocol/data/resp_tw/check_resp_tw.c b/test/protocol/data/resp_tw/check_resp_tw.c new file mode 100644 index 000000000..4590c79e8 --- /dev/null +++ b/test/protocol/data/resp_tw/check_resp_tw.c @@ -0,0 +1,756 @@ +#include + +#include +#include +#include +#include +#include + +#include + +#include +#include + +/* define for each suite, local scope due to macro visibility rule */ +#define SUITE_NAME "resp" +#define DEBUG_LOG SUITE_NAME ".log" + +struct request *req; +struct response *rsp; +struct buf *buf; + +/* A simple wrapper around START_TEST that emits a log message + * when the test starts. Meant to ease debugging. + */ +#define LOGGED_TEST(test_name) \ + START_TEST(test_name) \ + log_info("starting test " #test_name); + +#define ck_assert_str_eq_len(X, Y, len) \ + do { \ + const char *a = X; \ + const char *b = Y; \ + int l = len; \ + ck_assert_msg( \ + cc_bcmp(a, b, l) == 0, \ + "Assertion %s == %s failed. %s == \"%.*s\", %s == \"%.*s\"", \ + #X, #Y, #X, l, a, #Y, l, b \ + ); \ + } while (0) + +/* utilities */ +static void +test_setup(void) +{ + req = request_create(); + rsp = response_create(); + buf = buf_create(); +} + +static void +test_reset(void) +{ + request_reset(req); + response_reset(rsp); + buf_reset(buf); +} + +static void +test_teardown(void) +{ + buf_destroy(&buf); + response_destroy(&rsp); + request_destroy(&req); +} + +#define TEST_SERIALIZE + +/************** + * test cases * + **************/ + +LOGGED_TEST(test_simple_string) +{ +#define STR "foobar" +#define SERIALIZED "+" STR "\r\n" + + struct element el_c, el_p; + int ret; + int len = sizeof(SERIALIZED) - 1; + char *pos; + + test_reset(); + + /* compose */ + el_c.type = ELEM_STR; + el_c.bstr = str2bstr(STR); + ret = compose_element(&buf, &el_c); + ck_assert_msg(ret == len, "bytes expected: %d, returned: %d", len, ret); + ck_assert_int_eq(cc_bcmp(buf->rpos, SERIALIZED, ret), 0); + + /* parse */ + pos = buf->rpos + 1; + ret = parse_element(&el_p, buf); + ck_assert_int_eq(ret, PARSE_OK); + ck_assert(buf->rpos == buf->wpos); + ck_assert(el_p.type == ELEM_STR); + ck_assert(el_p.bstr.len == sizeof(STR) - 1); + ck_assert(el_p.bstr.data == pos); + +#undef SERIALIZED +#undef STR +} +END_TEST + +LOGGED_TEST(test_error) +{ +#define ERR "something is wrong" +#define SERIALIZED "-" ERR "\r\n" + + struct element el_c, el_p; + int ret; + int len = sizeof(SERIALIZED) - 1; + char *pos; + + test_reset(); + + /* compose */ + el_c.type = ELEM_ERR; + el_c.bstr = str2bstr(ERR); + ret = compose_element(&buf, &el_c); + ck_assert_msg(ret == len, "bytes expected: %d, returned: %d", len, ret); + ck_assert_int_eq(cc_bcmp(buf->rpos, SERIALIZED, ret), 0); + + /* parse */ + pos = buf->rpos + 1; + ret = parse_element(&el_p, buf); + ck_assert_int_eq(ret, PARSE_OK); + ck_assert(buf->rpos == buf->wpos); + ck_assert(el_p.type == ELEM_ERR); + ck_assert(el_p.bstr.len == sizeof(ERR) - 1); + ck_assert(el_p.bstr.data == pos); + +#undef SERIALIZED +#undef ERR +} +END_TEST + +LOGGED_TEST(test_integer) +{ +#define OVERSIZE ":19223372036854775807\r\n" +#define INVALID1 ":123lOl456\r\n" +#define INVALID2 ":\r\n" + + struct element el_c, el_p; + int ret; + + struct int_pair { + char *serialized; + uint64_t num; + } pairs[3] = { + {":-1\r\n", -1}, + {":9223372036854775807\r\n", 9223372036854775807}, + {":128\r\n", 128} + }; + + + test_reset(); + for (int i = 0; i < 3; i++) { + size_t len = strlen(pairs[i].serialized); + + buf_reset(buf); + el_c.type = ELEM_NUMBER; + el_c.num = pairs[i].num; + ret = compose_element(&buf, &el_c); + ck_assert(ret == len); + ck_assert_int_eq(cc_bcmp(buf->rpos, pairs[i].serialized, len), 0); + + el_p.type = ELEM_UNKNOWN; + ret = parse_element(&el_p, buf); + ck_assert_int_eq(ret, PARSE_OK); + ck_assert(buf->rpos == buf->wpos); + ck_assert(el_p.type == ELEM_NUMBER); + ck_assert(el_p.num == pairs[i].num); + } + + buf_reset(buf); + buf_write(buf, OVERSIZE, sizeof(OVERSIZE) - 1); + ret = parse_element(&el_p, buf); + ck_assert_int_eq(ret, PARSE_EOVERSIZE); + + buf_reset(buf); + buf_write(buf, INVALID1, sizeof(INVALID1) - 1); + ret = parse_element(&el_p, buf); + ck_assert_int_eq(ret, PARSE_EINVALID); + + buf_reset(buf); + buf_write(buf, INVALID2, sizeof(INVALID2) - 1); + ret = parse_element(&el_p, buf); + ck_assert_int_eq(ret, PARSE_EINVALID); + +#undef INVALID2 +#undef INVALID1 +#undef OVERSIZE +} +END_TEST + +LOGGED_TEST(test_bulk_string) +{ +#define BULK "foo bar\r\n" +#define SERIALIZED "$9\r\n" BULK "\r\n" +#define EMPTY "$0\r\n\r\n" + + struct element el_c, el_p; + int ret; + int len = sizeof(SERIALIZED) - 1; + + test_reset(); + + /* compose */ + el_c.type = ELEM_BLOB_STR; + el_c.bstr = str2bstr(BULK); + ret = compose_element(&buf, &el_c); + ck_assert_msg(ret == len, "bytes expected: %d, returned: %d, out: %s", len, ret, buf->begin); + ck_assert_msg( + cc_bcmp(buf->rpos, SERIALIZED, ret) == 0, + "string comparison failed: '%s' != '%s'", buf->rpos, + SERIALIZED + ); + ck_assert_int_eq(cc_bcmp(buf->rpos, SERIALIZED, ret), 0); + + /* parse */ + ck_assert_int_eq(parse_element(&el_p, buf), PARSE_OK); + ck_assert(buf->rpos == buf->wpos); + ck_assert(el_p.type == ELEM_BLOB_STR); + ck_assert(el_p.bstr.len == sizeof(BULK) - 1); + ck_assert(el_p.bstr.data + el_p.bstr.len == buf->rpos - CRLF_LEN); + ck_assert(buf->rpos == buf->wpos); + + /* empty string */ + buf_reset(buf); + len = sizeof(EMPTY) - 1; + el_c.bstr = null_bstring; + ret = compose_element(&buf, &el_c); + ck_assert_msg(ret == len, "bytes expected: %d, returned: %d, out: %s", len, ret, buf->begin); + ck_assert_int_eq(cc_bcmp(buf->rpos, EMPTY, ret), 0); + ck_assert_int_eq(parse_element(&el_p, buf), PARSE_OK); + ck_assert(el_p.bstr.len == 0); + + +#undef EMPTY +#undef SERIALIZED +#undef BULK +} +END_TEST + +LOGGED_TEST(test_array) +{ +#define SERIALIZED "*2\r\n+foo\r\n$4\r\nbarr\r\n" +#define NELEM 2 + + size_t len = sizeof(SERIALIZED) - 1; + uint64_t nelem; + + test_reset(); + + buf_write(buf, SERIALIZED, len); + ck_assert(token_is_array(buf)); + ck_assert_int_eq(token_array_nelem(&nelem, buf), PARSE_OK); + ck_assert_int_eq(nelem, NELEM); + +#undef NELEM +#undef SERIALIZED +} +END_TEST + +LOGGED_TEST(test_nil_blob_str_invalid) +{ +#define NIL_BULK "$-1\r\n" + + size_t len = sizeof(NIL_BULK) - 1; + struct element el; + + test_reset(); + + buf_write(buf, NIL_BULK, len); + el.type = ELEM_UNKNOWN; + ck_assert_int_eq(parse_element(&el, buf), PARSE_EINVALID); + +#undef NIL_BULK +} +END_TEST + +LOGGED_TEST(test_unfin_token) +{ + char *token[13] = { + "+hello ", + "-err", + "-err\r", + ":5", + ":5\r", + "$5", + "$5\r", + "$5\r\n", + "$5\r\nabc", + "$5\r\nabcde\r", + "*5", + "*5\r", + }; + char *pos; + size_t len; + + for (int i = 0; i < 10; i++) { + struct element el; + + len = strlen(token[i]); + buf_reset(buf); + buf_write(buf, token[i], len); + pos = buf->rpos; + ck_assert_int_eq(parse_element(&el, buf), PARSE_EUNFIN); + ck_assert(buf->rpos == pos); + } + + for (int i = 10; i < 12; i++) { + uint64_t nelem; + + len = strlen(token[i]); + buf_reset(buf); + buf_write(buf, token[i], len); + pos = buf->rpos; + ck_assert_int_eq(token_array_nelem(&nelem, buf), PARSE_EUNFIN); + ck_assert(buf->rpos == pos); + } +} +END_TEST + +LOGGED_TEST(test_double_unsupported) +{ +#define DOUBLE ",3.14152695\r\n" + size_t len = sizeof(DOUBLE) - 1; + + test_reset(); + buf_write(buf, DOUBLE, len); + + struct element el; + ck_assert_int_eq(parse_element(&el, buf), PARSE_ENOTSUPPORTED); +#undef DOUBLE +} +END_TEST + +/* + * request + */ + +LOGGED_TEST(test_quit) +{ +#define QUIT "quit" +#define SERIALIZED "*1\r\n$4\r\n" QUIT "\r\n" +#define INVALID "*2\r\n$4\r\n" QUIT "\r\n$3\r\nnow\r\n" + int ret; + struct element *el; + + test_reset(); + + req->type = REQ_QUIT; + el = array_push(req->token); + el->type = ELEM_BLOB_STR; + el->bstr = (struct bstring){sizeof(QUIT) - 1, QUIT}; + ret = compose_req(&buf, req); + ck_assert_int_eq(ret, sizeof(SERIALIZED) - 1); + ck_assert_int_eq(cc_bcmp(buf->rpos, SERIALIZED, ret), 0); + + el->type = ELEM_UNKNOWN; /* this effectively resets *el */ + request_reset(req); + ck_assert_int_eq(parse_req(req, buf), PARSE_OK); + ck_assert_int_eq(req->type, REQ_QUIT); + ck_assert_int_eq(req->token->nelem, 1); + el = array_first(req->token); + ck_assert_int_eq(el->type, ELEM_BLOB_STR); + ck_assert_int_eq(cc_bcmp(el->bstr.data, QUIT, sizeof(QUIT) - 1), 0); + + /* invalid number of arguments */ + test_reset(); + buf_write(buf, INVALID, sizeof(INVALID) - 1); + ck_assert_int_eq(parse_req(req, buf), PARSE_EINVALID); +#undef INVALID +#undef SERIALIZED +#undef QUIT +} +END_TEST + + +LOGGED_TEST(test_ping) +{ +#define PING "ping" +#define VAL "hello" +#define S_PING "*1\r\n$4\r\n" PING "\r\n" +#define S_ECHO "*2\r\n$4\r\n" PING "\r\n$5\r\nhello\r\n" + int ret; + struct element *el; + + test_reset(); + + /* simple ping */ + buf_write(buf, S_PING, sizeof(S_PING) - 1); + ck_assert_int_eq(parse_req(req, buf), PARSE_OK); + ck_assert_int_eq(req->type, REQ_PING); + + /* ping as echo */ + test_reset(); + + req->type = REQ_PING; + el = array_push(req->token); + el->type = ELEM_BLOB_STR; + el->bstr = (struct bstring){sizeof(PING) - 1, PING}; + el = array_push(req->token); + el->type = ELEM_BLOB_STR; + el->bstr = (struct bstring){sizeof(VAL) - 1, VAL}; + ret = compose_req(&buf, req); + ck_assert_int_eq(ret, sizeof(S_ECHO) - 1); + ck_assert_int_eq(cc_bcmp(buf->rpos, S_ECHO, ret), 0); + + el->type = ELEM_UNKNOWN; /* resets *el */ + request_reset(req); + ck_assert_int_eq(parse_req(req, buf), PARSE_OK); + ck_assert_int_eq(req->type, REQ_PING); + ck_assert_int_eq(req->token->nelem, 2); + el = array_first(req->token); + ck_assert_int_eq(el->type, ELEM_BLOB_STR); + ck_assert_int_eq(cc_bcmp(el->bstr.data, PING, sizeof(PING) - 1), 0); + el = array_get(req->token, 1); + ck_assert_int_eq(el->type, ELEM_BLOB_STR); + ck_assert_int_eq(cc_bcmp(el->bstr.data, VAL, sizeof(VAL) - 1), 0); +#undef S_ECHO +#undef ECHO +#undef S_PING +#undef QUIT +} +END_TEST + +LOGGED_TEST(test_unfin_req) +{ + char *token[4] = { + "*2\r\n", + "*2\r\n$3\r\n", + "*2\r\n$3\r\nfoo\r\n", + "*2\r\n$3\r\nfoo\r\n$3\r\n", + }; + + for (int i = 0; i < 4; i++) { + char *pos; + size_t len; + + len = strlen(token[i]); + buf_reset(buf); + buf_write(buf, token[i], len); + pos = buf->rpos; + ck_assert_int_eq(parse_req(req, buf), PARSE_EUNFIN); + ck_assert(buf->rpos == pos); + } +} +END_TEST + +/* + * response + */ +LOGGED_TEST(test_ok) +{ +#define OK "OK" +#define SERIALIZED "+" OK "\r\n" + int ret; + struct element *el; + + test_reset(); + + rsp->type = ELEM_STR; + el = array_push(rsp->token); + el->type = ELEM_STR; + el->bstr = (struct bstring){sizeof(OK) - 1, OK}; + ret = compose_rsp(&buf, rsp); + ck_assert_int_eq(ret, sizeof(SERIALIZED) - 1); + ck_assert_int_eq(cc_bcmp(buf->rpos, SERIALIZED, ret), 0); + + el->type = ELEM_UNKNOWN; /* resets *el */ + response_reset(rsp); + ck_assert_int_eq(parse_rsp(rsp, buf), PARSE_OK); + ck_assert_int_eq(rsp->type, ELEM_STR); + ck_assert_int_eq(rsp->token->nelem, 1); + el = array_first(rsp->token); + ck_assert_int_eq(el->type, ELEM_STR); + ck_assert_int_eq(cc_bcmp(el->bstr.data, OK, sizeof(OK) - 1), 0); +#undef SERIALIZED +#undef OK +} +END_TEST + +LOGGED_TEST(test_array_reply) +{ +#define SERIALIZED "*5\r\n:-10\r\n_\r\n-ERR invalid arg\r\n+foo\r\n$5\r\nHELLO\r\n" + size_t len = sizeof(SERIALIZED) - 1; + struct element *el; + + test_reset(); + + buf_write(buf, SERIALIZED, len); + ck_assert_int_eq(parse_rsp(rsp, buf), PARSE_OK); + ck_assert_int_eq(rsp->type, ELEM_ARRAY); + ck_assert_int_eq(rsp->token->nelem, 5); + el = array_first(rsp->token); + ck_assert_int_eq(el->type, ELEM_NUMBER); + el = array_get(rsp->token, 1); + ck_assert_int_eq(el->type, ELEM_NIL); + el = array_get(rsp->token, 2); + ck_assert_int_eq(el->type, ELEM_ERR); + el = array_get(rsp->token, 3); + ck_assert_int_eq(el->type, ELEM_STR); + el = array_get(rsp->token, 4); + ck_assert_int_eq(el->type, ELEM_BLOB_STR); + ck_assert_int_eq(el->bstr.len, 5); + ck_assert_int_eq(cc_bcmp(el->bstr.data, "HELLO", 5), 0); + ck_assert_int_eq(buf_rsize(buf), 0); + ck_assert_int_eq(rsp->attrs->nelem, 0); + + ck_assert_int_eq(compose_rsp(&buf, rsp), len); + ck_assert_int_eq(buf_rsize(buf), len); + ck_assert_int_eq(cc_bcmp(buf->rpos, SERIALIZED, len), 0); +#undef SERIALIZED +} +END_TEST + +LOGGED_TEST(test_reply_with_attributes) +{ +#define SERIALIZED "|1\r\n+sTTL\r\n:15\r\n_\r\n" + size_t len = sizeof(SERIALIZED) - 1; + struct attribute_entry *entry; + + test_reset(); + + buf_write(buf, SERIALIZED, len); + ck_assert_int_eq(parse_rsp(rsp, buf), PARSE_OK); + ck_assert_int_eq(rsp->type, ELEM_NIL); + ck_assert_int_eq(rsp->attrs->nelem, 1); + entry = array_first(rsp->attrs); + ck_assert_int_eq(entry->key.type, ELEM_STR); + ck_assert_str_eq_len(entry->key.bstr.data, "sTTL", 4); + ck_assert_int_eq(entry->val.type, ELEM_NUMBER); + ck_assert_int_eq(entry->val.num, 15); + + ck_assert_int_eq(compose_rsp(&buf, rsp), len); + ck_assert_int_eq(buf_rsize(buf), len); + ck_assert_str_eq_len(buf->rpos, SERIALIZED, len); +#undef SERIALIZED +} +END_TEST + +LOGGED_TEST(test_map_reply) +{ +#define TEST "test" +#define OTHER "other" +#define SERIALIZED "%2\r\n+" TEST "\r\n:3\r\n+" OTHER "\r\n:4\r\n" + size_t test_len = sizeof(TEST) - 1; + size_t other_len = sizeof(OTHER) - 1; + size_t len = sizeof(SERIALIZED) - 1; + struct element *el; + + test_reset(); + + buf_write(buf, SERIALIZED, len); + ck_assert_int_eq(parse_rsp(rsp, buf), PARSE_OK); + ck_assert_int_eq(rsp->attrs->nelem, 0); + ck_assert_int_eq(rsp->token->nelem, 4); + el = array_get(rsp->token, 0); + ck_assert_int_eq(el->type, ELEM_STR); + ck_assert_str_eq_len(el->bstr.data, TEST, test_len); + el = array_get(rsp->token, 1); + ck_assert_int_eq(el->type, ELEM_NUMBER); + ck_assert_int_eq(el->num, 3); + el = array_get(rsp->token, 2); + ck_assert_int_eq(el->type, ELEM_STR); + ck_assert_str_eq_len(el->bstr.data, OTHER, other_len); + el = array_get(rsp->token, 3); + ck_assert_int_eq(el->type, ELEM_NUMBER); + ck_assert_int_eq(el->num, 4); + + ck_assert_int_eq(compose_rsp(&buf, rsp), len); + ck_assert_int_eq(buf_rsize(buf), len); + ck_assert_str_eq_len(buf->rpos, SERIALIZED, len); +#undef TEST +#undef OTHER +#undef SERIALIZED +} +END_TEST + +/* + * edge cases + */ +LOGGED_TEST(test_empty_buf) +{ + struct element el; + test_reset(); + + ck_assert(!token_is_array(buf)); + ck_assert_int_eq(parse_element(&el, buf), PARSE_EUNFIN); + ck_assert_int_eq(parse_rsp(rsp, buf), PARSE_EUNFIN); + ck_assert_int_eq(parse_req(req, buf), PARSE_EUNFIN); +} +END_TEST + +LOGGED_TEST(test_large_bulk_string) +{ + /* + * Test a bulk string with a size just above the maximum + * allowed size (512 MB - 1). If bulk string handling is + * implemented correctly then this should return + * PARSE_EUNFIN. + */ + +#define SERIALIZED "$536870911\r\n\r\n" + struct element el; + + test_reset(); + + buf_write(buf, SERIALIZED, sizeof(SERIALIZED) - 1); + + ck_assert_int_eq(parse_element(&el, buf), PARSE_EUNFIN); +#undef SERIALIZED +} +END_TEST + +/* + * request/response pool + */ + +LOGGED_TEST(test_req_pool_basic) +{ +#define POOL_SIZE 10 + int i; + struct request *reqs[POOL_SIZE]; + request_options_st options = { + .request_ntoken = {.type = OPTION_TYPE_UINT, .val.vuint = REQ_NTOKEN}, + .request_poolsize = {.type = OPTION_TYPE_UINT, .val.vuint = POOL_SIZE}}; + + request_setup(&options, NULL); + + for (i = 0; i < POOL_SIZE; i++) { + reqs[i] = request_borrow(); + ck_assert_msg(reqs[i] != NULL, "expected to borrow a request"); + } + ck_assert_msg(request_borrow() == NULL, "expected request pool to be depleted"); + for (i = 0; i < POOL_SIZE; i++) { + request_return(&reqs[i]); + ck_assert_msg(reqs[i] == NULL, "expected request to be nulled after return"); + } + + request_teardown(); +#undef POOL_SIZE +} +END_TEST + +LOGGED_TEST(test_rsp_pool_basic) +{ +#define POOL_SIZE 10 + int i; + struct response *rsps[POOL_SIZE]; + response_options_st options = { + .response_ntoken = {.type = OPTION_TYPE_UINT, .val.vuint = RSP_NTOKEN}, + .response_poolsize = {.type = OPTION_TYPE_UINT, .val.vuint = POOL_SIZE}}; + + response_setup(&options, NULL); + + for (i = 0; i < POOL_SIZE; i++) { + rsps[i] = response_borrow(); + ck_assert_msg(rsps[i] != NULL, "expected to borrow a response"); + } + ck_assert_msg(response_borrow() == NULL, "expected response pool to be depleted"); + for (i = 0; i < POOL_SIZE; i++) { + response_return(&rsps[i]); + ck_assert_msg(rsps[i] == NULL, "expected response to be nulled after return"); + } + + response_teardown(); +#undef POOL_SIZE +} +END_TEST + + +/* + * Test Suite + */ +static Suite * +resp_suite(void) +{ + Suite *s = suite_create(SUITE_NAME); + + /* token */ + TCase *tc_token = tcase_create("token"); + suite_add_tcase(s, tc_token); + + tcase_add_test(tc_token, test_simple_string); + tcase_add_test(tc_token, test_error); + tcase_add_test(tc_token, test_integer); + tcase_add_test(tc_token, test_bulk_string); + tcase_add_test(tc_token, test_array); + tcase_add_test(tc_token, test_nil_blob_str_invalid); + tcase_add_test(tc_token, test_unfin_token); + tcase_add_test(tc_token, test_double_unsupported); + + /* basic requests */ + TCase *tc_request = tcase_create("request"); + suite_add_tcase(s, tc_request); + + tcase_add_test(tc_request, test_quit); + tcase_add_test(tc_request, test_ping); + tcase_add_test(tc_request, test_unfin_req); + + /* basic responses */ + TCase *tc_response = tcase_create("response"); + suite_add_tcase(s, tc_response); + + tcase_add_test(tc_response, test_ok); + tcase_add_test(tc_response, test_map_reply); + tcase_add_test(tc_response, test_array_reply); + tcase_add_test(tc_response, test_reply_with_attributes); + + /* edge cases */ + TCase *tc_edge = tcase_create("edge cases"); + suite_add_tcase(s, tc_edge); + tcase_add_test(tc_edge, test_empty_buf); + tcase_add_test(tc_edge, test_large_bulk_string); + tcase_add_test(tc_edge, test_large_bulk_string); + + /* req/rsp objects, pooling */ + TCase *tc_pool = tcase_create("request/response pool"); + suite_add_tcase(s, tc_pool); + + tcase_add_test(tc_pool, test_req_pool_basic); + tcase_add_test(tc_pool, test_rsp_pool_basic); + + return s; +} + +int +main(void) +{ + int nfail; + + debug_options_st opts = { + DEBUG_OPTION(OPTION_INIT) + }; + opts.debug_log_level.val.vuint = LOG_INFO; + debug_setup(&opts); + test_setup(); + + Suite *suite = resp_suite(); + SRunner *srunner = srunner_create(suite); + srunner_set_log(srunner, DEBUG_LOG); + srunner_run_all(srunner, CK_ENV); + + nfail = srunner_ntests_failed(srunner); + + srunner_free(srunner); + + test_teardown(); + debug_teardown(); + + return (nfail == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +}