Streaming Remote RSS in Elixir
Build a better RSS reader in Elixir: stream feeds efficiently and process episodes your way 📡
Introduction
Recently I was working with RSS feeds on a project and realized that Elixir doesn't have great tooling for consuming them. I wanted the ability to perform a task on each episode of an RSS feed, where each task is independent of the tasks for other episodes. Here are some things I wanted:
- Support for reading the RSS metadata before beginning any episode-related task
- Have each item handled independently of the others
- Stream the RSS feed from a URL
- NOT require parsing the entire RSS document
- Support early stopping after X number of episodes
So far, I'm happy with the solution I landed on. The library is published as a Hex package and is available on GitHub for your reference.
Gathering Supplies
Now, RSS is a specification built on XML, so even if there is no dedicated RSS-reading library, you can reach for existing XML parsers. XML parsers tend to come in two flavors. The simple approach is to parse the entire XML document and output a structure such as a map; these parsers tend to be more straightforward and easy to use. Then there is SAX-based parsing, which is callback-driven; SAX parsers tend to be more unwieldy, but they are faster and more memory-efficient since they don't have to hold the entire XML document at once.
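To make the distinction concrete, here is a quick sketch of both styles using Saxy, which supports each of them (xml, MyHandler, and initial_state are placeholders):

# DOM-style: the whole document becomes nested {tag, attributes, children} tuples
{:ok, {_tag, _attrs, _children}} = Saxy.SimpleForm.parse_string(xml)

# SAX-style: Saxy invokes MyHandler's callbacks as it encounters each event
{:ok, final_state} = Saxy.parse_string(xml, MyHandler, initial_state)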
I identified that I would be using a SAX parser (the Saxy library in particular), which would allow the streaming and early-stopping capabilities. Next, I had to determine how to keep those capabilities when fetching the RSS from a remote URL. This is where I decided to go with the Req HTTP client, due to my familiarity with it plus its awesome support for partial reading! With Req, you can use the :into option to specify a function that consumes streamed chunks.
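As a quick illustration (the URL is a placeholder), the function you pass to :into receives each chunk along with the request/response pair and returns {:cont, acc} to keep streaming or {:halt, acc} to stop:

Req.get!("https://example.com/feed.xml",
  into: fn {:data, chunk}, {req, resp} ->
    IO.inspect(byte_size(chunk), label: "received bytes")
    {:cont, {req, resp}}
  end
)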
It was at this point that I realized there was a slight problem with my plan to use Saxy.parse_stream/4: it expects its input to be a single Enumerable.t(), which is not what we work with when streaming partial chunks. Luckily, Saxy already had me covered with the Saxy.Partial module, which fits my exact use case:
"Supports parsing an XML document partially. This module is useful when the XML document cannot be turned into a Stream
e.g over sockets."
Cool, now I have all the tools I need; I just have to put them together.
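In rough strokes, the Saxy.Partial lifecycle looks like this (chunk1 and chunk2 stand in for whatever arrives off the wire):

{:ok, partial} = Saxy.Partial.new(MyHandler, initial_state)
{:cont, partial} = Saxy.Partial.parse(partial, chunk1)
{:cont, partial} = Saxy.Partial.parse(partial, chunk2)
{:ok, final_state} = Saxy.Partial.terminate(partial)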
Writing the Saxy Handler
First, let's implement our Saxy.Handler, which is similar to a GenServer in that it implements specific callbacks that Saxy invokes at different points as XML tags and elements are read:
defmodule Reed.Handler do
@moduledoc false
@behaviour Saxy.Handler
defmodule State do
defstruct feed_info: %{},
current_item: nil,
current_text: "",
current_path: [],
transform: nil,
halted: false,
private: %{}
end
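  # Only these keys are exposed to the user's transform function.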
@client_keys [:feed_info, :current_item, :halted, :private]
def client_state(state), do: Map.take(state, @client_keys)
@impl Saxy.Handler
def handle_event(:start_document, _prolog, state), do: {:ok, state}
@impl Saxy.Handler
def handle_event(:end_document, _data, state), do: {:ok, state}
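  # Push the opening tag onto the path. An opening <item> starts a fresh map;
  # any tag attributes are stored at the current path.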
@impl Saxy.Handler
def handle_event(:start_element, {name, attributes}, state) do
current_path = [name | state.current_path]
new_state =
cond do
name == "item" ->
%{
state
| current_item: %{},
current_path: current_path
}
not is_nil(state.current_item) ->
current_item =
if attributes != [] do
put_in(
state.current_item,
current_path
|> item_path()
|> access(),
Map.new(attributes)
)
else
state.current_item
end
%{
state
| current_path: current_path,
current_item: current_item
}
true ->
feed_info =
if attributes != [] do
put_in(
state.feed_info,
current_path
|> feed_path()
|> access(),
Map.new(attributes)
)
else
state.feed_info
end
%{
state
| current_path: current_path,
feed_info: feed_info
}
end
{:ok, new_state}
end
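  # Flush the accumulated text into the item or feed map. A closing </item>
  # hands the finished item to the user's transform.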
@impl Saxy.Handler
def handle_event(:end_element, name, state) do
[_current | parent_path] = state.current_path
new_state =
cond do
not is_nil(state.current_item) and name == "item" ->
client_state =
state
|> client_state()
|> state.transform.()
state = Map.merge(state, client_state)
%{
state
| current_item: nil,
current_text: "",
current_path: parent_path
}
not is_nil(state.current_item) ->
local_path =
state.current_path |> item_path()
value =
get_in(state.current_item, local_path) ||
String.trim(state.current_text)
%{
state
| current_item:
put_in(
state.current_item,
local_path
|> access(),
value
),
current_text: "",
current_path: parent_path
}
true ->
value =
get_in(state.feed_info, state.current_path |> feed_path()) ||
String.trim(state.current_text)
%{
state
| feed_info:
put_in(
state.feed_info,
state.current_path |> feed_path() |> access(),
value
),
current_text: "",
current_path: parent_path
}
end
if new_state.halted, do: {:stop, new_state}, else: {:ok, new_state}
end
@impl Saxy.Handler
def handle_event(:characters, chars, state) do
{:ok, %{state | current_text: state.current_text <> chars}}
end
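  # Paths are tracked innermost-first. Item paths are relative to the
  # enclosing <item>; feed paths are absolute.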
defp item_path(path) do
path
|> Enum.split_while(&(&1 != "item"))
|> elem(0)
|> Enum.reverse()
end
defp feed_path(path) do
path |> Enum.reverse()
end
defp access(path) do
path |> Enum.map(&Access.key(&1, %{}))
end
end
The %State{} struct holds our state (as the name implies), which includes the :feed_info (the RSS feed metadata) and information about the current item. Since this is a streaming approach, we only ever store the current item (episode). You can also see that we have this concept of transforms, which we will talk about later; they are state-transformation functions passed by the user that are applied to the state for each item. So the handler streams and builds each episode up into a map, and when an item is finished it applies the transform to that item and then moves on to the next one. Deferring to the transform means that the user gets to decide exactly what to do with each item in the RSS feed, and by default only the :feed_info is kept, so accumulation is opt-in. This design is nice, since what you want to do with each episode might involve sending it to another process to perform a task rather than accumulating it.
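For instance, a transform could hand each finished item off to a worker process rather than keep it around (a hypothetical sketch; pid is assumed to be a process you have already started):

transform = fn state ->
  send(pid, {:rss_item, state.current_item})
  state
end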
Req Plugin
This Saxy.Handler can actually be used independently of Req, meaning that if you have an RSS feed stored in a file, or some other way to read it, you can still use this handler. As such, I wrote the parts that use Req as a Req plugin, which allows Req to be an optional dependency for the library.
if Code.ensure_loaded?(Req) || Mix.env() == :docs do
defmodule Reed.ReqPlugin do
alias Req.{Request, Response}
def attach(%Req.Request{} = request, options \\ []) do
request
|> Request.register_options([:transform])
|> Request.merge_options(options)
|> Request.prepend_request_steps(setup_rss_stream: &setup_rss_stream/1)
end
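  # Request step: initialize the partial parser and install the streaming :into callback.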
def setup_rss_stream(request) do
item_handler = Map.get(request.options, :transform, & &1)
{:ok, partial} =
Saxy.Partial.new(Reed.Handler, %Reed.Handler.State{
transform: item_handler
})
request
|> Request.put_private(:partial, partial)
|> Map.put(
:into,
fn {:data, chunk}, {req, resp} ->
partial = Request.get_private(req, :partial)
try do
case Saxy.Partial.parse(partial, chunk) do
{:cont, new_partial} ->
request = Request.put_private(req, :partial, new_partial)
client_state =
new_partial
|> Saxy.Partial.get_state()
|> Reed.Handler.client_state()
resp = Response.put_private(resp, :rss, client_state)
{:cont, {request, resp}}
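          # The handler returned {:stop, state}: record the final user state and stop streaming.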
{:halt, final_user_state} ->
request =
Request.update_private(
req,
:partial,
nil,
fn %{
state:
%{
user_state: %{}
} = state
} = partial ->
%{partial | state: %{state | user_state: final_user_state}}
end
)
resp =
Response.put_private(resp, :rss, Reed.Handler.client_state(final_user_state))
{:halt, {request, resp}}
{:error, reason} ->
raise reason
end
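        # If parsing errors mid-stream, still surface whatever state was built before the error.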
rescue
Saxy.ParseError ->
client_state =
partial
|> Saxy.Partial.get_state()
|> Reed.Handler.client_state()
resp = Response.put_private(resp, :rss, client_state)
{:halt, {req, resp}}
end
end
)
end
end
end
The standard API (by convention only) is for a Req plugin to export an attach/2 function, which takes the %Req.Request{} struct along with any options and returns the modified request struct. Req has the construct of steps for both the request and response lifecycles. Steps work by modifying the %Req.Request{} and %Req.Response{} structs, applying a transformation of some sort (much like the plugin itself). So after all of the request steps run, the request is made using its adapter (Finch by default), and then all of the response steps are run on the received response.
So in our plugin, we set the :into option, which dictates how the response body is consumed while the request runs. The function we provide, which parses each chunk of partial data, receives the data chunk along with the {%Req.Request{}, %Req.Response{}} pair. We use the options initially stored on the %Req.Request{} struct (which were put there during the plugin's setup step) to initialize the parser, we carry the partial parser state in the request's private data between chunks, and we expose the latest client state on the response under the :rss private key.
This aligns well with how Saxy.Partial works. For each chunk, we retrieve the partial state left behind by the previous chunk, use it to parse the next chunk, and then put the new partial state back so it's ready for the chunk that follows.
Saxy.Partial.parse/2 returns either {:cont, new_partial} when we want to keep parsing, or {:halt, final_user_state} when we want to stop. The transform function provided by the user is what determines when we :halt, so let's take a look at transforms now.
Transforms
Now that I had it set up to parse as a stream, I had to provide a way to control halting, accumulation, and overall how each RSS item is handled. The way I opted to do that is to provide a :private field on the %State{} struct that is there for the user to modify. Then for each RSS item, we give the user the [:feed_info, :current_item, :halted, :private] keys. If they want parsing halted after the current item, they set the :halted key to true. They have :current_item and :feed_info at each step, meaning both the overall feed metadata and the episode information are available. And they can do whatever they want with the :private field, which accumulates across episodes.
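As a hand-rolled example, here is a hypothetical transform that keeps only item titles in :private and halts after the third item:

Reed.get!(rss_url,
  transform: fn state ->
    titles = [state.current_item["title"] | Map.get(state.private, :titles, [])]

    %{
      state
      | private: Map.put(state.private, :titles, titles),
        halted: length(titles) >= 3
    }
  end
)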
In the Reed.Transformers module, I provide some helpful transformations for the common cases:
defmodule Reed.Transformers do
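  # Each transformer takes the client state (plus an optional argument) and
  # returns the state, so transformers compose with |>.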
def transform_item(%{current_item: item} = state, transformer)
when is_function(transformer, 1) do
%{state | current_item: transformer.(item)}
end
def limit(%{private: private} = state, count) when is_integer(count) do
private =
private
|> Map.update(:count, 1, &(&1 + 1))
state = %{state | private: private}
if Map.fetch!(private, :count) >= count, do: halt(state), else: state
end
def collect(%{current_item: item, private: private} = state) do
%{state | private: Map.update(private, :items, [item], &[item | &1])}
end
def halt(%{} = state) do
%{state | halted: true}
end
end
Lastly, I provide a convenient macro that will wrap your pipeline in an anonymous function correctly, as long as the pipeline is composed entirely of functions with the following signature: (state :: state(), opts :: list() \\ []). This is the signature that all transformer functions within Reed.Transformers share. This macro should always be called last in a transformation pipeline.
defmacro transform(pipeline) do
quote do
fn state ->
state |> unquote(pipeline)
end
end
end
The following two are equivalent:
Reed.get!(url,
  transform: fn state ->
    state
    |> limit(1)
    |> collect()
  end
)

Reed.get!(url,
  transform:
    limit(1)
    |> collect()
    |> transform()
)
Putting it All Together
Now, given a remote rss_url, you can do the following quite easily:
Get the feed metadata
import Reed.Transformers
Reed.get!(rss_url, transform: transform(halt()))
Get all items in a list
import Reed.Transformers
Reed.get!(rss_url, transform: transform(collect()))
Get the first 5 items in a list
import Reed.Transformers
Reed.get!(rss_url, transform: collect() |> limit(5) |> transform())
Get all itunes:-namespaced elements from the first 2 items as a list
import Reed.Transformers
Reed.get!(rss_url,
transform:
transform_item(
&Map.filter(&1, fn
{<<"itunes:", _rest::binary>>, _v} -> true
_ -> false
end)
)
|> collect()
|> limit(2)
|> transform()
)
That's it! I hope you found this helpful, and if you ever need to consume RSS in Elixir, give Reed a try!