From a978df18845b5bfd001f7ef175900b735f5c6bb6 Mon Sep 17 00:00:00 2001 From: Bruno Tremblay Date: Mon, 14 Dec 2020 21:29:43 -0500 Subject: [PATCH 1/4] ws experiment --- R/plumber.R | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/R/plumber.R b/R/plumber.R index 7b3e43b50..6e454ab10 100644 --- a/R/plumber.R +++ b/R/plumber.R @@ -755,7 +755,10 @@ Plumber <- R6Class( #' @description httpuv interface onWSOpen function. (Required for \pkg{httpuv}) #' @param ws WebSocket object onWSOpen = function(ws){ - warning("WebSockets not supported.") + if (!is.null(private$ws_open)) { + private$ws_open(ws) + } + invisible(self) }, #' @description Sets the default serializer of the router. #' @@ -923,7 +926,12 @@ Plumber <- R6Class( ret }, - + #' @description Assign functions to websocket methods + #' @param open on open websocket method + websocket = function(open = NULL) { + if (!is.null(open)) stopifnot(is.function(open)) + private$ws_open <- open + }, ### Legacy/Deprecated #' @description addEndpoint has been deprecated in v0.4.0 and will be removed in a coming release. Please use `handle()` instead. @@ -1074,6 +1082,7 @@ Plumber <- R6Class( docs_info = NULL, docs_callback = NULL, debug = NULL, + ws_open = NULL, addFilterInternal = function(filter){ # Create a new filter and add it to the router From 92ee48534e020c23745bab76dc532b5818700f32 Mon Sep 17 00:00:00 2001 From: Bruno Tremblay Date: Tue, 15 Dec 2020 01:09:01 -0500 Subject: [PATCH 2/4] adding a default websocket open helper --- DESCRIPTION | 1 + R/plumber.R | 3 ++- R/websocket.R | 28 ++++++++++++++++++++++++++++ man/Plumber.Rd | 18 ++++++++++++++++++ man/PlumberStatic.Rd | 1 + man/deprecated_r6.Rd | 1 + 6 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 R/websocket.R diff --git a/DESCRIPTION b/DESCRIPTION index 627a28b68..64e731d28 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -84,5 +84,6 @@ Collate: 'utils-pipe.R' 'utils.R' 'validate_api_spec.R' + 'websocket.R' 'zzz.R' RdMacros: lifecycle diff --git a/R/plumber.R b/R/plumber.R index 6e454ab10..a38cfdd95 100644 --- a/R/plumber.R +++ b/R/plumber.R @@ -96,6 +96,7 @@ Plumber <- R6Class( self$setDocsCallback(getOption('plumber.docs.callback', getOption('plumber.swagger.url', NULL))) self$setDebug(interactive()) self$setApiSpec(NULL) + self$websocket(defaultWebsocket(self, private$default_serializer)) # Add in the initial filters for (fn in names(filters)){ @@ -926,7 +927,7 @@ Plumber <- R6Class( ret }, - #' @description Assign functions to websocket methods + #' @description Set websocket open method #' @param open on open websocket method websocket = function(open = NULL) { if (!is.null(open)) stopifnot(is.function(open)) diff --git a/R/websocket.R b/R/websocket.R new file mode 100644 index 000000000..89071b9bb --- /dev/null +++ b/R/websocket.R @@ -0,0 +1,28 @@ +#' @noRd +defaultWebsocket <- function(pr, ser) { + function(ws) { + ws$onMessage(function(binary, message) { + req <- ws$request + req$ws <- ws + req$pr <- pr + req$.internal <- new.env() + req$args <- list() + req$bodyRaw <- message + delayedAssign( + "postBody", + { + if (binary) rawToChar(message) else message + }, + assign.env = req + ) + req$.internal$bodyHandled <- TRUE + res <- PlumberResponse$new(ser) + pr$serve(req, res) + if (res$status == "200") { + ws$send(res$body) + } else { + ws$send(paste(res$status, res$body)) + } + }) + } +} diff --git a/man/Plumber.Rd b/man/Plumber.Rd index 585a6d364..46ce1110a 100644 --- a/man/Plumber.Rd +++ b/man/Plumber.Rd @@ -158,6 +158,7 @@ pr$setErrorHandler(function(req, res, err) { \item \href{#method-filter}{\code{Plumber$filter()}} \item \href{#method-setApiSpec}{\code{Plumber$setApiSpec()}} \item \href{#method-getApiSpec}{\code{Plumber$getApiSpec()}} +\item \href{#method-websocket}{\code{Plumber$websocket()}} \item \href{#method-addEndpoint}{\code{Plumber$addEndpoint()}} \item \href{#method-addAssets}{\code{Plumber$addAssets()}} \item \href{#method-addFilter}{\code{Plumber$addFilter()}} @@ -845,6 +846,23 @@ Retrieve openAPI file \if{html}{\out{
}}\preformatted{Plumber$getApiSpec()}\if{html}{\out{
}} } +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-websocket}{}}} +\subsection{Method \code{websocket()}}{ +Set websocket open method +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Plumber$websocket(open = NULL)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{open}}{on open websocket method} +} +\if{html}{\out{
}} +} } \if{html}{\out{
}} \if{html}{\out{}} diff --git a/man/PlumberStatic.Rd b/man/PlumberStatic.Rd index 1b2bd2611..622cda49e 100644 --- a/man/PlumberStatic.Rd +++ b/man/PlumberStatic.Rd @@ -54,6 +54,7 @@ Creates a router that is backed by a directory of files on disk. \item \out{}\href{../../plumber/html/Plumber.html#method-setSerializer}{\code{plumber::Plumber$setSerializer()}}\out{} \item \out{}\href{../../plumber/html/Plumber.html#method-swaggerFile}{\code{plumber::Plumber$swaggerFile()}}\out{} \item \out{}\href{../../plumber/html/Plumber.html#method-unmount}{\code{plumber::Plumber$unmount()}}\out{} +\item \out{}\href{../../plumber/html/Plumber.html#method-websocket}{\code{plumber::Plumber$websocket()}}\out{} } \out{} } diff --git a/man/deprecated_r6.Rd b/man/deprecated_r6.Rd index c0e4ec305..88cce2dfa 100644 --- a/man/deprecated_r6.Rd +++ b/man/deprecated_r6.Rd @@ -110,6 +110,7 @@ The objects of this class are cloneable with this method. \item \out{}\href{../../plumber/html/Plumber.html#method-setSerializer}{\code{plumber::Plumber$setSerializer()}}\out{} \item \out{}\href{../../plumber/html/Plumber.html#method-swaggerFile}{\code{plumber::Plumber$swaggerFile()}}\out{} \item \out{}\href{../../plumber/html/Plumber.html#method-unmount}{\code{plumber::Plumber$unmount()}}\out{} +\item \out{}\href{../../plumber/html/Plumber.html#method-websocket}{\code{plumber::Plumber$websocket()}}\out{} } \out{} } From 29c11405c514726cb8350d5c65336713da2cfb92 Mon Sep 17 00:00:00 2001 From: Bruno Tremblay Date: Tue, 15 Dec 2020 01:25:11 -0500 Subject: [PATCH 3/4] assignation can be done onopen, they should remain the same through the websocket session --- R/websocket.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/websocket.R b/R/websocket.R index 89071b9bb..fbafa00df 100644 --- a/R/websocket.R +++ b/R/websocket.R @@ -1,10 +1,10 @@ #' @noRd defaultWebsocket <- function(pr, ser) { function(ws) { + req <- ws$request + req$ws <- ws + req$pr <- pr ws$onMessage(function(binary, message) { - req <- ws$request - req$ws <- ws - req$pr <- pr req$.internal <- new.env() req$args <- list() req$bodyRaw <- message From 77a324ba223722c7980dc52827eaa09a06f3c6bc Mon Sep 17 00:00:00 2001 From: Bruno Tremblay Date: Tue, 15 Dec 2020 01:41:39 -0500 Subject: [PATCH 4/4] structured messaging for return messaging --- R/websocket.R | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/R/websocket.R b/R/websocket.R index fbafa00df..9cb21297f 100644 --- a/R/websocket.R +++ b/R/websocket.R @@ -18,11 +18,10 @@ defaultWebsocket <- function(pr, ser) { req$.internal$bodyHandled <- TRUE res <- PlumberResponse$new(ser) pr$serve(req, res) - if (res$status == "200") { - ws$send(res$body) - } else { - ws$send(paste(res$status, res$body)) - } + ws$send(paste("_status_", res$status)) + ws$send(paste("_headers_", paste(names(res$headers), unlist(res$headers), sep = "=", collapse = ";"))) + ws$send("_body_ nextmessage") + ws$send(res$body) }) } }