Skip to content

Commit 36f360d

Browse files
committed
feat(execution): support returning AsyncIterableIterator from execute
1 parent 65e073d commit 36f360d

2 files changed

Lines changed: 164 additions & 69 deletions

File tree

src/server.ts

Lines changed: 85 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ export interface ServerOptions {
7979
* execute the subscription operation
8080
* upon.
8181
*/
82-
execute: (args: ExecutionArgs) => Promise<ExecutionResult> | ExecutionResult;
82+
execute: (
83+
args: ExecutionArgs,
84+
) =>
85+
| Promise<ExecutionResult>
86+
| ExecutionResult
87+
| AsyncIterableIterator<ExecutionResult>;
8388
/**
8489
* Is the `subscribe` function
8590
* from GraphQL which is used to
@@ -457,59 +462,65 @@ export function createServer(
457462
});
458463
}
459464

460-
// perform
461-
if (operationAST.operation === 'subscription') {
462-
const subscriptionOrResult = await subscribe(execArgs);
463-
if (isAsyncIterable(subscriptionOrResult)) {
464-
// iterable subscriptions are distinct on ID
465-
if (ctx.subscriptions[message.id]) {
466-
return ctx.socket.close(
467-
4409,
468-
`Subscriber for ${message.id} already exists`,
469-
);
470-
}
471-
ctx.subscriptions[message.id] = subscriptionOrResult;
472-
473-
try {
474-
for await (let result of subscriptionOrResult) {
475-
// use the root formater first
476-
if (formatExecutionResult) {
477-
result = await formatExecutionResult(ctx, result);
478-
}
479-
// then use the subscription specific formatter
480-
if (onSubscribeFormatter) {
481-
result = await onSubscribeFormatter(ctx, result);
482-
}
483-
await sendMessage<MessageType.Next>(ctx, {
484-
id: message.id,
485-
type: MessageType.Next,
486-
payload: result,
487-
});
488-
}
465+
const asyncIterableHandler = async (
466+
asyncIterable: AsyncIterableIterator<ExecutionResult>,
467+
) => {
468+
// iterable subscriptions are distinct on ID
469+
if (ctx.subscriptions[message.id]) {
470+
return ctx.socket.close(
471+
4409,
472+
`Subscriber for ${message.id} already exists`,
473+
);
474+
}
475+
ctx.subscriptions[message.id] = asyncIterable;
489476

490-
const completeMessage: CompleteMessage = {
491-
id: message.id,
492-
type: MessageType.Complete,
493-
};
494-
await sendMessage<MessageType.Complete>(ctx, completeMessage);
495-
if (onComplete) {
496-
onComplete(ctx, completeMessage);
477+
try {
478+
for await (let result of asyncIterable) {
479+
// use the root formater first
480+
if (formatExecutionResult) {
481+
result = await formatExecutionResult(ctx, result);
497482
}
498-
} catch (err) {
499-
await sendMessage<MessageType.Error>(ctx, {
483+
// then use the subscription specific formatter
484+
if (onSubscribeFormatter) {
485+
result = await onSubscribeFormatter(ctx, result);
486+
}
487+
await sendMessage<MessageType.Next>(ctx, {
500488
id: message.id,
501-
type: MessageType.Error,
502-
payload: [
503-
new GraphQLError(
504-
err instanceof Error
505-
? err.message
506-
: new Error(err).message,
507-
),
508-
],
489+
type: MessageType.Next,
490+
payload: result,
509491
});
510-
} finally {
511-
delete ctx.subscriptions[message.id];
512492
}
493+
494+
const completeMessage: CompleteMessage = {
495+
id: message.id,
496+
type: MessageType.Complete,
497+
};
498+
await sendMessage<MessageType.Complete>(ctx, completeMessage);
499+
if (onComplete) {
500+
onComplete(ctx, completeMessage);
501+
}
502+
} catch (err) {
503+
await sendMessage<MessageType.Error>(ctx, {
504+
id: message.id,
505+
type: MessageType.Error,
506+
payload: [
507+
new GraphQLError(
508+
err instanceof Error
509+
? err.message
510+
: new Error(err).message,
511+
),
512+
],
513+
});
514+
} finally {
515+
delete ctx.subscriptions[message.id];
516+
}
517+
};
518+
519+
// perform
520+
if (operationAST.operation === 'subscription') {
521+
const subscriptionOrResult = await subscribe(execArgs);
522+
if (isAsyncIterable(subscriptionOrResult)) {
523+
await asyncIterableHandler(subscriptionOrResult);
513524
} else {
514525
let result = subscriptionOrResult;
515526
// use the root formater first
@@ -539,27 +550,32 @@ export function createServer(
539550
// operationAST.operation === 'query' || 'mutation'
540551

541552
let result = await execute(execArgs);
542-
// use the root formater first
543-
if (formatExecutionResult) {
544-
result = await formatExecutionResult(ctx, result);
545-
}
546-
// then use the subscription specific formatter
547-
if (onSubscribeFormatter) {
548-
result = await onSubscribeFormatter(ctx, result);
549-
}
550-
await sendMessage<MessageType.Next>(ctx, {
551-
id: message.id,
552-
type: MessageType.Next,
553-
payload: result,
554-
});
555553

556-
const completeMessage: CompleteMessage = {
557-
id: message.id,
558-
type: MessageType.Complete,
559-
};
560-
await sendMessage<MessageType.Complete>(ctx, completeMessage);
561-
if (onComplete) {
562-
onComplete(ctx, completeMessage);
554+
if (isAsyncIterable(result)) {
555+
await asyncIterableHandler(result);
556+
} else {
557+
// use the root formater first
558+
if (formatExecutionResult) {
559+
result = await formatExecutionResult(ctx, result);
560+
}
561+
// then use the subscription specific formatter
562+
if (onSubscribeFormatter) {
563+
result = await onSubscribeFormatter(ctx, result);
564+
}
565+
await sendMessage<MessageType.Next>(ctx, {
566+
id: message.id,
567+
type: MessageType.Next,
568+
payload: result,
569+
});
570+
571+
const completeMessage: CompleteMessage = {
572+
id: message.id,
573+
type: MessageType.Complete,
574+
};
575+
await sendMessage<MessageType.Complete>(ctx, completeMessage);
576+
if (onComplete) {
577+
onComplete(ctx, completeMessage);
578+
}
563579
}
564580
}
565581
break;

src/tests/server.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,85 @@ describe('Subscribe', () => {
559559
await wait(20);
560560
});
561561

562+
it('should execute a query operation with custom execute that returns a AsyncIterableIterator, "next" the results and then "complete"', async () => {
563+
await makeServer({
564+
schema,
565+
execute: async function* () {
566+
for (const value of ['Hi', 'Hello', 'Sup']) {
567+
yield {
568+
data: {
569+
getValue: value,
570+
},
571+
};
572+
}
573+
},
574+
});
575+
576+
const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL);
577+
client.onopen = () => {
578+
client.send(
579+
stringifyMessage<MessageType.ConnectionInit>({
580+
type: MessageType.ConnectionInit,
581+
}),
582+
);
583+
};
584+
585+
let receivedNextCount = 0;
586+
client.onmessage = ({ data }) => {
587+
const message = parseMessage(data);
588+
switch (message.type) {
589+
case MessageType.ConnectionAck:
590+
client.send(
591+
stringifyMessage<MessageType.Subscribe>({
592+
id: '1',
593+
type: MessageType.Subscribe,
594+
payload: {
595+
operationName: 'TestString',
596+
query: `query TestString {
597+
getValue
598+
}`,
599+
variables: {},
600+
},
601+
}),
602+
);
603+
break;
604+
case MessageType.Next:
605+
receivedNextCount++;
606+
if (receivedNextCount === 1) {
607+
expect(message).toEqual({
608+
id: '1',
609+
type: MessageType.Next,
610+
payload: { data: { getValue: 'Hi' } },
611+
});
612+
} else if (receivedNextCount === 2) {
613+
expect(message).toEqual({
614+
id: '1',
615+
type: MessageType.Next,
616+
payload: { data: { getValue: 'Hello' } },
617+
});
618+
} else if (receivedNextCount === 3) {
619+
expect(message).toEqual({
620+
id: '1',
621+
type: MessageType.Next,
622+
payload: { data: { getValue: 'Sup' } },
623+
});
624+
}
625+
break;
626+
case MessageType.Complete:
627+
expect(receivedNextCount).toEqual(3);
628+
expect(message).toEqual({
629+
id: '1',
630+
type: MessageType.Complete,
631+
});
632+
break;
633+
default:
634+
fail(`Not supposed to receive a message of type ${message.type}`);
635+
}
636+
};
637+
638+
await wait(20);
639+
});
640+
562641
it('should execute the query of `DocumentNode` type, "next" the result and then "complete"', async () => {
563642
expect.assertions(3);
564643

0 commit comments

Comments
 (0)