-
Notifications
You must be signed in to change notification settings - Fork 151
Expand file tree
/
Copy pathrequest.rs
More file actions
123 lines (112 loc) · 4.25 KB
/
request.rs
File metadata and controls
123 lines (112 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use std::any::Any;
use std::time::Duration;
use async_trait::async_trait;
use tonic::transport::Channel;
use tonic::IntoRequest;
use crate::proto::kvrpcpb;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::store::RegionWithLeader;
use crate::Error;
use crate::Result;
/// An RPC request that can be dispatched through a [`TikvClient`].
///
/// The trait is object-safe on purpose: responses are returned type-erased as
/// `Box<dyn Any>` and `as_any` exposes the request itself as `&dyn Any`.
/// Implementations for the `kvrpcpb` message types are generated by the
/// `impl_request!` macro in this file.
#[async_trait]
pub trait Request: Any + Sync + Send + 'static {
/// Sends this request through `client` and returns the response boxed as
/// `dyn Any`.
///
/// The `impl_request!` implementations apply `timeout` to the outgoing
/// tonic request and convert a failed call into the crate's `Error`.
async fn dispatch(
&self,
client: &TikvClient<Channel>,
timeout: Duration,
) -> Result<Box<dyn Any>>;
/// Returns a static label identifying the kind of request.
// NOTE(review): presumably consumed by metrics/logging; confirm against callers.
fn label(&self) -> &'static str;
/// Returns `self` as `&dyn Any`, which allows downcasting to the concrete
/// request type.
fn as_any(&self) -> &dyn Any;
/// Records the region and leader peer described by `leader` in the request.
///
/// The `impl_request!` implementations return `Error::LeaderNotFound` when
/// `leader` carries no leader peer.
fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()>;
/// Sets the API version carried by the request.
fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion);
}
/// Implements [`Request`] for the protobuf message `kvrpcpb::$name`.
///
/// * `$name`  - request message type inside `kvrpcpb`.
/// * `$fun`   - `TikvClient` method used to send the message.
/// * `$label` - string literal returned by [`Request::label`].
macro_rules! impl_request {
    ($name: ident, $fun: ident, $label: literal) => {
        #[async_trait]
        impl Request for kvrpcpb::$name {
            /// Sends a copy of this message via `TikvClient::$fun`, applying
            /// `timeout`, and returns the response boxed as `dyn Any`.
            ///
            /// # Errors
            ///
            /// A failed RPC is converted into the crate's `Error` via `Error::from`.
            async fn dispatch(
                &self,
                client: &TikvClient<Channel>,
                timeout: Duration,
            ) -> Result<Box<dyn Any>> {
                // `into_request` consumes its receiver, so work on a clone of `self`.
                let mut req = self.clone().into_request();
                req.set_timeout(timeout);
                // The client is cloned before the call, presumably because the
                // generated RPC methods take `&mut self` while only `&TikvClient`
                // is available here.
                client
                    .clone()
                    .$fun(req)
                    .await
                    .map(|r| Box::new(r.into_inner()) as Box<dyn Any>)
                    .map_err(Error::from)
            }

            /// Returns the label supplied to the macro for this request type.
            fn label(&self) -> &'static str {
                $label
            }

            /// Returns `self` as `&dyn Any`.
            fn as_any(&self) -> &dyn Any {
                self
            }

            /// Copies the region id, region epoch and leader peer from `leader`
            /// into the request context, creating the context if it is absent.
            ///
            /// # Errors
            ///
            /// Returns `Error::LeaderNotFound` when `leader` has no leader peer.
            /// In that case the request is left unmodified.
            fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> {
                // Validate before touching `self.context` so a failed call has no
                // side effect; `ok_or_else` builds the error only when needed.
                let leader_peer = leader
                    .leader
                    .as_ref()
                    .ok_or_else(|| Error::LeaderNotFound {
                        region: leader.ver_id(),
                    })?;
                // `get_or_insert_with` avoids constructing a default context when
                // one is already present.
                let ctx = self.context.get_or_insert_with(kvrpcpb::Context::default);
                ctx.region_id = leader.region.id;
                ctx.region_epoch = leader.region.region_epoch.clone();
                ctx.peer = Some(leader_peer.clone());
                Ok(())
            }

            /// Stores `api_version` in the request context, creating the context
            /// if it is absent.
            fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) {
                let ctx = self.context.get_or_insert_with(kvrpcpb::Context::default);
                ctx.api_version = api_version.into();
            }
        }
    };
}
// `Request` implementations for the messages sent through the `raw_*` client
// methods. Every label below is identical to the client method name.
impl_request!(RawGetRequest, raw_get, "raw_get");
impl_request!(RawBatchGetRequest, raw_batch_get, "raw_batch_get");
impl_request!(RawGetKeyTtlRequest, raw_get_key_ttl, "raw_get_key_ttl");
impl_request!(RawPutRequest, raw_put, "raw_put");
impl_request!(RawBatchPutRequest, raw_batch_put, "raw_batch_put");
impl_request!(RawDeleteRequest, raw_delete, "raw_delete");
impl_request!(RawBatchDeleteRequest, raw_batch_delete, "raw_batch_delete");
impl_request!(RawScanRequest, raw_scan, "raw_scan");
impl_request!(RawBatchScanRequest, raw_batch_scan, "raw_batch_scan");
impl_request!(RawDeleteRangeRequest, raw_delete_range, "raw_delete_range");
impl_request!(RawCasRequest, raw_compare_and_swap, "raw_compare_and_swap");
impl_request!(RawCoprocessorRequest, raw_coprocessor, "raw_coprocessor");
// `Request` implementations for the messages sent through the `kv_*` client
// methods, plus `unsafe_destroy_range`. Labels repeat the client method name,
// with the one exception noted below.
impl_request!(GetRequest, kv_get, "kv_get");
impl_request!(ScanRequest, kv_scan, "kv_scan");
impl_request!(PrewriteRequest, kv_prewrite, "kv_prewrite");
impl_request!(CommitRequest, kv_commit, "kv_commit");
impl_request!(BatchGetRequest, kv_batch_get, "kv_batch_get");
impl_request!(BatchRollbackRequest, kv_batch_rollback, "kv_batch_rollback");
impl_request!(
PessimisticRollbackRequest,
kv_pessimistic_rollback,
"kv_pessimistic_rollback"
);
impl_request!(ResolveLockRequest, kv_resolve_lock, "kv_resolve_lock");
impl_request!(ScanLockRequest, kv_scan_lock, "kv_scan_lock");
impl_request!(
PessimisticLockRequest,
kv_pessimistic_lock,
"kv_pessimistic_lock"
);
impl_request!(TxnHeartBeatRequest, kv_txn_heart_beat, "kv_txn_heart_beat");
impl_request!(
CheckTxnStatusRequest,
kv_check_txn_status,
"kv_check_txn_status"
);
// NOTE(review): this is the only label carrying a `_request` suffix; all other
// labels equal the client method name. It looks unintentional, but the label is
// a runtime string whose consumers are not visible here -- confirm before
// renaming it to "kv_check_secondary_locks".
impl_request!(
CheckSecondaryLocksRequest,
kv_check_secondary_locks,
"kv_check_secondary_locks_request"
);
impl_request!(GcRequest, kv_gc, "kv_gc");
impl_request!(DeleteRangeRequest, kv_delete_range, "kv_delete_range");
impl_request!(
UnsafeDestroyRangeRequest,
unsafe_destroy_range,
"unsafe_destroy_range"
);