Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Ogmios connection errors #43

Merged
merged 7 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

- Checks Ogmios server health prior to sending initial messages for ChainSync and Mempool. In case
the underlying not is not yet ready or still syncinging with the network, it reports back status and
attempts a reconnection after 5 seconds.

## [v0.5.1](https://github.com/wowica/xogmios/releases/tag/v0.5.1) (2024-09-04)

### Fixed
Expand Down
7 changes: 7 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import Config

env_config_file = "#{config_env()}.exs"

if File.exists?("config/#{env_config_file}") do
import_config env_config_file
end
3 changes: 3 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import Config

config :xogmios, Xogmios.HealthCheck, http_client: Xogmios.HealthCheck.HTTPClientMock
2 changes: 1 addition & 1 deletion lib/xogmios/chain_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ defmodule Xogmios.ChainSync do
"""
@spec start_link(module(), start_options :: Keyword.t()) :: {:ok, pid()} | {:error, term()}
def start_link(client, opts) do
{url, opts} = Keyword.pop(opts, :url)
url = Keyword.fetch!(opts, :url)
{name, opts} = Keyword.pop(opts, :name, client)

initial_state = Keyword.merge(opts, handler: client)
Expand Down
37 changes: 30 additions & 7 deletions lib/xogmios/chain_sync/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ defmodule Xogmios.ChainSync.Connection do
This module implements a connection with the Ogmios Websocket server
for the Chain Synchronization protocol.
"""
require Logger

alias Xogmios.HealthCheck
alias Xogmios.ChainSync.Messages

defmacro __using__(_opts) do
Expand All @@ -25,6 +27,11 @@ defmodule Xogmios.ChainSync.Connection do
}
end

defp send_initial_sync_message do
start_message = Messages.initial_sync()
:websocket_client.cast(self(), {:text, start_message})
end

@impl true
def init(state) do
initial_state =
Expand All @@ -39,15 +46,31 @@ defmodule Xogmios.ChainSync.Connection do
def onconnect(connection, state) do
state = Map.put(state, :ws_pid, self())

start_message = Messages.initial_sync()
:websocket_client.cast(self(), {:text, start_message})
with :ok <- HealthCheck.run(state.url),
:ok <- send_initial_sync_message() do
case state.handler.handle_connect(state) do
{:ok, new_state} ->
{:ok, new_state}

case state.handler.handle_connect(state) do
{:ok, new_state} ->
{:ok, new_state}
_ ->
{:ok, state}
end
else
{:error, reason} ->
Logger.error("""
Ogmios reported error: #{reason}\
Trying reconnection in 5 seconds.
""")

_ ->
{:ok, state}
{:reconnect, 5_000, state}

{:incomplete, reason} ->
Logger.warning("""
#{reason} \
Trying reconnection in 5 seconds.
""")

{:reconnect, 5_000, state}
end
end

Expand Down
68 changes: 68 additions & 0 deletions lib/xogmios/health_check.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
defmodule Xogmios.HealthCheck do
@moduledoc """
Performs a health check against the Ogmios server's HTTP endpoint.
Used primarily to determine if the underlying Cardano node is fully synced.
"""

@http_client :httpc

@spec run(url :: String.t()) :: :ok | {:error, String.t()}
def run(ogmios_url) do
url = parse_url(ogmios_url)

# NOTE: Investigate if there's a better way to start these.
:inets.start()
:ssl.start()
caike marked this conversation as resolved.
Show resolved Hide resolved

case client().request(:get, {String.to_charlist(url), []}, [], []) do
{:ok, {{_, status_code, _}, _headers, json_body}} ->
case status_code do
200 ->
:ok

202 ->
response_body = Jason.decode!(json_body)

progress =
response_body["networkSynchronization"]
|> Kernel.*(100)
|> Float.round(4)

{:incomplete, "Cardano Node not ready. Network sync progress at #{progress}%"}

_error_status_code ->
{:error,
"""
Ogmios service error. Cardano Node not ready.
"""}
end

{:error, reason} ->
{:error, "Error: #{inspect(reason)}"}
end
end

defp client do
# Xogmios.HealthCheck.HTTPClientMock is used on test runs
Application.get_env(:xogmios, __MODULE__, [])
|> Keyword.get(:http_client, @http_client)
end

defp parse_url(url) do
url
|> replace_protocol()
|> append_health_path()
end

defp replace_protocol(url) do
url
|> String.replace_prefix("ws://", "http://")
|> String.replace_prefix("wss://", "https://")
end

