Implementing Long-Running HTTP Connections in Phoenix (override Cowboy's idle_timeout)
November 15, 2023 – Leon Schuermann – #elixir
The Elixir language's actor programming model and the Phoenix web framework are particularly well-suited to implementing long-running streaming HTTP connections, such as those used by Server-Sent Events (SSE). For example, this blog post on Server-Sent Events with Elixir by Krister Viirsaar succinctly demonstrates how an SSE endpoint can be implemented in Phoenix without any external libraries. However, the Cowboy HTTP server terminates idle connections after a globally configured idle timeout. That timeout is only reset when the server receives new data from the client. Also, SSE streams should send keep-alive messages regularly to ensure that clients, reverse proxies, and middleboxes don't close such connections. In this post, I will extend a minimal Phoenix SSE endpoint example by adding keep-alive messages and overriding the Cowboy HTTP server's idle timeout.
1. Minimal Server-Sent Events Endpoint Example
We start by looking at the Server-Sent Events endpoint example proposed by Krister Viirsaar. First, declare a new sse MIME type in your Phoenix application's config/config.exs. Without this MIME type, Phoenix will respond with a 406 Not Acceptable error to any request that specifies an Accept: text/event-stream header, as Server-Sent Events requests do:
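# Accept event-stream requests. After changing MIME types, recompile the
# mime dependency (mix deps.clean mime --build) for the change to take effect.
config :mime, :types, %{
  "text/event-stream" => ["sse"]
}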
Now, in our router.ex we can define a new Plug pipeline that expects this Accept header, and add an endpoint that dispatches to our SSE request handler:
defmodule MyAppWeb.Router do
  use MyAppWeb, :router

  pipeline :sse do
    plug :accepts, ["sse"]
  end

  scope "/sse", MyAppWeb do
    pipe_through :sse

    get "/", MySSEController, :sse_req
  end
end
Finally, implement the request handler. It sets the required response headers and then enters a (recursive) loop. That loop waits for incoming messages and translates them into chunks sent over the open HTTP connection:
defmodule MyAppWeb.MySSEController do
  use MyAppWeb, :controller

  def sse_req(conn, _params) do
    # Plug recommends lowercase header names: put_resp_header/3 compares names
    # case-sensitively, so "Cache-Control" would be sent alongside Plug's
    # default "cache-control" header instead of replacing it.
    conn =
      conn
      |> put_resp_header("cache-control", "no-cache")
      |> put_resp_header("connection", "keep-alive")
      |> put_resp_header("content-type", "text/event-stream; charset=utf-8")
      |> send_chunked(200)

    sse_loop(conn)
  end

  defp sse_loop(conn) do
    receive do
      {:my_sse_message, msg} ->
        chunk(conn, "event: message\ndata: #{Jason.encode!(msg)}\n\n")
        sse_loop(conn)

      _other ->
        # Ignore all other messages:
        sse_loop(conn)
    end
  end
end
This is a minimal working example of an SSE endpoint. It is built on the send_chunked/2 response function from Plug.Conn, which is imported into Phoenix controllers. As written, the example will never actually send any messages. Ultimately, it would need to be extended so that the connection handler process subscribes to some events, for instance through the Phoenix PubSub module (as demonstrated in Krister Viirsaar's post). Nonetheless, it should keep an incoming HTTP connection open indefinitely, making it possible to stream data to the client.
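For the loop to actually receive {:my_sse_message, msg} tuples, the handler process has to subscribe to some event source. In a Phoenix application this would typically mean calling Phoenix.PubSub.subscribe/2 in sse_req/2 and Phoenix.PubSub.broadcast/3 elsewhere. The sketch below keeps things runnable without Phoenix by swapping in Elixir's built-in Registry with duplicate keys, which supports a similar publish/subscribe pattern. The module, registry, and topic names are hypothetical.

```elixir
defmodule SSEPubSubSketch do
  # Hypothetical stand-in for Phoenix.PubSub, built on Elixir's Registry.
  @registry SSEPubSubSketch.Registry

  # Start once, e.g. under the application's supervision tree.
  def start_link do
    Registry.start_link(keys: :duplicate, name: @registry)
  end

  # Called by the SSE handler process before it enters sse_loop/1. The
  # registration is removed automatically when that process exits.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Called from anywhere to deliver `msg` to every subscriber of `topic`,
  # wrapped in the tuple shape that sse_loop/1 matches on.
  def broadcast(topic, msg) do
    Registry.dispatch(@registry, topic, fn subscribers ->
      for {pid, _value} <- subscribers, do: send(pid, {:my_sse_message, msg})
    end)
  end
end
```

Tying the subscription to the handler process's lifetime means that a closed connection cleans up after itself. No explicit unsubscribe is needed.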
When we issue a request to this endpoint (e.g. via cURL), we see the following log output:
[info] GET /sse
[debug] Processing with MyAppWeb.MySSEController.sse_req/2
  Parameters: %{}
  Pipelines: [:sse]
[info] Chunked 200 in 3ms
Great, our request handler matched and we're ready to stream (no) messages to the client. However, after waiting for about a minute, our cURL process exits with the error (18) transfer closed with outstanding read data remaining. That's not good…
> time curl -v http://localhost:4000/sse
* Trying 127.0.0.1:4000...
* Connected to localhost (127.0.0.1) port 4000 (#0)
> GET /sse HTTP/1.1
> Host: localhost:4000
> User-Agent: curl/8.1.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Cache-Control: no-cache
< Connection: keep-alive
< Content-Type: text/event-stream; charset=utf-8
< cache-control: max-age=0, private, must-revalidate
< date: Wed, 15 Nov 2023 15:54:11 GMT
< server: Cowboy
< transfer-encoding: chunked
< x-request-id: F5e4D9BN8eDi_fgAAAsh
<
* transfer closed with outstanding read data remaining
* Closing connection 0
curl: (18) transfer closed with outstanding read data remaining

________________________________________________________
Executed in   59.44 secs      fish           external
   usr time    6.82 millis  850.00 micros    5.97 millis
   sys time    9.33 millis  968.00 micros    8.36 millis
2. Implementing SSE Keep-Alive Messages
Web servers, reverse proxies, and other middleboxes commonly terminate long-running, idle HTTP/TCP connections. It is therefore recommended to send SSE keep-alive messages every 15 seconds or so. These can be sent either unconditionally or whenever no data has been sent for some time. We can use Elixir's Process.send_after/3 for this and extend our request handler as follows:
defmodule MyAppWeb.MySSEController do
  use MyAppWeb, :controller

  @sse_keepalive_timeout 15_000

  def sse_req(conn, _params) do
    # Lowercase header names, as recommended by Plug.
    conn =
      conn
      |> put_resp_header("cache-control", "no-cache")
      |> put_resp_header("connection", "keep-alive")
      |> put_resp_header("content-type", "text/event-stream; charset=utf-8")
      |> send_chunked(200)

    timer_ref = Process.send_after(self(), :sse_keepalive, @sse_keepalive_timeout)
    sse_loop(conn, timer_ref)
  end

  defp sse_loop(conn, timer_ref) do
    receive do
      {:my_sse_message, msg} ->
        # If the timer has already expired, its :sse_keepalive message is in
        # (or on its way to) our mailbox. Consume it so that it cannot arm a
        # second, concurrent keep-alive timer.
        if Process.cancel_timer(timer_ref) == false do
          receive do
            :sse_keepalive -> :ok
          end
        end

        timer_ref = Process.send_after(self(), :sse_keepalive, @sse_keepalive_timeout)
        chunk(conn, "event: message\ndata: #{Jason.encode!(msg)}\n\n")
        sse_loop(conn, timer_ref)

      :sse_keepalive ->
        timer_ref = Process.send_after(self(), :sse_keepalive, @sse_keepalive_timeout)
        chunk(conn, ":keepalive\n\n")
        sse_loop(conn, timer_ref)

      _other ->
        # Ignore all other messages:
        sse_loop(conn, timer_ref)
    end
  end
end
In Server-Sent Events, every line starting with a colon character (:) is considered a comment and ignored by clients. Such lines can therefore be used to implement a keep-alive mechanism. Instead of sending :keepalive, it would also be sufficient to send just a colon. Thus, we extended our request handler to:
- arm a timer before entering the SSE loop,
- reset the timer every time a proper message is sent (Process.cancel_timer/1 is safe to call even on already expired timers),
- and finally send a keep-alive message and re-arm the timer upon receiving a timeout message.
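Both kinds of frames written by this loop, events and comments, follow the same line-based SSE format. The following sketch builds both in one place using a hypothetical SSEFormat module, which is not part of Phoenix or Plug. It also splits a payload containing line breaks into several data: fields, as the SSE format requires. Compact JSON, as produced by Jason.encode!/1 by default, never contains raw line breaks, so the handler above is unaffected. Plain-text payloads would need this treatment, however.

```elixir
defmodule SSEFormat do
  # Hypothetical helper module that builds Server-Sent Events frames as
  # binaries suitable for passing to chunk/2.

  # An event frame: an "event:" field, one "data:" field per payload line, and
  # a blank line that terminates the event. SSE treats CRLF, CR, and LF as
  # line breaks, so the payload is split on all three.
  def event(name, data) when is_binary(name) and is_binary(data) do
    data_fields =
      data
      |> String.split(["\r\n", "\r", "\n"])
      |> Enum.map_join(fn line -> "data: " <> line <> "\n" end)

    "event: " <> name <> "\n" <> data_fields <> "\n"
  end

  # A comment frame. Clients ignore it, which makes it usable as a keep-alive.
  def comment(text \\ "") when is_binary(text) do
    ":" <> text <> "\n\n"
  end
end
```

Inside the controller, one could then write chunk(conn, SSEFormat.event("message", Jason.encode!(msg))) and chunk(conn, SSEFormat.comment("keepalive")).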
Unfortunately, keep-alive messages alone are not yet sufficient to prevent Cowboy (Phoenix's underlying HTTP server) from terminating our connection:
> time curl -v http://localhost:4000/sse
* Trying 127.0.0.1:4000...
* Connected to localhost (127.0.0.1) port 4000 (#0)
> GET /sse HTTP/1.1
> Host: localhost:4000
> User-Agent: curl/8.1.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Cache-Control: no-cache
< Connection: keep-alive
< Content-Type: text/event-stream; charset=utf-8
< cache-control: max-age=0, private, must-revalidate
< date: Wed, 15 Nov 2023 16:02:57 GMT
< server: Cowboy
< transfer-encoding: chunked
< x-request-id: F5e4i4BVep9mxHQAABNk
<
:keepalive
:keepalive
:keepalive
* transfer closed with outstanding read data remaining
* Closing connection 0
curl: (18) transfer closed with outstanding read data remaining

________________________________________________________
Executed in   59.44 secs      fish           external
   usr time   12.65 millis    1.90 millis   10.75 millis
   sys time   10.75 millis    0.00 millis   10.75 millis
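Independently of Cowboy, note that Process.cancel_timer/1 cannot retract a timeout message that has already been delivered. If an SSE message arrives just as the keep-alive timer fires, a stale :sse_keepalive may still be processed after the reset, arming a second timer. One common way to rule this out is to tag each timer message with a fresh reference and ignore any message whose tag no longer matches. This is an alternative technique, not the method used above. The sketch below decouples the loop from Plug: the module name is made up, emit is a hypothetical function standing in for chunk/2, and interval_ms stands in for @sse_keepalive_timeout.

```elixir
defmodule KeepaliveLoopSketch do
  # Hypothetical module: the keep-alive loop from above, decoupled from Plug.
  # `emit` stands in for chunk/2; `interval_ms` for @sse_keepalive_timeout.

  def start(emit, interval_ms) when is_function(emit, 1) do
    spawn(fn -> loop(emit, interval_ms, arm(interval_ms)) end)
  end

  # Arms a timer whose message carries a unique tag.
  defp arm(interval_ms) do
    tag = make_ref()
    timer = Process.send_after(self(), {:sse_keepalive, tag}, interval_ms)
    {tag, timer}
  end

  # Cancelling the old timer is only an optimization: if it already fired,
  # its message is ignored later because its tag no longer matches.
  defp reset({_tag, timer}, interval_ms) do
    Process.cancel_timer(timer)
    arm(interval_ms)
  end

  defp loop(emit, interval_ms, {tag, _timer} = state) do
    receive do
      {:my_sse_message, msg} ->
        emit.("event: message\ndata: #{msg}\n\n")
        loop(emit, interval_ms, reset(state, interval_ms))

      {:sse_keepalive, ^tag} ->
        emit.(":keepalive\n\n")
        loop(emit, interval_ms, arm(interval_ms))

      _stale_or_other ->
        # Stale keep-alive tags and unrelated messages are dropped.
        loop(emit, interval_ms, state)
    end
  end
end
```

With this scheme, at most one live keep-alive timer exists per connection, regardless of how message arrivals and timer expirations interleave.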
3. Disabling Cowboy's Idle Timeout for a Single Connection
Even with our keep-alive messages, Cowboy still terminates the connection after 60 seconds. This is because Cowboy considers a connection active solely based on whether the server receives data from the client. Data sent by the server to the client is not taken into account. Naturally, the latter is the exact purpose of SSE, and existing client libraries will generally not send data to the server after making their initial request.
Cowboy, Phoenix's underlying HTTP server, terminates idle connections, and its exact definition of what makes a connection idle seems to be a common source of confusion and frustration. Virtually all existing resources on this topic suggest simply increasing the global :idle_timeout option in the Plug.Cowboy configuration in your config.exs, like so:
config :my_app, MyAppWeb.Endpoint,
  http: [
    port: 4000,
    protocol_options: [
      # Choose a sensible timeout, or set to :infinity to never kill
      # idle connections:
      idle_timeout: 300_000
    ]
  ]
However, this seems less than ideal. Terminating long-standing idle connections presumably helps reduce resource consumption and prevent unbounded resource leakage on the server, so we do want Cowboy to terminate most of our idle connections! We only want to make an exception for a couple of long-running connections, and better yet, only after authenticating clients.
Looking at the Cowboy HTTP module documentation, we can see that the :idle_timeout option can be changed "using the set_options stream handler command". A promising example of this can be found in the documentation of the cowboy_req:cast function:
cowboy_req:cast({set_options, #{ idle_timeout => 3600000 }}, Req).
Neat! This means that, as long as we can somehow get hold of the underlying Cowboy Req object, we should be able to change this idle timeout on a per-connection basis, dynamically from within the request handler. Phoenix only provides us with the conn and params parameters, so let's first inspect the more promising conn by adding IO.inspect(conn) to our request handler:
%Plug.Conn{
  adapter: {Plug.Cowboy.Conn, :...},
  assigns: %{},
  body_params: %{},
  [...]
Okay, conn.adapter looks interesting. Let's inspect that tuple specifically:
{Plug.Cowboy.Conn,
 %{
   bindings: %{},
   body_length: 0,
   cert: :undefined,
   has_body: false,
   headers: %{
   [...]
It looks like conn.adapter contains a tuple {Plug.Cowboy.Conn, cowboy_req}, where cowboy_req is the Cowboy Req object! We can now try to issue the cowboy_req:cast call from the Cowboy documentation, passing an idle timeout of :infinity:
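def sse_req(conn, _params) do
  {Plug.Cowboy.Conn, cowboy_req} = conn.adapter
  :cowboy_req.cast({:set_options, %{idle_timeout: :infinity}}, cowboy_req)
  # ... then set the response headers, call send_chunked/2, arm the
  # keep-alive timer, and enter sse_loop/2 exactly as before.
end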
And re-running our cURL request now gives us:
> time curl -v http://localhost:4000/sse
* Trying 127.0.0.1:4000...
* Connected to localhost (127.0.0.1) port 4000 (#0)
> GET /sse HTTP/1.1
> Host: localhost:4000
> User-Agent: curl/8.1.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Cache-Control: no-cache
< Connection: keep-alive
< Content-Type: text/event-stream; charset=utf-8
< cache-control: max-age=0, private, must-revalidate
< date: Wed, 15 Nov 2023 16:31:10 GMT
< server: Cowboy
< transfer-encoding: chunked
< x-request-id: F5fY5wXDXoT_TToAAABB
<
:keepalive
:keepalive
:keepalive
:keepalive
:keepalive
:keepalive
[...]
Sweet! With this, we can control Cowboy's idle timeout for each connection individually. Furthermore, by moving the :cowboy_req.cast invocation after any authentication checks, we can still subject all unauthenticated requests to the default global idle timeout.
As far as I know, the Plug library is designed to be usable with different HTTP server implementations, and Cowboy is just one of them. This means that the {Plug.Cowboy.Conn, cowboy_req} = conn.adapter match will raise a MatchError if a different HTTP server is used. For my purposes, I deem this acceptable: this endpoint has not been tested with any other HTTP server, and should thus fail when not running on Cowboy. Depending on your constraints, you may instead choose to put the timeout-setting logic behind a case expression to handle this situation more gracefully.
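The case-based variant mentioned above could look roughly like the following sketch. SSEIdleTimeout and disable/1 are hypothetical names, not part of Plug or Phoenix. The helper only assumes the {Plug.Cowboy.Conn, cowboy_req} adapter shape observed earlier and returns an error tuple for any other adapter instead of raising.

```elixir
defmodule SSEIdleTimeout do
  # Hypothetical helper, not part of Plug or Phoenix. It expects a %Plug.Conn{}
  # (any map with an :adapter field works) and only touches Cowboy when the
  # connection is actually served by Plug.Cowboy.
  def disable(%{adapter: adapter}) do
    case adapter do
      {Plug.Cowboy.Conn, cowboy_req} ->
        # Same call as above: override the idle timeout for this connection.
        :cowboy_req.cast({:set_options, %{idle_timeout: :infinity}}, cowboy_req)
        :ok

      {other_adapter, _adapter_state} ->
        # Another HTTP server: leave its timeouts alone and report it.
        {:error, {:unsupported_adapter, other_adapter}}
    end
  end
end
```

In sse_req/2, after any authentication checks, one might call SSEIdleTimeout.disable(conn) and log the error case rather than failing the request. Such connections would then simply remain subject to the server's default idle timeout.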