MessageBus / EventBus

12/25/2018

Foundations
Test Suite Setup

Overview

One of the software development patterns I really enjoy, and think adds a ton of flexibility to your application, is the message bus or event bus. The idea is that you instrument your application with “events” that carry a data payload and are published to whatever modules are subscribed. The main application doesn’t care about the return value of the events, just that they are sent. Supporting modules subscribe to any event they need to process; multiple modules can subscribe to the same event and do different things.

For our application we are going to use octobus/event_bus.

The first thing we need to do is add it to our mix file.

# project/server/src/mix.exs

defmodule GetSocial.MixProject do
  # ...

  # OTP application settings: lists :event_bus among the extra applications.
  def application do
    [
      # ...
      extra_applications: [
        # ...
        :event_bus
      ]
    ]
  end

  # ...

  # Project dependencies: pulls in the event_bus package (>= 1.6 and < 2.0).
  defp deps do
    [
      # ...
      {:event_bus, "~> 1.6"}
    ]
  end

  # ...
end

Then we need to download the dependencies by running docker-compose run --rm server mix deps.get.

EventBus

With EventBus installed we can do a bit of experimentation. First, let’s create a test case…

# project/server/src/test/get_social/events_test.exs

defmodule GetSocial.EventsTest do

  # Basic GenServer subscriber implementation
  defmodule TestSubscriber do
    use GenServer, restart: :permanent

    # topic patterns handed to Events.subscribe/1 in handle_continue/2
    @topics [".*"]

    alias EventBus, as: Events

    def start_link(_),
      do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

    # don't do any work in the init, use :continue
    def init(_),
      do: {:ok, nil, {:continue, :subscribe}}

    # subscribes this module; the return value of subscribe/1 becomes the state
    def handle_continue(:subscribe, _),
      do: {:noreply, Events.subscribe({ __MODULE__, @topics })}

    # forwards the { topic, id } pair to the GenServer as a cast
    def process({ _topic, _id } = event),
      do: GenServer.cast(__MODULE__, event)

    # fetches the full event, marks it completed for this subscriber,
    # then reports back to the process registered as :test_process
    def handle_cast({ topic, id } = event, _) do
      event = event
        |> Events.fetch_event()

      {__MODULE__, topic, id}
        |> Events.mark_as_completed()

      send(:test_process, {:complete, topic, id, event})

      {:noreply, nil}
    end
  end

  # Start Tests
  use ExUnit.Case

  # Alias EventBus to Events to make it easier to swap in our wrapper module.
  alias EventBus, as: Events

  test "event_bus" do
    # Because events are async, we need to register our current process for assert_receive
    Process.register(self(), :test_process)

    # start tracing so we can assert against the event_bus :notify event
    event_notifier_pid = Process.whereis EventBus.Manager.Notification
    :erlang.trace(event_notifier_pid, true, [:receive])

    # Register our event
    :ok = Events.register_topic(:test_user_event)

    # Start our generic subscriber GenServer
    start_supervised!({ TestSubscriber, []})

    # wait for subscribers to subscribe
    Process.sleep(100)

    # assert our subscriber is subscribed
    assert Events.subscribers()
          |> Enum.member?({ TestSubscriber, [".*"] })

    # Build Event Object
    event = %EventBus.Model.Event{
      id: Ecto.UUID.generate(),
      topic: :test_user_event,
      data: %{ email: "[email protected]" }
    }

    # Trigger Event
    :ok = event |> Events.notify();

    # assert on the :trace; binds id and topic from the traced :notify cast
    assert_receive {
      :trace,
      ^event_notifier_pid,
      :receive,
      {
        :"$gen_cast",
        {
          :notify,
          %{
            id: id,
            topic: topic
          }
        }
      }
    }

    # test that our subscriber processed the event
    assert_receive { :complete, ^topic, ^id, ^event }
  end
end

In our test case we are creating a basic subscriber that we can use to handle test events. We’re testing the whole flow: registering an event, subscribing, and processing the event. What we really want to do is test the sending independently of the receiving. As our test suite grows, we don’t want to actually trigger the subscribers when we test that an event is raised.

