Overview
One of the software development patterns I really enjoy, and, think adds a ton of flexibility to your application is implementing a message bus or event bus. The idea is, you instrument your application with “events” that have a data payload and are published to whatever modules are subscribed. The main application doesn’t care about the return value of the events, just that they are sent. Supporting modules are then subscribed to any event they need to process, multiple modules can be subscribed to the same event and do different things.
For our application we are going to use octobus/event_bus.
The first thing we need to do is add it to our mix file.
# project/server/src/mix.exs
defmodule GetSocial.MixProject do
# ...
def application do
[
# ...
extra_applications: [
# ...
:event_bus
]
]
end
# ...
defp deps do
[
# ...
{:event_bus, "~> 1.6"}
]
end
# ...
end
Then we need to download the dependencies docker-compose run --rm server mix deps.get
.
EventBus
With EventBus installed we can do a bit of experimentation. First, let’s create a test case…
# project/server/src/test/get_social/events_test.exs
defmodule GetSocial.EventsTest do
# Basic genserver subscriber implementation
defmodule TestSubscriber do
use GenServer, restart: :permanent
@topics [".*"]
alias EventBus, as: Events
def start_link(_),
do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
# don't do any wotk in the init, use :continue
def init(_),
do: {:ok, nil, {:continue, :subscribe}}
def handle_continue(:subscribe, _),
do: {:noreply, Events.subscribe({ __MODULE__, @topics })}
def process({ _topic, _id } = event),
do: GenServer.cast(__MODULE__, event)
def handle_cast({ topic, id } = event, _) do
event = event
|> Events.fetch_event()
{__MODULE__, topic, id}
|> Events.mark_as_completed()
send(:test_process, {:complete, topic, id, event})
{:noreply, nil}
end
end
# Start Tests
use ExUnit.Case
# Alias EventBus to Events to make it easier to swap in our wrapper module.
alias EventBus, as: Events
test "event_bus" do
# Because events are async, we need to register our current process for assert_recieve
Process.register(self(), :test_process)
# start tracing so we can assert agains the event_bus :notify event
event_notifier_pid = Process.whereis EventBus.Manager.Notification
:erlang.trace(event_notifier_pid, true, [:receive])
# Register our event
:ok = Events.register_topic(:test_user_event)
# Start our generic subscriber GenServer
start_supervised!({ TestSubscriber, []})
# wait for subscribers to subscribe
Process.sleep(100)
# assert our subscriber is subscribed
assert Events.subscribers()
|> Enum.member?({ TestSubscriber, [".*"] })
# Build Event Object
event = %EventBus.Model.Event{
id: Ecto.UUID.generate(),
topic: :test_user_event,
data: %{ email: "[email protected]" }
}
# Trigger Event
:ok = event |> Events.notify();
# assert on the :trace
assert_receive {
:trace,
^event_notifier_pid,
:receive,
{
:"$gen_cast",
{
:notify,
%{
id: id,
topic: topic
}
}
}
}
# test that our subscriber processed the event
assert_receive { :complete, ^topic, ^id, ^event }
end
end
In our test case we are creating a basic subscriber that we can use to handle test events. We’re testing the whole flow from registering and event, subscribing, and processing the event. What we really want to to is test the sending
independently of the receiving
. As our test suite grows, we don’t want to actually trigger the subscribers when we test that an event is raised.
To accomplish this we need to build an abstraction around EventBus
that will not call EventBus#notify
when :test = Mix.env()
.
EventBus Wrapper
Let’s start by creating our Events
module that simply delegates all its methods to EventBus
.
# project/server/src/lib/get_social/events.ex
defmodule GetSocial.Events do
defdelegate fetch_event(event),
to: EventBus
defdelegate mark_as_completed(event),
to: EventBus
defdelegate notify(event),
to: EventBus
defdelegate register_topic(topic),
to: EventBus
defdelegate subscribe(topic),
to: EventBus
defdelegate subscribers(),
to: EventBus
end
Now we can find all alias EventBus, as: Events
and replace them with alias GetSocial.Events
. Our tests should still pass, so, let’s work on replacing the tracing logic with an assert_recieve
.
We will be removing all of this:
# project/server/src/test/get_social/events_test.exs
defmodule GetSocial.EventsTest do
# ...
test "event_bus" do
# ...
# start tracing so we can assert agains the event_bus :notify event
event_notifier_pid = Process.whereis EventBus.Manager.Notification
:erlang.trace(event_notifier_pid, true, [:receive])
# ...
# assert on the :trace
assert_receive {
:trace,
^event_notifier_pid,
:receive,
{
:"$gen_cast",
{
:notify,
%{
id: id,
topic: topic
}
}
}
}
# ...
end
end
Ideally we would be able to test that our event bus’ :notify
was triggered without having to start a trace.
Let’s first update our tests to use assert_recieve {:notify, ^event}
# project/server/src/test/get_social/events_test.exs
defmodule GetSocial.EventsTest do
# ...
test "event_bus" do
Process.register(self(), :test_process)
# Register Event
:ok = Events.register_topic(:test_user_event)
# Start Subscriber GenServer
start_supervised!({ TestSubscriber, []})
# wait for subscribers to subscribe
Process.sleep(100)
# assert our subscriber is subscribed
assert Events.subscribers()
|> Enum.member?({ TestSubscriber, [".*"] })
# Build Event Object
event = %EventBus.Model.Event{
id: Ecto.UUID.generate(),
topic: :test_user_event,
data: %{ email: "[email protected]" }
}
# Trigger Event
:ok = event |> Events.notify();
# assert on the :trace
assert_receive { :notify, %EventBus.Model.Event{ topic: topic, id: id } }
# test that our subscriber processed the event
assert_receive { :complete, ^topic, ^id, ^event }
end
end
Our tests should now fail with a No message matching {:notify, %EventBus.Model.Event{topic: topic, id: id}}
as expected. We need to update our GetSocial.Events#notify
method so it sends a message to our process.
# project/server/src/lib/get_social/events.ex
defmodule GetSocial.Events do
# ...
def notify(event) do
with :test <- Mix.env() do
instrument(:notify, event)
else
_ -> event |> notify!()
end
:ok
end
defdelegate notify!(event),
to: EventBus,
as: :notify
# ...
defp instrument(action, params) do
test_process() |> send({ action, params })
end
defp test_process do
:test_process
end
end
We’ve created a new :notify
method and renamed our delegated method to :notify!
. This way we can test our event trigger and event handlers independently. When we call GetSocial.Events.notify
from our application code it will now send a :notify
message to our process when testing and call :notify!
when running normally.
To test our subscribers, we can now use GetSocial.Events.notify!
in our tests to trigger the subscribers.
Let’s fix our tests by using GetSocial.Events.notify!
.
# project/server/src/test/get_social/events_test.exs
defmodule GetSocial.EventsTest do
# ...
test "event_bus" do
# ...
# force notify trigger
:ok = event |> Events.notify!()
# test that our subscriber processed the event
assert_receive { :complete, ^topic, ^id, ^event }
end
end
The next thing I’d like to cleanup is how events are published. Currently we use notify(%EventBus.Model.Event{})
. Let’s simplify the call to something like publish(topic, data)
.
# project/server/src/test/get_social/events_test.exs
defmodule GetSocial.EventsTest do
# ...
test "event_bus" do
# ...
# Trigger Event
:ok = Events.publish(:test_user_event, %{ email: "[email protected]" })
# assert on the :trace
assert_receive { :notify, %EventBus.Model.Event{ topic: topic, id: id } = event }
# ...
end
end
With our failing tests, we can update our implementation.
# project/server/src/lib/get_social/events.ex
defmodule GetSocial.Events do
# ...
def publish(topic, data) do
%EventBus.Model.Event{
id: Ecto.UUID.generate(),
topic: topic,
data: data
}
|> notify()
end
# ...
end
Just like our :notify
method, we need to refactor our mark_as_complete
method to it sends a message to our test process too. The difference with our mark_as_completed
method is that we want it to send a message and call the original method.
# project/server/src/lib/get_social/events.ex
defmodule GetSocial.Events do
# ...
def mark_as_completed(event) do
with :test <- Mix.env() do
instrument(:complete, event)
end
event |> mark_as_completed!()
end
defdelegate mark_as_completed!(event),
to: EventBus,
as: :mark_as_completed
# ...
end
Subscribers
The last little bit to do is refactor our subscriber logic into something that can be reused. Let’s move all of the GenServer/EventBus boiler plate into a new module that we can use
in our subscribers. Let’s first update our tests, we’re going to replace the GenServer and EventBus methods with a single handle_event(topic, data)
method.
# project/server/src/test/get_social/events_test.exs
defmodule GetSocial.EventsTest do
defmodule TestSubscriber do
use GetSocial.EventSubscriber,
topics: [".*"]
def handle_event(_topic, _data), do: :ok
end
# ...
end
With our subscriber code slimmed down and our tests failing, we can move on to our implementation.
# project/server/src/lib/get_social/event_subscriber.ex
defmodule GetSocial.EventSubscriber do
defmacro __using__(topics: topics) do
quote do
use GenServer, restart: :permanent
alias GetSocial.{Events,EventSubscriber}
def start_link(_),
do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
def init(_),
do: {:ok, nil, {:continue, :subscribe}}
def process({ _topic, _id } = event),
do: GenServer.cast(__MODULE__, event)
def handle_continue(:subscribe, _) do
with :ok <- Events.subscribe({ __MODULE__, unquote(topics) }) do
{ :noreply, nil }
end
end
def handle_continue({:mark_as_completed, {topic, id}}, _) do
with :ok <- Events.mark_as_completed({__MODULE__, topic, id}) do
{ :noreply, nil }
end
end
def handle_cast({ topic, id }, _state) do
with event <- Events.fetch_event({ topic, id }),
%{ data: data } <- event,
:ok <- handle_event(topic, data) do
{:noreply, nil, {:continue, {:mark_as_completed, {topic, id}}}}
end
end
def handle_event(topic,_), do: {:error, topic}
defoverridable handle_event: 2
end
end
end
The biggest change between our previous implementation is the handle_cast
method. Our new method looks up the event detail and delegates to a new handle_event(topic, data)
method which our subscribers will implement to process the events. The other change is moving the Events.mark_as_completed
call into a handle_continue
method that is triggered by returning {:noreply, nil, {:continue, {:mark_as_completed, { topic, id, event }}}}
from our handle_cast
method.
Registering Subscribers
Now that we can create subscribers, we need to register them. Let’s use a new subscriber for demonstration purposes which will send a slack message whenever a user makes a post in our application.
First, we will create our subscriber:
defmodule GetSocial.Events.SlackSubscriber do
use GetSocial.EventSubscriber,
topics: ["user_post_complete"]
def handle_event(topic, data) do
{topic, data} |> IO.inspect
:ok
end
end
With our subscriber, we need to add it to our application.ex
:
# project/server/src/lib/get_social/application.ex
defmodule GetSocial.Application do
# ...
def start(_type, _args) do
children = [
# ...
GetSocial.Events.SlackSubscriber
]
# ...
end
# ...
end
Last, but not least, we need to register our event topics in our config.exs
# project/server/src/config/config.exs
# ...
config :event_bus,
topics: [
:user_post_complete
]
# ...
With out new event registered and subscriber listening, we just need to update our application code to publish the event:
GetSociel.Events.publish(:user_post_complete, %{ id: "1234" })
EventCase
Now that we have our application logic dialed in, we can spend some time and refactor our test into a new EventCase
module that can be used like our ApiCase
.
This new EventCase
will handle registering the test process and also give us a new assert_subscribed
test helper.
Let’s start with updating our tests:
# project/server/src/test/get_social/events_test.exs
defmodule GetSocial.EventsTest do
# ...
use GetSocial.EventCase, otp_app: :get_social
# ...
test "event_bus" do
# REMOVE
# Process.register(self(), :test_process)
# ...
# REPLACE
# assert Events.subscribers()
# |> Enum.member?({ TestSubscriber, [".*"] })
# WITH
assert_subscribed {TestSubscriber, [".*"]}
# ...
end
end
With our tests updated, we can add our new EventCase
module.
# project/server/src/test/support/event_case.ex
defmodule GetSocial.EventCase do
use ExUnit.CaseTemplate
using(otp_app: otp_app) do
quote do
setup do
Application.put_env(unquote(otp_app), :shared_test_process, self())
:ok
end
def assert_subscribed({ module, topics }) do
assert GetSocial.Events.subscribers()
|> Enum.member?({ module, topics })
end
end
end
end
We now need to go update our GetSocial.Events
so it will retrieve the test process from our application env instead of hardcoding the process name.
# project/server/src/lib/get_social/events.ex
defmodule GetSocial.Events do
# ...
defp test_process do
Application.get_env(:get_social, :shared_test_process) || self()
end
end
With that, our tests should pass again and we can now reuse our use GetSocial.EventCase
anywhere we need to test triggering events.
EventBus Dashboard
We’re also going to add the dashboard module that gives us a nice web interface to check on our event processing.
# project/server/src/mix.exs
defmodule GetSocial.MixProject do
# ...
def application do
[
# ...
extra_applications: [
# ...
:event_bus_metrics
]
]
end
# ...
defp deps do
[
# ...
{:event_bus_metrics, "~> 0.3"}
]
end
# ...
end
And configure it…
# project/server/src/config/config.exs
config :event_bus_metrics,
cross_origin: {:system, "EB_CROSS_ORIGIN", "off"},
http_server: {:system, "EB_HTTP_SERVER", "off"},
http_server_port: {:system, "PORT", "4000"},
# Server-Sent-Events Tickers:
notify_subscriber_metrics_in_ms: {:system, "EB_SUBSCRIBER_M_IN_MS", 250},
notify_topic_metrics_in_ms: {:system, "EB_TOPIC_M_IN_MS", 1000},
notify_topics_metrics_in_ms: {:system, "EB_TOPICS_M_IN_MS", 250}
Now we can wire up the dashboard endpoint, but, we only want to make this available when in development.
# project/server/src/lib/get_social_web/router.ex
defmodule GetSocialWeb.Router do
# ...
if Mix.env == :dev do
forward "/events", EventBus.Metrics.Web.Router
end
# ...
end
After running docker-compose run --rm server mix deps.get
and restarting our server we can visit our dashboard localhost:4000/events/ui/#/topics.
References
- Writing extensible Elixir with Behaviours
- Decoupled Modules with Elixir EventBus
- Implementing a flexible notifications system in Elixir using Protocols
- the-not-so-magic-tricks-of-testing-in-elixir-1-2
- mocks-and-explicit-contracts/
- getting-started/mix-otp/genserver.html#testing-a-genserver
- get-notified-of-user-signups-and-plan-changes-automatically-using-postgres-phoenix-pubsub-e67d061b04bc