defp append_health_path(url) do
uri = URI.parse(url)
path = if uri.path in [nil, ""], do: "/health", else: uri.path <> "/health"
URI.to_string(%{uri | path: path})
end
end
37 changes: 29 additions & 8 deletions lib/xogmios/mempool/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Xogmios.Mempool.Connection do
for the Mempool protocol.
"""

alias Xogmios.HealthCheck
alias Xogmios.Mempool.Messages

defmacro __using__(_opts) do
Expand All @@ -25,6 +26,11 @@ defmodule Xogmios.Mempool.Connection do
}
end

defp acquire_mempool do
start_message = Messages.acquire_mempool()
:websocket_client.cast(self(), {:text, start_message})
end

@impl true
def init(state) do
initial_state =
Expand All @@ -39,16 +45,31 @@ defmodule Xogmios.Mempool.Connection do
def onconnect(connection, state) do
state = Map.put(state, :ws_pid, self())

start_message = Messages.acquire_mempool()
:websocket_client.cast(self(), {:text, start_message})
send(state.notify_on_connect, {:connected, connection})
with :ok <- HealthCheck.run(state.url),
:ok <- acquire_mempool() do
case state.handler.handle_connect(state) do
{:ok, new_state} ->
{:ok, new_state}

case state.handler.handle_connect(state) do
{:ok, new_state} ->
{:ok, new_state}
_ ->
{:ok, state}
end
else
{:error, reason} ->
Logger.error("""
Ogmios reported error: #{reason}\
Trying reconnection in 5 seconds.
""")

_ ->
{:ok, state}
{:reconnect, 5_000, state}

{:incomplete, reason} ->
Logger.warning("""
#{reason} \
Trying reconnection in 5 seconds.
""")

{:reconnect, 5_000, state}
end
end

Expand Down
16 changes: 3 additions & 13 deletions lib/xogmios/mempool_txs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,14 @@ defmodule Xogmios.MempoolTxs do
"""
@spec start_link(module(), start_options :: Keyword.t()) :: {:ok, pid()} | {:error, term()}
def start_link(client, opts) do
{url, opts} = Keyword.pop(opts, :url)
url = Keyword.fetch!(opts, :url)
{name, opts} = Keyword.pop(opts, :name, client)
opts = Keyword.put_new(opts, :include_details, false)
initial_state = Keyword.merge(opts, handler: client, notify_on_connect: self())
initial_state = Keyword.merge(opts, handler: client)

with {:ok, process_name} <- build_process_name(name),
{:ok, ws_pid} <- start_link(process_name, url, client, initial_state) do
# Blocks until the connection with the Ogmios server
# is established or until timeout is reached.

receive do
{:connected, _connection} -> {:ok, ws_pid}
after
_timeout = 5_000 ->
Logger.warning("Timeout connecting to Ogmios server")
send(ws_pid, :close)
{:error, :connection_timeout}
end
{:ok, ws_pid}
else
{:error, :invalid_process_name} = error ->
error
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ defmodule Xogmios.MixProject do
{:mix_audit, "~> 2.1", only: [:dev, :test], runtime: false},
{:plug, "~> 1.15", only: :test},
{:plug_cowboy, "~> 2.6", only: :test},
{:websocket_client, "~> 1.5"}
{:websocket_client, github: "caike/websocket_client", ref: "966daed"}
]
end

Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"websocket_client": {:hex, :websocket_client, "1.5.0", "e825f23c51a867681a222148ed5200cc4a12e4fb5ff0b0b35963e916e2b5766b", [:rebar3], [], "hexpm", "2b9b201cc5c82b9d4e6966ad8e605832eab8f4ddb39f57ac62f34cb208b68de9"},
"websocket_client": {:git, "https://github.com/caike/websocket_client.git", "966daeda981e48408c684930c334b3f1161c2728", [ref: "966daed"]},
"yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},
"yaml_elixir": {:hex, :yaml_elixir, "2.9.0", "9a256da867b37b8d2c1ffd5d9de373a4fda77a32a45b452f1708508ba7bbcb53", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0cb0e7d4c56f5e99a6253ed1a670ed0e39c13fc45a6da054033928607ac08dfc"},
}
6 changes: 6 additions & 0 deletions test/support/health_check/http_client_mock.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule Xogmios.HealthCheck.HTTPClientMock do
@moduledoc """
Mocks the health check response. This module is used in tests.
"""
def request(:get, _url, _options, _profile), do: {:ok, {{"", 200, ""}, [], []}}
end
Loading