To accomplish this we need to build an abstraction around EventBus that will not call EventBus#notify when :test = Mix.env().

EventBus Wrapper

Let’s start by creating our Events module that simply delegates all its methods to EventBus.

# project/server/src/lib/get_social/events.ex

defmodule GetSocial.Events do
  @moduledoc """
  Thin wrapper around `EventBus`; every function here forwards unchanged to
  the EventBus function of the same name.
  """

  defdelegate fetch_event(event), to: EventBus
  defdelegate mark_as_completed(event), to: EventBus
  defdelegate notify(event), to: EventBus
  defdelegate register_topic(topic), to: EventBus
  defdelegate subscribe(topic), to: EventBus
  defdelegate subscribers(), to: EventBus
end

Now we can find all alias EventBus, as: Events and replace them with alias GetSocial.Events. Our tests should still pass, so let’s work on replacing the tracing logic with an assert_receive.

We will be removing all of this:

# project/server/src/test/get_social/events_test.exs

defmodule GetSocial.EventsTest do
  # ...

  test "event_bus" do
    # ...

    # start tracing so we can assert against the event_bus :notify event
    event_notifier_pid = Process.whereis EventBus.Manager.Notification
    :erlang.trace(event_notifier_pid, true, [:receive])

    # ...

    # assert on the :trace
    assert_receive {
      :trace,
      ^event_notifier_pid,
      :receive,
      {
        :"$gen_cast",
        {
          :notify,
          %{
            id: id,
            topic: topic
          }
        }
      }
    }

    # ...
  end
end

Ideally we would be able to test that our event bus’ :notify was triggered without having to start a trace.

Let’s first update our tests to use assert_receive {:notify, ^event}

# project/server/src/test/get_social/events_test.exs

defmodule GetSocial.EventsTest do

  # ...

  test "event_bus" do
    # Register this process under the name the subscriber and wrapper send to
    Process.register(self(), :test_process)

    # Register Event
    :ok = Events.register_topic(:test_user_event)

    # Start Subscriber GenServer
    start_supervised!({ TestSubscriber, []})

    # wait for subscribers to subscribe
    Process.sleep(100)

    # assert our subscriber is subscribed
    assert Events.subscribers()
          |> Enum.member?({ TestSubscriber, [".*"] })

    # Build Event Object
    event = %EventBus.Model.Event{
      id: Ecto.UUID.generate(),
      topic: :test_user_event,
      data: %{ email: "[email protected]" }
    }

    # Trigger Event
    :ok = event |> Events.notify();

    # assert on the :notify message (replaces the trace); binds topic and id
    assert_receive { :notify, %EventBus.Model.Event{ topic: topic, id: id } }

    # test that our subscriber processed the event
    assert_receive { :complete, ^topic, ^id, ^event }
  end
end

Our tests should now fail with a No message matching {:notify, %EventBus.Model.Event{topic: topic, id: id}} as expected. We need to update our GetSocial.Events#notify method so it sends a message to our process.

# project/server/src/lib/get_social/events.ex

defmodule GetSocial.Events do
  # ...

  # Mix is a build tool and is not shipped in releases, so calling Mix.env/0
  # at runtime would raise there. Capture the environment once, at compile time.
  @env Mix.env()

  @doc """
  Publishes `event`.

  When compiled in the `:test` environment the event is not dispatched through
  EventBus; instead `{:notify, event}` is sent to the test process so tests can
  `assert_receive` it. In every other environment the event is forwarded to
  `notify!/1`. Always returns `:ok`.
  """
  def notify(event) do
    if @env == :test do
      instrument(:notify, event)
    else
      notify!(event)
    end

    :ok
  end

  @doc """
  Dispatches `event` through `EventBus.notify/1` regardless of environment.
  """
  defdelegate notify!(event),
    to: EventBus,
    as: :notify

  # ...

  # Sends `{action, params}` to the test process.
  defp instrument(action, params) do
    send(test_process(), {action, params})
  end

  # Registered name of the process that receives instrumentation messages.
  defp test_process do
    :test_process
  end
