Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/rabbit/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ defmodule Rabbit.Consumer do
| {:arguments, Keyword.t()}
| {:custom_meta, map()}
| {:setup_opts, setup_options()}
| {:sync_start, boolean()}
| {:sync_start_delay, non_neg_integer()}
| {:sync_start_max, non_neg_integer()}
@type options :: [option()]
@type delivery_tag :: non_neg_integer()
@type action_options :: [{:multiple, boolean()} | {:requeue, boolean()}]
Expand Down Expand Up @@ -217,6 +220,12 @@ defmodule Rabbit.Consumer do
* `:custom_meta` - A map of custom data that will be included in each `Rabbit.Message`
handled by the consumer.
* `:setup_opts` - A keyword list of custom options for use in `c:handle_setup/1`.
* `:sync_start` - Boolean representing whether to establish the connection,
channel, and setup synchronously - defaults to `false`.
* `:sync_start_delay` - The amount of time in milliseconds to sleep between
sync start attempts - defaults to `50`.
* `:sync_start_max` - The max amount of sync start attempts that will occur
before proceeding with async start - defaults to `100`.

## Server Options

Expand Down
35 changes: 33 additions & 2 deletions lib/rabbit/consumer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ defmodule Rabbit.Consumer.Server do
arguments: [type: :list, default: []],
timeout: [type: [:integer, :atom], required: false],
custom_meta: [type: :map, default: %{}],
setup_opts: [type: :list, default: [], required: false]
setup_opts: [type: :list, default: [], required: false],
sync_start: [type: :boolean, required: true, default: false],
sync_start_delay: [type: :integer, required: true, default: 50],
sync_start_max: [type: :integer, required: true, default: 100]
}

@qos_opts [
Expand Down Expand Up @@ -61,6 +64,7 @@ defmodule Rabbit.Consumer.Server do
with {:ok, opts} <- module.init(:consumer, opts),
{:ok, opts} <- validate_opts(opts, @opts_schema) do
state = init_state(module, opts)
state = sync_start(state)
{:ok, state, {:continue, :connection}}
end
end
Expand Down Expand Up @@ -171,10 +175,37 @@ defmodule Rabbit.Consumer.Server do
consume_opts: Keyword.take(opts, @consume_opts),
worker_opts: Keyword.take(opts, @worker_opts),
custom_meta: Keyword.get(opts, :custom_meta),
setup_opts: Keyword.get(opts, :setup_opts)
setup_opts: Keyword.get(opts, :setup_opts),
sync_start: Keyword.get(opts, :sync_start),
sync_start_delay: Keyword.get(opts, :sync_start_delay),
sync_start_max: Keyword.get(opts, :sync_start_max),
started_mode: :async
}
end

defp sync_start(state, attempt \\ 1)

defp sync_start(%{sync_start: false} = state, _attempt), do: state

defp sync_start(%{sync_start_max: max} = state, attempt) when attempt >= max do
log_error(state, {:error, :sync_start_failed})
state
end

defp sync_start(state, attempt) do
with {:ok, state} <- connection(state),
{:ok, connection} <- Rabbit.Connection.fetch(state.connection),
{:ok, state} <- channel(state, connection),
{:ok, state} <- handle_setup(state),
{:ok, state} <- consume(state) do
%{state | started_mode: :sync}
else
_ ->
:timer.sleep(state.sync_start_delay)
sync_start(state, attempt + 1)
end
end

defp connection(%{connection_subscribed: true} = state), do: {:ok, state}

defp connection(state) do
Expand Down
72 changes: 67 additions & 5 deletions test/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,36 @@ defmodule Rabbit.ConsumerTest do
end
end

defmodule TroublesomeTestConsumer do
use Rabbit.Consumer

@impl Rabbit.Consumer
def init(:consumer, opts) do
{:ok, opts}
end

@impl Rabbit.Consumer
def handle_setup(state) do
attempt = Agent.get_and_update(state.setup_opts[:counter], fn n -> {n, n + 1} end)

if attempt == 0 do
{:error, :something_went_wrong}
else
AMQP.Queue.declare(state.channel, state.queue, auto_delete: true)
:ok
end
end

