diff --git a/DESCRIPTION b/DESCRIPTION index 627a28b68..64e731d28 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -84,5 +84,6 @@ Collate: 'utils-pipe.R' 'utils.R' 'validate_api_spec.R' + 'websocket.R' 'zzz.R' RdMacros: lifecycle diff --git a/R/plumber.R b/R/plumber.R index 7b3e43b50..a38cfdd95 100644 --- a/R/plumber.R +++ b/R/plumber.R @@ -96,6 +96,7 @@ Plumber <- R6Class( self$setDocsCallback(getOption('plumber.docs.callback', getOption('plumber.swagger.url', NULL))) self$setDebug(interactive()) self$setApiSpec(NULL) + self$websocket(defaultWebsocket(self, private$default_serializer)) # Add in the initial filters for (fn in names(filters)){ @@ -755,7 +756,10 @@ Plumber <- R6Class( #' @description httpuv interface onWSOpen function. (Required for \pkg{httpuv}) #' @param ws WebSocket object onWSOpen = function(ws){ - warning("WebSockets not supported.") + if (!is.null(private$ws_open)) { + private$ws_open(ws) + } + invisible(self) }, #' @description Sets the default serializer of the router. #' @@ -923,7 +927,12 @@ Plumber <- R6Class( ret }, - + #' @description Set websocket open method + #' @param open on open websocket method + websocket = function(open = NULL) { + if (!is.null(open)) stopifnot(is.function(open)) + private$ws_open <- open + }, ### Legacy/Deprecated #' @description addEndpoint has been deprecated in v0.4.0 and will be removed in a coming release. Please use `handle()` instead. @@ -1074,6 +1083,7 @@ Plumber <- R6Class( docs_info = NULL, docs_callback = NULL, debug = NULL, + ws_open = NULL, addFilterInternal = function(filter){ # Create a new filter and add it to the router diff --git a/R/websocket.R b/R/websocket.R new file mode 100644 index 000000000..9cb21297f --- /dev/null +++ b/R/websocket.R @@ -0,0 +1,27 @@ +#' @noRd +defaultWebsocket <- function(pr, ser) { + function(ws) { + req <- ws$request + req$ws <- ws + req$pr <- pr + ws$onMessage(function(binary, message) { + req$.internal <- new.env() + req$args <- list() + req$bodyRaw <- message + delayedAssign( + "postBody", + { + if (binary) rawToChar(message) else message + }, + assign.env = req + ) + req$.internal$bodyHandled <- TRUE + res <- PlumberResponse$new(ser) + pr$serve(req, res) + ws$send(paste("_status_", res$status)) + ws$send(paste("_headers_", paste(names(res$headers), unlist(res$headers), sep = "=", collapse = ";"))) + ws$send("_body_ nextmessage") + ws$send(res$body) + }) + } +} diff --git a/man/Plumber.Rd b/man/Plumber.Rd index 585a6d364..46ce1110a 100644 --- a/man/Plumber.Rd +++ b/man/Plumber.Rd @@ -158,6 +158,7 @@ pr$setErrorHandler(function(req, res, err) { \item \href{#method-filter}{\code{Plumber$filter()}} \item \href{#method-setApiSpec}{\code{Plumber$setApiSpec()}} \item \href{#method-getApiSpec}{\code{Plumber$getApiSpec()}} +\item \href{#method-websocket}{\code{Plumber$websocket()}} \item \href{#method-addEndpoint}{\code{Plumber$addEndpoint()}} \item \href{#method-addAssets}{\code{Plumber$addAssets()}} \item \href{#method-addFilter}{\code{Plumber$addFilter()}} @@ -845,6 +846,23 @@ Retrieve openAPI file \if{html}{\out{
}}\preformatted{Plumber$getApiSpec()}\if{html}{\out{
}} } +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-websocket}{}}} +\subsection{Method \code{websocket()}}{ +Set websocket open method +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Plumber$websocket(open = NULL)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{open}}{on open websocket method} +} +\if{html}{\out{
}} +} } \if{html}{\out{
}} \if{html}{\out{}} diff --git a/man/PlumberStatic.Rd b/man/PlumberStatic.Rd index 1b2bd2611..622cda49e 100644 --- a/man/PlumberStatic.Rd +++ b/man/PlumberStatic.Rd @@ -54,6 +54,7 @@ Creates a router that is backed by a directory of files on disk. \item \out{}\href{../../plumber/html/Plumber.html#method-setSerializer}{\code{plumber::Plumber$setSerializer()}}\out{} \item \out{}\href{../../plumber/html/Plumber.html#method-swaggerFile}{\code{plumber::Plumber$swaggerFile()}}\out{} \item \out{}\href{../../plumber/html/Plumber.html#method-unmount}{\code{plumber::Plumber$unmount()}}\out{} +\item \out{}\href{../../plumber/html/Plumber.html#method-websocket}{\code{plumber::Plumber$websocket()}}\out{} } \out{} } diff --git a/man/deprecated_r6.Rd b/man/deprecated_r6.Rd index c0e4ec305..88cce2dfa 100644 --- a/man/deprecated_r6.Rd +++ b/man/deprecated_r6.Rd @@ -110,6 +110,7 @@ The objects of this class are cloneable with this method. \item \out{}\href{../../plumber/html/Plumber.html#method-setSerializer}{\code{plumber::Plumber$setSerializer()}}\out{} \item \out{}\href{../../plumber/html/Plumber.html#method-swaggerFile}{\code{plumber::Plumber$swaggerFile()}}\out{} \item \out{}\href{../../plumber/html/Plumber.html#method-unmount}{\code{plumber::Plumber$unmount()}}\out{} +\item \out{}\href{../../plumber/html/Plumber.html#method-websocket}{\code{plumber::Plumber$websocket()}}\out{} } \out{} }