Skip to content

Commit

Permalink
Use an UPSERT SQL query to insert or update the projection version
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Jan 17, 2024
1 parent d77c087 commit 5632542
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
53 changes: 28 additions & 25 deletions lib/projections/ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,37 +59,40 @@ defmodule Commanded.Projections.Ecto do
projection_name = Map.fetch!(metadata, :handler_name)
event_number = Map.fetch!(metadata, :event_number)

changeset =
%ProjectionVersion{projection_name: projection_name}
|> ProjectionVersion.changeset(%{last_seen_event_number: event_number})
projection_version = %ProjectionVersion{
projection_name: projection_name,
last_seen_event_number: event_number
}

prefix = schema_prefix(event, metadata)

# Query to update an existing projection version with the last seen event number with
# a check to ensure that the event has not already been projected.
update_projection_version =
from(pv in ProjectionVersion,
where:
pv.projection_name == ^projection_name and pv.last_seen_event_number < ^event_number,
update: [set: [last_seen_event_number: ^event_number]]
)

multi =
Ecto.Multi.new()
|> Ecto.Multi.run(:verify_projection_version, fn repo, _changes ->
version =
case repo.get(ProjectionVersion, projection_name, prefix: prefix) do
nil ->
repo.insert!(
%ProjectionVersion{
projection_name: projection_name,
last_seen_event_number: 0
},
prefix: prefix
)

version ->
version
end

if version.last_seen_event_number < event_number do
{:ok, %{version: version}}
else
{:error, :already_seen_event}
|> Ecto.Multi.run(:track_projection_version, fn repo, _changes ->
try do
repo.insert(projection_version,
prefix: prefix,
on_conflict: update_projection_version,
conflict_target: [:projection_name]
)
rescue
exception in Ecto.StaleEntryError ->
# Attempted to insert a projection version for an already seen event
{:error, :already_seen_event}

exception ->
reraise exception, __STACKTRACE__
end
end)
|> Ecto.Multi.update(:projection_version, changeset, prefix: prefix)

with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]),
{:ok, changes} <- transaction(multi) do
Expand All @@ -99,7 +102,7 @@ defmodule Commanded.Projections.Ecto do
:ok
end
else
{:error, :verify_projection_version, :already_seen_event, _changes} -> :ok
{:error, :track_projection_version, :already_seen_event, _changes} -> :ok
{:error, _stage, error, _changes} -> {:error, error}
{:error, _error} = reply -> reply
end
Expand Down
21 changes: 21 additions & 0 deletions test/projections/ecto_projection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ defmodule Commanded.Projections.EctoProjectionTest do
assert_seen_event("Projector", 3)
end

test "should prevent an event being projected more than once" do
Projector.handle(%AnEvent{name: "Event1"}, %{handler_name: "Projector", event_number: 1})
Projector.handle(%AnEvent{name: "Event2"}, %{handler_name: "Projector", event_number: 2})

tasks =
Enum.map(1..3, fn _index ->
Task.async(fn ->
:timer.sleep(:rand.uniform(10))

Projector.handle(%AnEvent{name: "Event3"}, %{handler_name: "Projector", event_number: 3})
end)
end)

results = Task.await_many(tasks)

assert Enum.uniq(results) == [:ok]

assert_projections(Projection, ["Event1", "Event2", "Event3"])
assert_seen_event("Projector", 3)
end

test "should return an error on failure" do
assert {:error, :failure} ==
Projector.handle(%ErrorEvent{}, %{handler_name: "Projector", event_number: 1})
Expand Down

0 comments on commit 5632542

Please sign in to comment.