Implementing Long-Running HTTP Connections in Phoenix (override Cowboy's idle_timeout)

November 15, 2023 – Leon Schuermann – #elixir

The Elixir language's actor programming model and the Phoenix web framework are particularly well suited to implementing long-running streaming HTTP connections, such as those used for Server-Sent Events (SSE). For example, this blog post on Server-Sent Events with Elixir by Krister Viirsaar succinctly demonstrates how an SSE endpoint can be implemented in Phoenix without using any external libraries. However, the Cowboy HTTP server terminates idle connections after a globally configured idle timeout, which is only reset when new data is received from the client. In addition, SSE streams should send keep-alive messages regularly to ensure that clients, reverse proxies, and middleboxes don't close such connections. In this post, I will extend a minimal Phoenix SSE endpoint example by adding keep-alive messages and overriding Cowboy's idle timeout.

1. Minimal Server-Sent Events Endpoint Example

We start by looking at the Server-Sent Events endpoint example proposed by Krister Viirsaar. First, declare a new sse MIME type in your Phoenix application's config/config.exs. Without this MIME type, Phoenix will respond with a 406 Not Acceptable error to a request that specifies an Accept: text/event-stream header, as defined for Server-Sent Events requests. Note that the mime library reads this setting at compile time, so it needs to be recompiled after the change (for example with mix deps.clean mime --build):

# Accept event-stream requests
config :mime, :types, %{
  "text/event-stream" => ["sse"]
}

Now, in our router.ex we can define a new Plug pipeline which expects this Accept header, and add an endpoint that dispatches to our SSE request handler:

defmodule MyAppWeb.Router do
  use MyAppWeb, :router

  pipeline :sse do
    plug :accepts, ["sse"]
  end

  scope "/sse", MyAppWeb do
    pipe_through :sse

    get "/", MySSEController, :sse_req
  end
end

Finally, we implement a request handler that sets the appropriate response headers and then enters a (recursive) loop that waits for incoming messages and translates them into chunks sent over the open HTTP connection:

defmodule MyAppWeb.MySSEController do
  use MyAppWeb, :controller

  def sse_req(conn, _params) do
    conn =
      conn
      |> put_resp_header("Cache-Control", "no-cache")
      |> put_resp_header("Connection", "keep-alive")
      |> put_resp_header("Content-Type", "text/event-stream; charset=utf-8")
      |> send_chunked(200)

    sse_loop(conn)
  end

  defp sse_loop(conn) do
    receive do
      {:my_sse_message, msg} ->
        chunk(conn, "event: message\ndata: #{Jason.encode! msg}\n\n")
        sse_loop(conn)

      _other ->
        # Ignore all other messages:
        sse_loop(conn)
    end
  end
end

This is a minimal working example of an SSE endpoint, built on Plug's send_chunked/2 response. The above example will not actually send any messages: ultimately, it would need to be extended such that the connection handler process subscribes to some events, for instance through the Phoenix PubSub module (as demonstrated in Krister Viirsaar's post). Nonetheless, it should keep an incoming HTTP request connection open indefinitely, making it possible to stream data to the client.
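To actually deliver {:my_sse_message, msg} tuples to the handler process, some publish/subscribe mechanism is needed. The following is a minimal sketch that uses Elixir's built-in Registry as a dependency-free stand-in for Phoenix PubSub. The module name DemoEvents and the topic "sse:updates" are made up for illustration. In a Phoenix application, Phoenix.PubSub.subscribe/2 and Phoenix.PubSub.broadcast/3 would play the same roles.

```elixir
defmodule DemoEvents do
  # Hypothetical names, chosen for this sketch only.
  @registry DemoEvents.Registry
  @topic "sse:updates"

  # Starts a registry that allows many processes to register under one key.
  def start, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Called by the process that is about to enter the SSE loop.
  def subscribe do
    {:ok, _owner} = Registry.register(@registry, @topic, nil)
    :ok
  end

  # Called from anywhere in the application to reach all subscribers.
  def broadcast(msg) do
    Registry.dispatch(@registry, @topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:my_sse_message, msg})
    end)
  end
end

{:ok, _pid} = DemoEvents.start()
:ok = DemoEvents.subscribe()
:ok = DemoEvents.broadcast(%{temperature: 21})

# The subscribed process now finds the message in its mailbox, in exactly
# the shape that the receive block in sse_loop/1 matches on.
receive do
  {:my_sse_message, msg} -> IO.inspect(msg, label: "received")
after
  1_000 -> IO.puts("no message received")
end
```

In the controller, the subscribe call would go right before sse_loop(conn), so that the request process itself receives the broadcasts.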

When we issue a request to this endpoint (e.g. via cURL), we see the following log output:

[info] GET /sse
[debug] Processing with MyAppWeb.MySSEController.sse_req/2
  Parameters: %{}
  Pipelines: [:sse]
[info] Chunked 200 in 3ms

Great, our request handler matched and we're ready to stream (no) messages to the client. However, after waiting for about a minute, our cURL process exits with error 18 (transfer closed with outstanding read data remaining). That's not good…

> time curl -v http://localhost:4000/sse
*   Trying 127.0.0.1:4000...
* Connected to localhost (127.0.0.1) port 4000 (#0)
> GET /sse HTTP/1.1
> Host: localhost:4000
> User-Agent: curl/8.1.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Cache-Control: no-cache
< Connection: keep-alive
< Content-Type: text/event-stream; charset=utf-8
< cache-control: max-age=0, private, must-revalidate
< date: Wed, 15 Nov 2023 15:54:11 GMT
< server: Cowboy
< transfer-encoding: chunked
< x-request-id: F5e4D9BN8eDi_fgAAAsh
<
* transfer closed with outstanding read data remaining
* Closing connection 0
curl: (18) transfer closed with outstanding read data remaining

________________________________________________________
Executed in   59.44 secs      fish           external
   usr time    6.82 millis  850.00 micros    5.97 millis
   sys time    9.33 millis  968.00 micros    8.36 millis

2. Implementing SSE Keep-Alive Messages

Web servers, reverse proxies, and other middleboxes commonly terminate long-running idle HTTP / TCP connections. It is therefore recommended to send an SSE keep-alive message roughly every 15 seconds, either unconditionally or whenever no data has been sent for some time. We can make use of Elixir's Process.send_after for this. We extend our request handler as follows:

defmodule MyAppWeb.MySSEController do
  use MyAppWeb, :controller

  @sse_keepalive_timeout 15000

  def sse_req(conn, _params) do
    conn =
      conn
      |> put_resp_header("Cache-Control", "no-cache")
      |> put_resp_header("Connection", "keep-alive")
      |> put_resp_header("Content-Type", "text/event-stream; charset=utf-8")
      |> send_chunked(200)

    timer_ref = Process.send_after(self(), :sse_keepalive, @sse_keepalive_timeout)

    sse_loop(conn, timer_ref)
  end

  defp sse_loop(conn, timer_ref) do
    receive do
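      {:my_sse_message, msg} ->
        # Restart the keep-alive timer, since we are about to send data anyway:
        Process.cancel_timer(timer_ref)
        timer_ref = Process.send_after(self(), :sse_keepalive, @sse_keepalive_timeout)
        chunk(conn, "event: message\ndata: #{Jason.encode! msg}\n\n")
        sse_loop(conn, timer_ref)

      :sse_keepalive ->
        # Cancel the current timer first: if an earlier timer fired just before
        # it was cancelled above, its message is still delivered, and re-arming
        # without cancelling would leave two timers running.
        Process.cancel_timer(timer_ref)
        timer_ref = Process.send_after(self(), :sse_keepalive, @sse_keepalive_timeout)
        chunk(conn, ":keepalive\n\n")
        sse_loop(conn, timer_ref)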

      _other ->
        # Ignore all other messages:
        sse_loop(conn, timer_ref)
    end
  end
end

In Server-Sent Events, every line starting with a colon character (:) is considered a comment and can thus be used to implement a keep-alive mechanism. Instead of sending :keepalive, it would also be sufficient to send just a colon. Thus we extended our request handler to

  1. arm a timer before entering the SSE loop,
  2. reset the timer every time a proper message is sent (Process.cancel_timer is safe to call even on already expired timers),
  3. and finally send a keep-alive message and re-arm the timer upon receiving a timeout message.
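As an aside, the same effect can be approximated without explicit timers by using the after clause of receive. The following is a minimal sketch of that alternative, not the approach used in the rest of this post. The module name KeepaliveSketch and the emit callback are hypothetical: emit stands in for chunk(conn, data), so that the loop can run outside of a Phoenix request.

```elixir
defmodule KeepaliveSketch do
  # `emit` is a hypothetical 1-arity callback standing in for chunk(conn, data).
  def loop(emit, keepalive_ms) do
    receive do
      {:my_sse_message, msg} ->
        emit.("event: message\ndata: #{msg}\n\n")
        loop(emit, keepalive_ms)

      :stop ->
        :ok

      _other ->
        # Ignore all other messages. Note that re-entering the loop also
        # restarts the keep-alive countdown below.
        loop(emit, keepalive_ms)
    after
      keepalive_ms ->
        # No message arrived within keepalive_ms: send an SSE comment line.
        emit.(":keepalive\n\n")
        loop(emit, keepalive_ms)
    end
  end
end

# Usage: forward every "chunk" to the current process instead of a socket.
parent = self()
pid = spawn(fn -> KeepaliveSketch.loop(&send(parent, {:chunk, &1}), 200) end)
send(pid, {:my_sse_message, "hello"})
```

The trade-off is that the countdown restarts on every received message, including ignored ones, whereas the timer-based version only resets it when an actual SSE message is sent.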

Unfortunately, we can see that this is not yet sufficient to stop Cowboy (Phoenix's underlying HTTP server) from terminating our connection:

> time curl -v http://localhost:4000/sse
*   Trying 127.0.0.1:4000...
* Connected to localhost (127.0.0.1) port 4000 (#0)
> GET /sse HTTP/1.1
> Host: localhost:4000
> User-Agent: curl/8.1.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Cache-Control: no-cache
< Connection: keep-alive
< Content-Type: text/event-stream; charset=utf-8
< cache-control: max-age=0, private, must-revalidate
< date: Wed, 15 Nov 2023 16:02:57 GMT
< server: Cowboy
< transfer-encoding: chunked
< x-request-id: F5e4i4BVep9mxHQAABNk
<
:keepalive

:keepalive

:keepalive

* transfer closed with outstanding read data remaining
* Closing connection 0
curl: (18) transfer closed with outstanding read data remaining

________________________________________________________
Executed in   59.44 secs      fish           external
   usr time   12.65 millis    1.90 millis   10.75 millis
   sys time   10.75 millis    0.00 millis   10.75 millis

3. Disabling Cowboy's Idle Timeout for a Single Connection

Even with our keep-alive messages, Cowboy still terminates the connection after 60 seconds. This is because Cowboy determines a connection to be active solely by whether the server receives data from the client, and does not take into account any data sent by the server to the client. Naturally, the latter is the exact purpose of SSE, and existing client libraries will generally not send data to the server after they have made their initial request.

The fact that Cowboy, Phoenix's underlying HTTP server, terminates idle connections, together with its exact definition of what it means for a connection to be idle, seems to be a common source of confusion and frustration. Virtually all existing resources on this topic suggest simply increasing the global :idle_timeout option in the Plug.Cowboy configuration in your config.exs like so:

config :myapp, MyAppWeb.Endpoint,
  http: [
    port: 4000,
    protocol_options: [
      # Choose a sensible timeout, or set to :infinity to never kill
      # idle connections:
      idle_timeout: 300_000
    ]
  ]

However, this seems less than ideal: presumably, terminating long-standing idle connections helps reduce resource consumption and prevents unbounded resource leakage on the server, and thus we'd want Cowboy to terminate most of our connections! We want to make an exception just for a few long-running connections and, better yet, only after authenticating clients.

Looking at the Cowboy HTTP module documentation, we can see that the :idle_timeout configuration can be changed "using the set_options stream handler command". A promising example for this can be found in the cowboy_req:cast function documentation:

cowboy_req:cast({set_options, #{
  idle_timeout => 3600000
}}, Req).

Neat! This means that, as long as we can get a hold of the underlying Cowboy Req object somehow, we should be able to change this idle timeout on a per-connection granularity, dynamically within the request handler. Phoenix provides us only the conn and params parameters, so let's inspect the more promising conn first, by writing IO.inspect(conn) in our request handler:

%Plug.Conn{
  adapter: {Plug.Cowboy.Conn, :...},
  assigns: %{},
  body_params: %{},
  [...]

Okay, conn.adapter looks interesting. Let's inspect that tuple specifically:

{Plug.Cowboy.Conn,
 %{
   bindings: %{},
   body_length: 0,
   cert: :undefined,
   has_body: false,
   headers: %{

It looks like conn.adapter contains a tuple of {Plug.Cowboy.Conn, cowboy_req}, where cowboy_req is the Cowboy Req object! We can try to issue the cowboy_req:cast call from the Cowboy documentation, passing an idle timeout of :infinity:

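def sse_req(conn, _params) do
  # Disable Cowboy's idle timeout for this connection only:
  {Plug.Cowboy.Conn, cowboy_req} = conn.adapter
  :cowboy_req.cast({:set_options, %{idle_timeout: :infinity}}, cowboy_req)

  # [...] set the headers, send_chunked(200), and enter sse_loop as before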

And re-running our cURL request now gives us:

> time curl -v http://localhost:4000/sse
*   Trying 127.0.0.1:4000...
* Connected to localhost (127.0.0.1) port 4000 (#0)
> GET /sse HTTP/1.1
> Host: localhost:4000
> User-Agent: curl/8.1.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Cache-Control: no-cache
< Connection: keep-alive
< Content-Type: text/event-stream; charset=utf-8
< cache-control: max-age=0, private, must-revalidate
< date: Wed, 15 Nov 2023 16:31:10 GMT
< server: Cowboy
< transfer-encoding: chunked
< x-request-id: F5fY5wXDXoT_TToAAABB
<
:keepalive

:keepalive

:keepalive

:keepalive

:keepalive

:keepalive

[...]

Sweet! With this, we can control Cowboy's idle timeout for each connection. Furthermore, by moving the :cowboy_req.cast invocation after any authentication checks, we can still subject all unauthenticated requests to the default global idle timeout.

As far as I know, the Plug library is designed to be usable with different HTTP server implementations, and Cowboy is just one of them. Presumably, this means that the {Plug.Cowboy.Conn, cowboy_req} = conn.adapter match will raise a MatchError if a different HTTP server is used. For my purposes, I deem this acceptable: this endpoint has not been tested with any other HTTP server, and should thus fail when not using Cowboy. Depending on your constraints, you may instead choose to wrap the timeout-setting logic in a case expression to handle this situation more gracefully.
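A minimal sketch of such a case expression might look as follows. The module name IdleTimeoutSketch, the injected cast argument, and the Some.Other.Adapter atom are assumptions made for illustration. Injecting cast lets the example run without Cowboy being installed; in a real Phoenix application one would pass &:cowboy_req.cast/2 and conn.adapter.

```elixir
defmodule IdleTimeoutSketch do
  # `cast` is a 2-arity function standing in for :cowboy_req.cast/2.
  def disable_idle_timeout(adapter, cast) when is_function(cast, 2) do
    case adapter do
      {Plug.Cowboy.Conn, cowboy_req} ->
        # Running on Cowboy: lift the idle timeout for this connection only.
        cast.({:set_options, %{idle_timeout: :infinity}}, cowboy_req)
        :ok

      {other_adapter, _payload} ->
        # Some other HTTP server: leave its timeout handling untouched and
        # let the caller decide how to react.
        {:error, {:unsupported_adapter, other_adapter}}
    end
  end
end

# Stub for :cowboy_req.cast/2 that only reports what it was called with.
stub_cast = fn command, req -> send(self(), {:cast, command, req}) end

IO.inspect(IdleTimeoutSketch.disable_idle_timeout({Plug.Cowboy.Conn, :fake_req}, stub_cast))
IO.inspect(IdleTimeoutSketch.disable_idle_timeout({Some.Other.Adapter, nil}, stub_cast))
```

Called from the request handler after the authentication checks, the error tuple could, for example, be logged while the stream continues under the server's default timeout.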