end

We’ve created a new :notify method and renamed our delegated method to :notify!. This way we can test our event trigger and event handlers independently. When we call GetSocial.Events.notify from our application code it will now send a :notify message to our process when testing and call :notify! when running normally.

To test our subscribers, we can now use GetSocial.Events.notify! in our tests to trigger the subscribers.

Let’s fix our tests by using GetSocial.Events.notify!.

# project/server/src/test/get_social/events_test.exs

defmodule GetSocial.EventsTest do

  # ...

  test "event_bus" do
    # ...

    # force notify trigger: notify!/1 delegates straight to EventBus,
    # skipping the test-only branch in notify/1 so subscribers actually run
    :ok = event |> Events.notify!()

    # test that our subscriber processed the event
    assert_receive { :complete, ^topic, ^id, ^event }
  end
end

The next thing I’d like to cleanup is how events are published. Currently we use notify(%EventBus.Model.Event{}). Let’s simplify the call to something like publish(topic, data).

# project/server/src/test/get_social/events_test.exs

defmodule GetSocial.EventsTest do
  # ...

  test "event_bus" do
    # ...

    # Trigger Event
    :ok = Events.publish(:test_user_event, %{ email: "[email protected]" })

    # assert on the :notify message and capture the event built by publish/2
    assert_receive { :notify, %EventBus.Model.Event{ topic: topic, id: id } = event }

    # ...
  end
end

With our failing tests, we can update our implementation.

# project/server/src/lib/get_social/events.ex

defmodule GetSocial.Events do

  # ...

  # Wraps `data` in an EventBus event for `topic`, tagged with a freshly
  # generated UUID, and hands it to notify/1.
  def publish(topic, data) do
    event = %EventBus.Model.Event{id: Ecto.UUID.generate(), topic: topic, data: data}

    notify(event)
  end

  # ...
end

Just like our :notify method, we need to refactor our mark_as_completed method so it sends a message to our test process too. The difference with our mark_as_completed method is that we want it to send a message and also call the original method.

# project/server/src/lib/get_social/events.ex

defmodule GetSocial.Events do
  # ...

  # Mix is a build tool and is not shipped in releases, so calling Mix.env/0
  # at runtime would raise there. Capture the environment once, at compile time.
  @env Mix.env()

  @doc """
  Marks `event` as completed in EventBus.

  When compiled in the `:test` environment it first sends `{:complete, event}`
  to the test process; unlike `notify/1`, the real EventBus call is made in
  every environment. Returns whatever `mark_as_completed!/1` returns.
  """
  def mark_as_completed(event) do
    if @env == :test, do: instrument(:complete, event)

    mark_as_completed!(event)
  end

  @doc """
  Calls `EventBus.mark_as_completed/1` directly, without instrumentation.
  """
  defdelegate mark_as_completed!(event),
    to: EventBus,
    as: :mark_as_completed

  # ...
end

Subscribers

The last little bit to do is refactor our subscriber logic into something that can be reused. Let’s move all of the GenServer/EventBus boilerplate into a new module that we can use in our subscribers. Let’s first update our tests: we’re going to replace the GenServer and EventBus methods with a single handle_event(topic, data) method.

# project/server/src/test/get_social/events_test.exs

defmodule GetSocial.EventsTest do

  # Test subscriber built on the shared EventSubscriber boilerplate
  defmodule TestSubscriber do
    use GetSocial.EventSubscriber,
      topics: [".*"]

    # Accepts any topic/payload and reports success
    def handle_event(_topic, _data), do: :ok
  end

  # ...
end

With our subscriber code slimmed down and our tests failing, we can move on to our implementation.

# project/server/src/lib/get_social/event_subscriber.ex

