-
Notifications
You must be signed in to change notification settings - Fork 4.1k
Expand file tree
/
Copy pathipc_async.pxi
More file actions
156 lines (121 loc) · 4.84 KB
/
ipc_async.pxi
File metadata and controls
156 lines (121 loc) · 4.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
class _AsyncioCall:
    """State for an async operation using asyncio.

    Each instance owns one future created on the event loop that is
    running when the instance is constructed. Consumers await the future
    obtained from ``as_awaitable()``; producers complete it by calling
    ``wakeup()``.
    """

    def __init__(self):
        # Function-scope import: asyncio is only needed once a call
        # object is actually created.
        import asyncio
        # get_running_loop() raises RuntimeError when no loop is running.
        self._future = asyncio.get_running_loop().create_future()

    def as_awaitable(self):
        """Return the future that is completed by ``wakeup()``."""
        return self._future

    def wakeup(self, result_or_exception):
        """Complete the future with a result or an exception.

        Parameters
        ----------
        result_or_exception : object
            If this is a ``BaseException`` instance it is set as the
            future's exception; any other value is set as its result.

        Notes
        -----
        The completion is scheduled with ``call_soon_threadsafe`` so the
        future is only ever touched from its own event loop. If the
        future is already done by the time the callback runs (for
        example because the awaiting task was cancelled), the value is
        dropped instead of raising ``InvalidStateError`` inside the loop.
        """
        future = self._future
        if isinstance(result_or_exception, BaseException):
            complete = future.set_exception
        else:
            complete = future.set_result

        def _complete_if_pending():
            # The done() check must run on the loop thread, right before
            # completing, so it cannot race with a cancellation.
            if not future.done():
                complete(result_or_exception)

        future.get_loop().call_soon_threadsafe(_complete_if_pending)
cdef object _wrap_record_batch_or_none(CRecordBatchWithMetadata batch_with_md):
    """Convert a CRecordBatchWithMetadata into a Python object.

    Returns the wrapped RecordBatch when the contained batch pointer is
    non-null, and None when it is null (used as the end-of-stream marker).
    """
    if batch_with_md.batch.get() != NULL:
        return pyarrow_wrap_batch(batch_with_md.batch)
    # Null batch pointer: nothing to wrap.
    return None
cdef object _wrap_async_generator(CAsyncRecordBatchGenerator gen):
    """Build an AsyncRecordBatchReader that takes over ``gen``.

    The schema and device type are copied from ``gen``, while its
    generator member is moved into a newly allocated
    CAsyncRecordBatchGenerator that the returned reader holds.
    """
    # __new__ is used because AsyncRecordBatchReader.__init__ always raises.
    cdef AsyncRecordBatchReader wrapped = AsyncRecordBatchReader.__new__(
        AsyncRecordBatchReader)
    # Allocated after the reader exists, so a failure while creating the
    # reader cannot strand this heap object.
    cdef CAsyncRecordBatchGenerator* heap_gen = new CAsyncRecordBatchGenerator()

    # The Python-level schema is wrapped on first access of `schema`.
    wrapped._schema = None

    heap_gen.schema = gen.schema
    heap_gen.device_type = gen.device_type
    heap_gen.generator = move(gen.generator)

    # Hand the raw pointer to the reader's smart pointer member.
    wrapped.generator.reset(heap_gen)
    return wrapped
cdef class AsyncRecordBatchReader(_Weakrefable):
    """Asynchronous reader for a stream of record batches.

    Instances are async iterators: each iteration step awaits the next
    record batch from an asynchronous device stream, and iteration stops
    once the stream is exhausted. Instances can also be used as async
    context managers.

    This interface is EXPERIMENTAL.

    Examples
    --------
    >>> async for batch in reader:  # doctest: +SKIP
    ...     process(batch)
    """

    def __init__(self):
        # Direct construction is rejected; instances are created
        # through __new__ by factory code.
        message = (
            f"Do not call {self.__class__.__name__}'s constructor directly, "
            "use factory methods instead.")
        raise TypeError(message)

    @property
    def schema(self):
        """
        Shared schema of the record batches in the stream.

        The schema is wrapped on first access and cached afterwards.

        Returns
        -------
        Schema
        """
        if self._schema is not None:
            return self._schema
        self._schema = pyarrow_wrap_schema(self.generator.get().schema)
        return self._schema

    def __aiter__(self):
        """Return the reader itself as the async iterator."""
        return self

    async def __anext__(self):
        """Await the next record batch.

        Raises
        ------
        StopAsyncIteration
            When the underlying read yields None.
        """
        next_batch = await self._read_next_async()
        if next_batch is not None:
            return next_batch
        raise StopAsyncIteration

    async def _read_next_async(self):
        """Start one read and await its outcome (a batch, or None)."""
        pending = _AsyncioCall()
        self._read_next(pending)
        outcome = await pending.as_awaitable()
        return outcome

    cdef _read_next(self, call):
        """Request the next item from the generator and bind it to ``call``.

        The generator is invoked with the GIL released. The resulting
        future is then bound to ``call.wakeup``, with
        ``_wrap_record_batch_or_none`` as the conversion function.
        """
        cdef CFuture[CRecordBatchWithMetadata] next_future
        with nogil:
            next_future = CallAsyncGenerator(self.generator.get().generator)
        BindFuture(move(next_future), call.wakeup, _wrap_record_batch_or_none)

    async def __aenter__(self):
        """Enter the async context; returns the reader itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context without doing any work.

        Returns None, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        return None
async def _test_roundtrip_async(schema, batches, queue_size=5):
    """Test helper: round-trip batches through an async producer/consumer.

    EXPERIMENTAL: This function is intended for testing purposes only.

    Parameters
    ----------
    schema : Schema
        The schema of the record batches.
    batches : list of RecordBatch
        The record batches to produce.
    queue_size : int, default 5
        Number of batches to request ahead.

    Returns
    -------
    AsyncRecordBatchReader
    """
    pending = _AsyncioCall()
    # Starts the roundtrip; its outcome is delivered through `pending`.
    _start_roundtrip(pending, schema, batches, queue_size)
    reader = await pending.as_awaitable()
    return reader
cdef _start_roundtrip(call, Schema schema, list batches, uint64_t queue_size):
    """Start an async roundtrip of ``batches`` and bind its result to ``call``.

    Each element of ``batches`` is type-checked as a RecordBatch and its
    shared pointer collected into a C++ vector. RoundtripAsyncBatches is
    then invoked with the GIL released, using the executor returned by
    GetCpuThreadPool(). The resulting future is bound to ``call.wakeup``,
    with ``_wrap_async_generator`` as the conversion function.
    """
    cdef shared_ptr[CSchema] schema_sp = pyarrow_unwrap_schema(schema)
    cdef vector[shared_ptr[CRecordBatch]] batch_ptrs
    cdef CFuture[CAsyncRecordBatchGenerator] generator_future

    for item in batches:
        # Checked cast: fails with TypeError if item is not a RecordBatch.
        batch_ptrs.push_back((<RecordBatch?>item).sp_batch)

    with nogil:
        generator_future = RoundtripAsyncBatches(
            schema_sp, move(batch_ptrs), GetCpuThreadPool(), queue_size)

    BindFuture(move(generator_future), call.wakeup, _wrap_async_generator)