-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfigs.py
More file actions
213 lines (184 loc) Β· 6.17 KB
/
configs.py
File metadata and controls
213 lines (184 loc) Β· 6.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
"""Workflow Pipelines API."""
from json import dumps
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode
from requests.models import Response
from tenacity import retry
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_random
from workflow.http.client import Client
class Configs(Client):
"""HTTP Client for interacting with the Configs endpoints on pipelines backend.
Args:
Client (workflow.http.client): The base class for interacting with the backend.
Returns:
Configs: A client for interacting with workflow-pipelines.
"""
# @retry(
# reraise=True, wait=wait_random(min=0.3, max=1.8), stop=(stop_after_delay(15))
# )
def deploy(self, data: Dict[str, Any]):
"""Deploys a Config from payload data.
Parameters
----------
data : Dict[str, Any]
YAML data.
Returns
-------
List[str]
ID of Config object generated.
"""
with self.session as session:
url = f"{self.baseurl}/configs"
response: Response = session.post(url, json=data)
response.raise_for_status()
return response.json()
@retry(
reraise=True, wait=wait_random(min=0.3, max=1.8), stop=(stop_after_delay(15))
)
def migrate(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Migrates V1 objects to V2.
Parameters
----------
data : Dict[str, Any]
Object payload.
Returns
-------
Dict[str, Any]
Backend JSON response.
"""
with self.session as session:
url = f"{self.baseurl}/configs/migrate"
response: Response = session.post(url, json=data)
response.raise_for_status()
return response.json()
def count(self) -> Dict[str, Any]:
"""Count all documents in a collection.
Returns
-------
Dict[str, Any]
Dictionary with count.
"""
with self.session as session:
response: Response = session.get(
url=f"{self.baseurl}/configs/count?database=configs"
)
response.raise_for_status()
return response.json()
def get_configs(
self,
name: Optional[str] = None,
query: Optional[str] = "{}",
projection: Optional[str] = "{}",
) -> List[Dict[str, Any]]:
"""View the current configurations on pipelines backend.
Parameters
----------
name : str
Config name, by default None
query : str, optional
Query payload.
projection : str, optional
Query projection.
Returns
-------
List[Dict[str, Any]]
List of Config payloads.
"""
with self.session as session:
# ? When using urlencode, internal dict object get single-quoted
# ? This can trigger error on workflow-pipelines backend
params = {"projection": projection, "query": query}
if name:
params.update({"name": name})
url = f"{self.baseurl}/configs?{urlencode(params)}"
response: Response = session.get(url=url)
response.raise_for_status()
return response.json()
@retry(
reraise=True, wait=wait_random(min=0.3, max=1.8), stop=(stop_after_delay(15))
)
def remove(self, config: str, id: str) -> Response:
"""Removes a cancelled pipeline configuration.
Parameters
----------
config : str
Config name.
id : str
Config ID.
Returns
-------
List[Dict[str, Any]]
Response payload.
"""
with self.session as session:
query = {"id": id}
params = {"query": dumps(query), "name": config}
url = f"{self.baseurl}/configs?{urlencode(params)}"
response: Response = session.delete(url=url)
response.raise_for_status()
return response
@retry(
reraise=True, wait=wait_random(min=0.3, max=1.8), stop=(stop_after_delay(15))
)
def stop(self, config_name: str, id: str) -> List[Dict[str, Any]]:
"""Stops the manager for a PipelineConfig.
Parameters
----------
config_name : str
Config name.
id : str
Config ID.
Returns
-------
List[Dict[str, Any]]
List of stopped PipelineConfig objects.
"""
with self.session as session:
query = {"id": id}
params = {"query": dumps(query), "name": config_name}
url = f"{self.baseurl}/configs/cancel?{urlencode(params)}"
response: Response = session.put(url)
response.raise_for_status()
if response.status_code == 304:
return []
return response.json()
@retry(
reraise=True, wait=wait_random(min=0.3, max=1.8), stop=(stop_after_delay(15))
)
def info(self) -> Dict[str, Any]:
"""Get the version of the pipelines backend.
Returns
-------
Dict[str, Any]
Pipelines backend info.
"""
client_info = self.model_dump()
with self.session as session:
response: Response = session.get(url=f"{self.baseurl}/version")
response.raise_for_status()
server_info = response.json()
return {"client": client_info, "server": server_info}
@retry(
reraise=True, wait=wait_random(min=0.3, max=1.8), stop=(stop_after_delay(15))
)
def _retry(self, name: str, id: str) -> Dict[str, Any]:
"""Makes POST request to /retry endpoint.
Parameters
----------
name : str
Config name.
id : str
Config ID.
Returns
-------
Dict[str, Any]
Response payload.
"""
with self.session as session:
body = {"id": id, "name": name}
response: Response = session.post(
url=f"{self.baseurl}/configs/retry", json=body
)
response.raise_for_status()
return response.json()