defmodule GetSocial.EventSubscriber do
  @moduledoc """
  Boilerplate for EventBus subscribers.

  `use GetSocial.EventSubscriber, topics: [...]` turns the calling module into
  a GenServer registered under its own name. On start-up it subscribes to
  `topics`, and every event it is handed is looked up and passed to
  `handle_event(topic, data)`, which the calling module overrides.

  `handle_event/2` must return `:ok` for the event to be marked as completed.
  Any other result is logged and the event is left unmarked; the server keeps
  running so queued events are not lost.
  """

  defmacro __using__(topics: topics) do
    quote do
      use GenServer, restart: :permanent

      require Logger

      alias GetSocial.{Events,EventSubscriber}

      def start_link(_),
        do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

      # Do no work in init/1; subscribing happens in handle_continue/2.
      def init(_),
        do: {:ok, nil, {:continue, :subscribe}}

      # Entry point called with the {topic, id} of an event; hands it to the server.
      def process({ _topic, _id } = event),
        do: GenServer.cast(__MODULE__, event)

      # Assertive match: a failed subscription crashes with a clear MatchError
      # instead of leaking a non-GenServer return value.
      def handle_continue(:subscribe, _) do
        :ok = Events.subscribe({ __MODULE__, unquote(topics) })
        { :noreply, nil }
      end

      def handle_continue({:mark_as_completed, {topic, id}}, _) do
        :ok = Events.mark_as_completed({__MODULE__, topic, id})
        { :noreply, nil }
      end

      # Looks up the event, delegates to handle_event/2 and, on :ok, continues
      # into the :mark_as_completed step.
      def handle_cast({ topic, id }, _state) do
        with %{ data: data } <- Events.fetch_event({ topic, id }),
             :ok <- handle_event(topic, data) do
          {:noreply, nil, {:continue, {:mark_as_completed, {topic, id}}}}
        else
          unexpected ->
            # Without this branch the non-matching value would be returned from
            # handle_cast/2 and stop the server with :bad_return_value.
            Logger.error(
              "#{inspect(__MODULE__)} could not process #{inspect({topic, id})}: " <>
                inspect(unexpected)
            )

            {:noreply, nil}
        end
      end

      # Default implementation; subscribers are expected to override it.
      def handle_event(topic,_), do: {:error, topic}
      defoverridable handle_event: 2
    end
  end
end

The biggest change from our previous implementation is the handle_cast method. Our new method looks up the event detail and delegates to a new handle_event(topic, data) method which our subscribers will implement to process the events. The other change is moving the Events.mark_as_completed call into a handle_continue method that is triggered by returning {:noreply, nil, {:continue, {:mark_as_completed, {topic, id}}}} from our handle_cast method.

Registering Subscribers

Now that we can create subscribers, we need to register them. Let’s use a new subscriber for demonstration purposes which will send a slack message whenever a user makes a post in our application.

First, we will create our subscriber:

defmodule GetSocial.Events.SlackSubscriber do
  # Demo subscriber for "user_post_complete" events.
  use GetSocial.EventSubscriber, topics: ["user_post_complete"]

  # Prints the topic/payload pair and reports success.
  def handle_event(topic, data) do
    IO.inspect({topic, data})
    :ok
  end
end

With our subscriber, we need to add it to our application.ex:

# project/server/src/lib/get_social/application.ex

defmodule GetSocial.Application do
  # ...

  def start(_type, _args) do

    children = [
      # ...
      # the Slack subscriber is added to the application's children list
      GetSocial.Events.SlackSubscriber
    ]

    # ...
  end

  # ...
end

Last, but not least, we need to register our event topics in our config.exs

# project/server/src/config/config.exs

# ...

# Event topics registered with event_bus through configuration.
config :event_bus,
  topics: [
    :user_post_complete
  ]

# ...

With our new event registered and subscriber listening, we just need to update our application code to publish the event:

# Publish the :user_post_complete event through the GetSocial.Events wrapper.
GetSocial.Events.publish(:user_post_complete, %{ id: "1234" })

EventCase

Now that we have our application logic dialed in, we can spend some time and refactor our test into a new EventCase module that can be used like our ApiCase.

This new EventCase will handle registering the test process and also give us a new assert_subscribed test helper.

Let’s start with updating our tests:

# project/server/src/test/get_social/events_test.exs

