Skip to content

Commit

Permalink
Add some recipes
Browse files Browse the repository at this point in the history
  • Loading branch information
zolazhou committed Dec 30, 2015
1 parent 9bad0cc commit ea9373b
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 0 deletions.
89 changes: 89 additions & 0 deletions lib/consul/heartbeat.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
defmodule Consul.Heartbeat do
use GenServer

@retry_ms 5000

defmodule State do
defstruct timer: nil,
check_id: nil,
interval: nil
end

# ===================================================================
# API
# ===================================================================

@spec start_link(binary, integer) :: GenServer.on_start
def start_link(check_id, interval) do
GenServer.start_link(__MODULE__, [check_id, interval])
end

@spec start(binary, integer) :: GenServer.on_start
def start(check_id, interval) do
GenServer.start(__MODULE__, [check_id, interval])
end

@spec stop(pid) :: :ok
def stop(pid) do
GenServer.call(pid, :stop)
end

# ===================================================================
# GenServer callbacks
# ===================================================================

def init([check_id, interval]) do
case service_name(check_id) do
{:ok, _service} ->
{:ok, %State{timer: nil,
check_id: check_id,
interval: interval}, 0}
error ->
{:stop, error}
end
end

def handle_info(:timeout, %{check_id: check_id, interval: interval} = state) do
case send_pulse(check_id) do
:ok ->
timer = :erlang.send_after(interval, self, :pulse)
{:noreply, %{state | timer: timer}}
:error ->
{:noreply, state, @retry_ms}
end
end

def handle_info(:pulse, %{check_id: check_id, interval: interval, timer: timer} = state) do
:erlang.cancel_timer(timer)
send_pulse(check_id)
new_timer = :erlang.send_after(interval, self, :pulse)
{:noreply, %{state | timer: new_timer}}
end

def handle_call(:stop, _, state) do
{:stop, :normal, :ok, state}
end

def terminate(_reason, _state) do
:ok
end

# ===================================================================
# Internal functions
# ===================================================================

defp send_pulse(check_id) do
{result, _} = Consul.Agent.Check.pass(check_id)
result
end

defp service_name(check_id) when is_binary(check_id) do
case String.split(check_id, ":", parts: 2) do
["service", name] ->
{:ok, name}
_ ->
{:error, {:invalid_check_id, check_id}}
end
end

end
11 changes: 11 additions & 0 deletions lib/consul/watch.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Consul.Watch do
defstruct type: nil,
handlers: [],
args: [],
index: nil

@type t :: %__MODULE__{ type: :key | :keyprefix | :service | :event,
handlers: [],
args: [term],
index: String.t }
end
9 changes: 9 additions & 0 deletions lib/consul/watch/send_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Consul.Watch.SendHandler do
use GenEvent

@doc false
def handle_event(event, %{pid: pid} = state) do
send(pid, event)
{:ok, state}
end
end
65 changes: 65 additions & 0 deletions lib/consul/watcher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
defmodule Consul.Watcher do
use GenServer

@wait "10m"
@retry_ms 30 * 1000

alias Consul.Watch

@spec start_link(Watch.t) :: GenServer.on_start
def start_link(watch) do
GenServer.start_link(__MODULE__, [watch])
end

#
# GenServer callbacks
#

def init([%Watch{type: type, args: args, handlers: handlers, index: index}]) do
{:ok, em} = GenEvent.start_link()

Enum.each handlers, fn
{handler, args} ->
:ok = GenEvent.add_handler(em, handler, args)
handler ->
:ok = GenEvent.add_handler(em, handler, [])
end

{:ok, %{type: type, args: args, em: em, index: index, hash: nil}, 0}
end

def handle_info(:timeout, %{type: type, args: args, index: index, em: em, hash: hash} = state) do
case do_watch(type, args, index) do
{:ok, %{body: body} = response} ->
new_index = Consul.Response.consul_index(response)
case :erlang.phash2(body) do
^hash ->
{:noreply, %{state | index: new_index}, 0}
new_hash ->
GenEvent.ack_notify(em, {:consul_watch, type, body})
{:noreply, %{state | index: new_index, hash: new_hash}, 0}
end
{:error, %{reason: :timeout}} ->
{:noreply, state, 100}
{:error, _} ->
{:noreply, state, @retry_ms}
end
end

#
# Private
#
defp do_watch(:key, [key], index) do
Consul.Kv.fetch(key, wait: @wait, index: index)
end
defp do_watch(:keyprefix, [prefix], index) do
Consul.Kv.fetch(prefix, wait: @wait, index: index, recurse: true)
end
defp do_watch(:service, [name, tag], index) do
Consul.Catalog.service(name, wait: @wait, index: index, tag: tag)
end
defp do_watch(:health, [name, tag], index) do
Consul.Health.service(name, wait: @wait, index: index, tag: tag)
end

end

0 comments on commit ea9373b

Please sign in to comment.