@impl Rabbit.Consumer
def handle_message(_msg) do
end

@impl Rabbit.Consumer
def handle_error(_) do
:ok
end
end

setup do
{:ok, connection} = Connection.start_link(TestConnection)
{:ok, producer} = Producer.start_link(TestProducer, connection: connection)
Expand All @@ -118,6 +148,34 @@ defmodule Rabbit.ConsumerTest do
end
end

describe "start_link/3 with :sync_start" do
test "starts consumer", meta do
assert {:ok, consumer} =
Consumer.start_link(TestConsumer,
connection: meta.connection,
queue: "consumer",
sync_start: true
)

assert %{started_mode: :sync, consuming: true} = get_state(consumer)
end

test "starts consumer with multiple attempts", meta do
{:ok, counter} = Agent.start(fn -> 0 end)

assert {:ok, consumer} =
Consumer.start_link(TroublesomeTestConsumer,
connection: meta.connection,
queue: "consumer",
sync_start: true,
setup_opts: [counter: counter]
)

assert Agent.get(counter, & &1) == 2
assert %{started_mode: :sync, consuming: true} = get_state(consumer)
end
end

describe "stop/1" do
test "stops consumer", meta do
assert {:ok, consumer, _queue} = start_consumer(meta)
Expand All @@ -128,7 +186,7 @@ defmodule Rabbit.ConsumerTest do
test "disconnects the amqp channel", meta do
assert {:ok, consumer, _queue} = start_consumer(meta)

state = GenServer.call(consumer, :state)
state = get_state(consumer)

assert Process.alive?(state.channel.pid)
assert :ok = Consumer.stop(consumer)
Expand All @@ -143,10 +201,10 @@ defmodule Rabbit.ConsumerTest do
assert {:ok, consumer, _queue} = start_consumer(meta)

connection_state = connection_state(meta.connection)
consumer_state1 = GenServer.call(consumer, :state)
consumer_state1 = get_state(consumer)
AMQP.Connection.close(connection_state.connection)
await_consuming(consumer)
consumer_state2 = GenServer.call(consumer, :state)
consumer_state2 = get_state(consumer)

assert consumer_state1.channel.pid != consumer_state2.channel.pid
end
Expand Down Expand Up @@ -256,7 +314,7 @@ defmodule Rabbit.ConsumerTest do
end

defp await_consuming(consumer) do
state = GenServer.call(consumer, :state)
state = get_state(consumer)

if state.consuming do
:ok
Expand All @@ -270,7 +328,11 @@ defmodule Rabbit.ConsumerTest do
:crypto.strong_rand_bytes(8) |> Base.encode64()
end

defp get_state(consumer) do
GenServer.call(consumer, :state)
end

defp connection_state(connection) do
Connection.transaction(connection, &GenServer.call(&1, :state))
Connection.transaction(connection, &get_state/1)
end
end
36 changes: 36 additions & 0 deletions test/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@ defmodule Rabbit.ProducerTest do
end
end

defmodule TroublesomeTestProducer do
use Rabbit.Producer

@impl Rabbit.Producer
def init(_type, opts) do
{:ok, opts}
end

@impl Rabbit.Producer
def handle_setup(state) do
attempt = Agent.get_and_update(state.setup_opts[:counter], fn n -> {n, n + 1} end)

if attempt == 0 do
{:error, :something_went_wrong}
else
:ok
end
end
end

describe "start_link/3" do
test "starts producer" do
assert {:ok, connection} = Connection.start_link(TestConnection)
Expand Down Expand Up @@ -50,6 +70,22 @@ defmodule Rabbit.ProducerTest do
end
end

describe "start_link/3 with :sync_start" do
test "starts producer with multiple attempts" do
{:ok, counter} = Agent.start(fn -> 0 end)
assert {:ok, connection} = Connection.start_link(TestConnection)

assert {:ok, _producer} =
Producer.start_link(TroublesomeTestProducer,
connection: connection,
sync_start: true,
setup_opts: [counter: counter]
)

assert Agent.get(counter, & &1) == 2
end
end

describe "stop/1" do
test "stops producer" do
assert {:ok, connection} = Connection.start_link(TestConnection)
Expand Down