defmodule GetSocial.EventsTest do
  # ...

  # EventCase stores the test pid in the :get_social application env
  # and provides the assert_subscribed helper used below
  use GetSocial.EventCase, otp_app: :get_social

  # ...

  test "event_bus" do
    # REMOVE
    # Process.register(self(), :test_process)

    # ...

    # REPLACE
    # assert Events.subscribers()
    #       |> Enum.member?({ TestSubscriber, [".*"] })
    # WITH
    assert_subscribed {TestSubscriber, [".*"]}

    # ...
  end
end

With our tests updated, we can add our new EventCase module.

# project/server/src/test/support/event_case.ex

defmodule GetSocial.EventCase do
  @moduledoc """
  Case template for tests that assert on published events.

  `use GetSocial.EventCase, otp_app: app` stores the pid of each running test
  under `:shared_test_process` in `app`'s application env, and removes it again
  when the test exits. It also defines `assert_subscribed/1` in the test module.
  """
  use ExUnit.CaseTemplate

  using(otp_app: otp_app) do
    quote do
      setup do
        Application.put_env(unquote(otp_app), :shared_test_process, self())

        # The application env is global: drop the pid once the test is done so
        # later tests never pick up a stale (dead) process.
        on_exit(fn ->
          Application.delete_env(unquote(otp_app), :shared_test_process)
        end)

        :ok
      end

      # Asserts that `{module, topics}` is among the current EventBus subscribers.
      def assert_subscribed({ module, topics }) do
        assert Enum.member?(GetSocial.Events.subscribers(), { module, topics })
      end
    end
  end
end

We now need to go update our GetSocial.Events so it will retrieve the test process from our application env instead of hardcoding the process name.

# project/server/src/lib/get_social/events.ex

defmodule GetSocial.Events do
  # ...

  # Process that receives instrumentation messages: the value stored under
  # :shared_test_process in the :get_social application env, or the calling
  # process when nothing is stored there.
  defp test_process do
    Application.get_env(:get_social, :shared_test_process) || self()
  end
end

With that, our tests should pass again and we can now reuse our use GetSocial.EventCase anywhere we need to test triggering events.

EventBus Dashboard

We’re also going to add the dashboard module that gives us a nice web interface to check on our event processing.

# project/server/src/mix.exs

defmodule GetSocial.MixProject do
  # ...

  # OTP application settings: lists :event_bus_metrics among the extra applications.
  def application do
    [
      # ...
      extra_applications: [
        # ...
        :event_bus_metrics
      ]
    ]
  end

  # ...

  # Project dependencies: adds the event_bus_metrics package (>= 0.3 and < 1.0).
  defp deps do
    [
      # ...
      {:event_bus_metrics, "~> 0.3"}
    ]
  end

  # ...
end

And configure it…

# project/server/src/config/config.exs

# NOTE(review): each {:system, "VAR", default} tuple presumably reads the named
# OS environment variable and falls back to the given default — confirm against
# the event_bus_metrics documentation.
config :event_bus_metrics,
  cross_origin: {:system, "EB_CROSS_ORIGIN", "off"},
  http_server: {:system, "EB_HTTP_SERVER", "off"},
  http_server_port: {:system, "PORT", "4000"},
  # Server-Sent-Events Tickers:
  notify_subscriber_metrics_in_ms: {:system, "EB_SUBSCRIBER_M_IN_MS", 250},
  notify_topic_metrics_in_ms: {:system, "EB_TOPIC_M_IN_MS", 1000},
  notify_topics_metrics_in_ms: {:system, "EB_TOPICS_M_IN_MS", 250}

Now we can wire up the dashboard endpoint, but, we only want to make this available when in development.

# project/server/src/lib/get_social_web/router.ex

defmodule GetSocialWeb.Router do
  # ...

  # This check sits in the module body, so it runs while the router is being
  # compiled: the /events route only exists in builds compiled with MIX_ENV=dev.
  if Mix.env == :dev do
    forward "/events", EventBus.Metrics.Web.Router
  end

  # ...
end

After running docker-compose run --rm server mix deps.get and restarting our server we can visit our dashboard localhost:4000/events/ui/#/topics.

References

Next Level

© 2020 ThinkAddict.com. All rights reserved.