Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of concentration code #1929

Merged
merged 10 commits into from
Nov 25, 2024
3 changes: 2 additions & 1 deletion core/src/Ribasim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ using MetaGraphsNext:
using EnumX: EnumX, @enumx

# Easily change an immutable field of an object.
using Accessors: @set
using Accessors: @set, @reset

# Iteration utilities, used to partition and group tables.
import IterTools
Expand Down Expand Up @@ -167,6 +167,7 @@ include("read.jl")
include("write.jl")
include("bmi.jl")
include("callback.jl")
include("concentration.jl")
include("main.jl")
include("libribasim.jl")

Expand Down
201 changes: 66 additions & 135 deletions core/src/callback.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
FunctionCallingCallback(update_cumulative_flows!; func_start = false)
push!(callbacks, cumulative_flows_cb)

# Update concentrations
concentrations_cb = FunctionCallingCallback(update_concentrations!; func_start = false)
push!(callbacks, concentrations_cb)

# Update Basin forcings
tstops = get_tstops(basin.time.time, starttime)
basin_cb = PresetTimeCallback(tstops, update_basin!; save_positions = (false, false))
Expand Down Expand Up @@ -113,30 +117,16 @@
the Basin concentration(s) and then remove the mass that is being lost to the outflows.
"""
function update_cumulative_flows!(u, t, integrator)::Nothing
(; p, uprev, tprev, dt) = integrator
(;
basin,
state_inflow_edge,
state_outflow_edge,
user_demand,
level_boundary,
flow_boundary,
allocation,
) = p
(; p, tprev, dt) = integrator
(; basin, flow_boundary, allocation) = p
(; vertical_flux) = basin

# Update tprev
p.tprev[] = t

# Reset cumulative flows, used to calculate the concentration
# of the basins after processing inflows only
basin.cumulative_in .= 0.0

# Update cumulative forcings which are integrated exactly
@. basin.cumulative_drainage += vertical_flux.drainage * dt
@. basin.cumulative_drainage_saveat += vertical_flux.drainage * dt
basin.mass .+= basin.concentration[1, :, :] .* vertical_flux.drainage * dt
basin.cumulative_in .= vertical_flux.drainage * dt

# Precipitation depends on fixed area
for node_id in basin.node_id
Expand All @@ -145,9 +135,6 @@

basin.cumulative_precipitation[node_id.idx] += added_precipitation
basin.cumulative_precipitation_saveat[node_id.idx] += added_precipitation
basin.mass[node_id.idx, :] .+=
basin.concentration[2, node_id.idx, :] .* added_precipitation
basin.cumulative_in[node_id.idx] += added_precipitation
end

# Exact boundary flow over time step
Expand All @@ -162,9 +149,6 @@
volume = integral(flow_rate, tprev, t)
flow_boundary.cumulative_flow[id.idx] += volume
flow_boundary.cumulative_flow_saveat[id.idx] += volume
basin.mass[outflow_id.idx, :] .+=
flow_boundary.concentration[id.idx, :] .* volume
basin.cumulative_in[outflow_id.idx] += volume
end
end

Expand All @@ -189,138 +173,83 @@
end
end
end
return nothing
end

# Process mass updates for UserDemand separately
# as the inflow and outflow are decoupled in the states
for (inflow_edge, outflow_edge) in
zip(user_demand.inflow_edge, user_demand.outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
userdemand_idx = outflow_edge.edge[1].idx
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
basin.mass[from_node.idx, :] .-=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
basin.mass[to_node.idx, :] .+=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
end
function update_concentrations!(u, t, integrator)::Nothing
(; uprev, p, tprev, dt) = integrator
(; basin, flow_boundary) = p
(; vertical_flux, concentration_data) = basin
(; evaporate_mass, cumulative_in, concentration_state, concentration, mass) =
concentration_data

# Process all mass inflows to basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.cumulative_in[from_node.idx] -= flow
if to_node.type == NodeType.Basin
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
elseif to_node.type == NodeType.LevelBoundary
basin.mass[from_node.idx, :] .-=
level_boundary.concentration[to_node.idx, :] .* flow
elseif to_node.type == NodeType.UserDemand
basin.mass[from_node.idx, :] .-=
user_demand.concentration[to_node.idx, :] .* flow
elseif to_node.type == NodeType.Terminal && to_node.value == 0
# UserDemand inflow is discoupled from its outflow,
# and the unset flow edge defaults to Terminal #0
nothing
else
@warn "Unsupported outflow from $(to_node.type) #$(to_node.value) to $(from_node.type) #$(from_node.value) with flow $flow"
end
end
end
# Reset cumulative flows, used to calculate the concentration
# of the basins after processing inflows only
cumulative_in .= 0.0

if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.cumulative_in[to_node.idx] += flow
if from_node.type == NodeType.Basin
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
elseif from_node.type == NodeType.LevelBoundary
basin.mass[to_node.idx, :] .+=
level_boundary.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.UserDemand
basin.mass[to_node.idx, :] .+=
user_demand.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.Terminal && from_node.value == 0
# UserDemand outflow is discoupled from its inflow,
# and the unset flow edge defaults to Terminal #0
nothing
else
@warn "Unsupported outflow from $(from_node.type) #$(from_node.value) to $(to_node.type) #$(to_node.value) with flow $flow"
end
end
end
mass .+= concentration[1, :, :] .* vertical_flux.drainage * dt
basin.concentration_data.cumulative_in .= vertical_flux.drainage * dt

# Precipitation depends on fixed area
for node_id in basin.node_id
fixed_area = basin_areas(basin, node_id.idx)[end]
added_precipitation = fixed_area * vertical_flux.precipitation[node_id.idx] * dt

mass[node_id.idx, :] .+= concentration[2, node_id.idx, :] .* added_precipitation
cumulative_in[node_id.idx] += added_precipitation
end

# Update the Basin concentrations based on the added mass and flows
basin.concentration_state .= basin.mass ./ (basin.storage_prev .+ basin.cumulative_in)

# Process all mass outflows from Basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow > 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[from_node.idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow < 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[to_node.idx, :] .* flow
end
# Exact boundary flow over time step
for (id, flow_rate, active, edge) in zip(
flow_boundary.node_id,
flow_boundary.flow_rate,
flow_boundary.active,
flow_boundary.outflow_edges,
)
if active
outflow_id = edge[1].edge[2]
volume = integral(flow_rate, tprev, t)
mass[outflow_id.idx, :] .+= flow_boundary.concentration[id.idx, :] .* volume
cumulative_in[outflow_id.idx] += volume
end
end

mass_updates_user_demand!(integrator)
mass_inflows_basin!(integrator)

# Update the Basin concentrations based on the added mass and flows
concentration_state .= mass ./ (basin.storage_prev .+ cumulative_in)

mass_outflows_basin!(integrator)

# Evaporate mass to keep the mass balance, if enabled in model config
if basin.evaporate_mass
basin.mass .-= basin.concentration_state .* (u.evaporation - uprev.evaporation)
if evaporate_mass
mass .-= concentration_state .* (u.evaporation - uprev.evaporation)
end
basin.mass .-= basin.concentration_state .* (u.infiltration - uprev.infiltration)
mass .-= concentration_state .* (u.infiltration - uprev.infiltration)

# Take care of infinitely small masses, possibly becoming negative due to truncation.
for I in eachindex(basin.mass)
if (-eps(Float64)) < basin.mass[I] < (eps(Float64))
basin.mass[I] = 0.0
for I in eachindex(basin.concentration_data.mass)
if (-eps(Float64)) < mass[I] < (eps(Float64))
mass[I] = 0.0
end
end

# Check for negative masses
if any(<(0), basin.mass)
R = CartesianIndices(basin.mass)
locations = findall(<(0), basin.mass)
if any(<(0), mass)
R = CartesianIndices(mass)
locations = findall(<(0), mass)

Check warning on line 241 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L240-L241

Added lines #L240 - L241 were not covered by tests
for I in locations
basin_idx, substance_idx = Tuple(R[I])
@error "$(basin.node_id[basin_idx]) has negative mass $(basin.mass[I]) for substance $(basin.substances[substance_idx])"
@error "$(basin.node_id[basin_idx]) has negative mass $(basin.concentration_data.mass[I]) for substance $(basin.concentration_data.substances[substance_idx])"

Check warning on line 244 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L244

Added line #L244 was not covered by tests
end
error("Negative mass(es) detected")
end

# Update the Basin concentrations again based on the removed mass
basin.concentration_state .=
basin.mass ./ basin.current_properties.current_storage[parent(u)]
concentration_state .= mass ./ basin.current_properties.current_storage[parent(u)]
basin.storage_prev .= basin.current_properties.current_storage[parent(u)]
basin.level_prev .= basin.current_properties.current_level[parent(u)]

return nothing
end

Expand Down Expand Up @@ -427,7 +356,7 @@
@. basin.cumulative_precipitation_saveat = 0.0
@. basin.cumulative_drainage_saveat = 0.0

concentration = copy(basin.concentration_state)
concentration = copy(basin.concentration_data.concentration_state)
saved_flow = SavedFlow(;
flow = flow_mean,
inflow = inflow_mean,
Expand Down Expand Up @@ -682,11 +611,12 @@
end

elseif startswith(variable, "concentration_external.")
value = basin.concentration_external[listen_node_id.idx][variable](t)
value =
basin.concentration_data.concentration_external[listen_node_id.idx][variable](t)
elseif startswith(variable, "concentration.")
substance = Symbol(last(split(variable, ".")))
var_idx = findfirst(==(substance), basin.substances)
value = basin.concentration_state[listen_node_id.idx, var_idx]
var_idx = findfirst(==(substance), basin.concentration_data.substances)
value = basin.concentration_data.concentration_state[listen_node_id.idx, var_idx]

Check warning on line 619 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L618-L619

Added lines #L618 - L619 were not covered by tests
else
error("Unsupported condition variable $variable.")
end
Expand Down Expand Up @@ -781,7 +711,8 @@
function update_basin_conc!(integrator)::Nothing
(; p) = integrator
(; basin) = p
(; node_id, concentration, concentration_time, substances) = basin
(; node_id, concentration_data, concentration_time) = basin
(; concentration, substances) = concentration_data
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
Expand All @@ -802,7 +733,7 @@
node = getproperty(p, parameter)
(; basin) = p
(; node_id, concentration, concentration_time) = node
(; substances) = basin
(; substances) = basin.concentration_data
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
Expand Down
Loading
Loading