Skip to content

Commit 688f350

Browse files
committed
feat(execution): support returning AsyncIterableIterator from execute
1 parent 65e073d commit 688f350

File tree

2 files changed

+166
-69
lines changed

2 files changed

+166
-69
lines changed

src/server.ts

Lines changed: 85 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ export interface ServerOptions {
7979
* execute the subscription operation
8080
* upon.
8181
*/
82-
execute: (args: ExecutionArgs) => Promise<ExecutionResult> | ExecutionResult;
82+
execute: (
83+
args: ExecutionArgs,
84+
) =>
85+
| Promise<ExecutionResult>
86+
| ExecutionResult
87+
| AsyncIterableIterator<ExecutionResult>;
8388
/**
8489
* Is the `subscribe` function
8590
* from GraphQL which is used to
@@ -457,59 +462,65 @@ export function createServer(
457462
});
458463
}
459464

460-
// perform
461-
if (operationAST.operation === 'subscription') {
462-
const subscriptionOrResult = await subscribe(execArgs);
463-
if (isAsyncIterable(subscriptionOrResult)) {
464-
// iterable subscriptions are distinct on ID
465-
if (ctx.subscriptions[message.id]) {
466-
return ctx.socket.close(
467-
4409,
468-
`Subscriber for ${message.id} already exists`,
469-
);
470-
}
471-
ctx.subscriptions[message.id] = subscriptionOrResult;
472-
473-
try {
474-
for await (let result of subscriptionOrResult) {
475-
// use the root formater first
476-
if (formatExecutionResult) {
477-
result = await formatExecutionResult(ctx, result);
478-
}
479-
// then use the subscription specific formatter
480-
if (onSubscribeFormatter) {
481-
result = await onSubscribeFormatter(ctx, result);
482-
}
483-
await sendMessage<MessageType.Next>(ctx, {
484-
id: message.id,
485-
type: MessageType.Next,
486-
payload: result,
487-
});
488-
}
465+
const asyncIterableHandler = async (
466+
asyncIterable: AsyncIterableIterator<ExecutionResult>,
467+
) => {
468+
// iterable subscriptions are distinct on ID
469+
if (ctx.subscriptions[message.id]) {
470+
return ctx.socket.close(
471+
4409,
472+
`Subscriber for ${message.id} already exists`,
473+
);
474+
}
475+
ctx.subscriptions[message.id] = asyncIterable;
489476

490-
const completeMessage: CompleteMessage = {
491-
id: message.id,
492-
type: MessageType.Complete,
493-
};
494-
await sendMessage<MessageType.Complete>(ctx, completeMessage);
495-
if (onComplete) {
496-
onComplete(ctx, completeMessage);
477+
try {
478+
for await (let result of asyncIterable) {
479+
// use the root formater first
480+
if (formatExecutionResult) {
481+
result = await formatExecutionResult(ctx, result);
497482
}
498-
} catch (err) {
499-
await sendMessage<MessageType.Error>(ctx, {
483+
// then use the subscription specific formatter
484+
if (onSubscribeFormatter) {
485+
result = await onSubscribeFormatter(ctx, result);
486+
}
487+
await sendMessage<MessageType.Next>(ctx, {
500488
id: message.id,
501-
type: MessageType.Error,
502-
payload: [
503-
new GraphQLError(
504-
err instanceof Error
505-
? err.message
506-
: new Error(err).message,
507-
),
508-
],
489+
type: MessageType.Next,
490+
payload: result,
509491
});
510-
} finally {
511-
delete ctx.subscriptions[message.id];
512492
}
493+
494+
const completeMessage: CompleteMessage = {
495+
id: message.id,
496+
type: MessageType.Complete,
497+
};
498+
await sendMessage<MessageType.Complete>(ctx, completeMessage);
499+
if (onComplete) {
500+
onComplete(ctx, completeMessage);
501+
}
502+
} catch (err) {
503+
await sendMessage<MessageType.Error>(ctx, {
504+
id: message.id,
505+
type: MessageType.Error,
506+
payload: [
507+
new GraphQLError(
508+
err instanceof Error
509+
? err.message
510+
: new Error(err).message,
511+
),
512+
],
513+
});
514+
} finally {
515+
delete ctx.subscriptions[message.id];
516+
}
517+
};
518+
519+
// perform
520+
if (operationAST.operation === 'subscription') {
521+
const subscriptionOrResult = await subscribe(execArgs);
522+
if (isAsyncIterable(subscriptionOrResult)) {
523+
await asyncIterableHandler(subscriptionOrResult);
513524
} else {
514525
let result = subscriptionOrResult;
515526
// use the root formater first
@@ -539,27 +550,32 @@ export function createServer(
539550
// operationAST.operation === 'query' || 'mutation'
540551

541552
let result = await execute(execArgs);
542-
// use the root formater first
543-
if (formatExecutionResult) {
544-
result = await formatExecutionResult(ctx, result);
545-
}
546-
// then use the subscription specific formatter
547-
if (onSubscribeFormatter) {
548-
result = await onSubscribeFormatter(ctx, result);
549-
}
550-
await sendMessage<MessageType.Next>(ctx, {
551-
id: message.id,
552-
type: MessageType.Next,
553-
payload: result,
554-
});
555553

556-
const completeMessage: CompleteMessage = {
557-
id: message.id,
558-
type: MessageType.Complete,
559-
};
560-
await sendMessage<MessageType.Complete>(ctx, completeMessage);
561-
if (onComplete) {
562-
onComplete(ctx, completeMessage);
554+
if (isAsyncIterable(result)) {
555+
await asyncIterableHandler(result);
556+
} else {
557+
// use the root formater first
558+
if (formatExecutionResult) {
559+
result = await formatExecutionResult(ctx, result);
560+
}
561+
// then use the subscription specific formatter
562+
if (onSubscribeFormatter) {
563+
result = await onSubscribeFormatter(ctx, result);
564+
}
565+
await sendMessage<MessageType.Next>(ctx, {
566+
id: message.id,
567+
type: MessageType.Next,
568+
payload: result,
569+
});
570+
571+
const completeMessage: CompleteMessage = {
572+
id: message.id,
573+
type: MessageType.Complete,
574+
};
575+
await sendMessage<MessageType.Complete>(ctx, completeMessage);
576+
if (onComplete) {
577+
onComplete(ctx, completeMessage);
578+
}
563579
}
564580
}
565581
break;

src/tests/server.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,87 @@ describe('Subscribe', () => {
559559
await wait(20);
560560
});
561561

562+
it('should execute a query operation with custom execute that returns a AsyncIterableIterator, "next" the results and then "complete"', async () => {
563+
expect.assertions(5);
564+
565+
await makeServer({
566+
schema,
567+
execute: async function* () {
568+
for (const value of ['Hi', 'Hello', 'Sup']) {
569+
yield {
570+
data: {
571+
getValue: value,
572+
},
573+
};
574+
}
575+
},
576+
});
577+
578+
const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL);
579+
client.onopen = () => {
580+
client.send(
581+
stringifyMessage<MessageType.ConnectionInit>({
582+
type: MessageType.ConnectionInit,
583+
}),
584+
);
585+
};
586+
587+
let receivedNextCount = 0;
588+
client.onmessage = ({ data }) => {
589+
const message = parseMessage(data);
590+
switch (message.type) {
591+
case MessageType.ConnectionAck:
592+
client.send(
593+
stringifyMessage<MessageType.Subscribe>({
594+
id: '1',
595+
type: MessageType.Subscribe,
596+
payload: {
597+
operationName: 'TestString',
598+
query: `query TestString {
599+
getValue
600+
}`,
601+
variables: {},
602+
},
603+
}),
604+
);
605+
break;
606+
case MessageType.Next:
607+
receivedNextCount++;
608+
if (receivedNextCount === 1) {
609+
expect(message).toEqual({
610+
id: '1',
611+
type: MessageType.Next,
612+
payload: { data: { getValue: 'Hi' } },
613+
});
614+
} else if (receivedNextCount === 2) {
615+
expect(message).toEqual({
616+
id: '1',
617+
type: MessageType.Next,
618+
payload: { data: { getValue: 'Hello' } },
619+
});
620+
} else if (receivedNextCount === 3) {
621+
expect(message).toEqual({
622+
id: '1',
623+
type: MessageType.Next,
624+
payload: { data: { getValue: 'Sup' } },
625+
});
626+
}
627+
break;
628+
case MessageType.Complete:
629+
expect(receivedNextCount).toEqual(3);
630+
expect(message).toEqual({
631+
id: '1',
632+
type: MessageType.Complete,
633+
});
634+
break;
635+
default:
636+
fail(`Not supposed to receive a message of type ${message.type}`);
637+
}
638+
};
639+
640+
await wait(20);
641+
});
642+
562643
it('should execute the query of `DocumentNode` type, "next" the result and then "complete"', async () => {
563644
expect.assertions(3);
564645

0 commit comments

Comments
 (0)