From 5fb5d93fd7954006120885506c85d539e48f0346 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20Kr=C3=A4mer?= Date: Tue, 21 Dec 2021 12:03:11 +0100 Subject: [PATCH] distributed garbage commit --- lettuce/boundary.py | 52 ++++++++++++++++++++- lettuce/distutil.py | 35 ++++++++++++++ lettuce/domdec.py | 24 ++++++++++ lettuce/flows/flow.py | 103 ++++++++++++++++++++++++++++++++++++++++++ lettuce/lattices.py | 1 + lettuce/simulation.py | 50 ++++++++------------ 6 files changed, 231 insertions(+), 34 deletions(-) create mode 100644 lettuce/distutil.py create mode 100644 lettuce/domdec.py create mode 100644 lettuce/flows/flow.py diff --git a/lettuce/boundary.py b/lettuce/boundary.py index 260451a2..795d4f66 100644 --- a/lettuce/boundary.py +++ b/lettuce/boundary.py @@ -15,15 +15,63 @@ """ +import warnings +from typing import Callable import torch import numpy as np -from lettuce import (LettuceException) +from numpy import typing as npt +from .util import LettuceException, LettuceWarning +from .lattices import Lattice + __all__ = ["BounceBackBoundary", "AntiBounceBackOutlet", "EquilibriumBoundaryPU", "EquilibriumOutletP"] +class Boundary: + """Base class for boundary conditions + + Parameters + ---------- + mask_function : Callable + A function that takes the grid as a sequence of `dim` numpy arrays and returns a boolean mask as a numpy array. + + Examples + -------- + + >>> Boundary(lambda x,y : x>=0.5) + """ + def __init__(self, mask_function: Callable = None): + self.mask_function = mask_function + self._mask = None + + def update_mask(self, lattice, grid): + if self.mask_function is not None: + self._mask = lattice.self.mask_function(grid) + + def make_no_stream_mask(self, mask) -> Union[bool, torch.Tensor]: + return False + + def make_no_collision_mask(self, mask): + return False + + @property + def mask(self): + if self._mask is None: + raise LettuceException(f"Call `update_mask` before accessing .mask") + return self._mask + + @mask.setter + def mask(self, mask: npt.NDArray[bool]): + warnings.warn( + "Setting the boundary mask manually is deprecated as it does " + "not support grid refinement and MPI parallelization. " + "Instead, the boundary constructor should receive a mask-generating function. " + ) + self._mask = mask + + class BounceBackBoundary: - """Fullway Bounce-Back Boundary""" + """Full-way Bounce-Back Boundary""" def __init__(self, mask, lattice): self.mask = lattice.convert_to_tensor(mask) diff --git a/lettuce/distutil.py b/lettuce/distutil.py new file mode 100644 index 00000000..d302a7a5 --- /dev/null +++ b/lettuce/distutil.py @@ -0,0 +1,35 @@ + +import os +from typing import Union +from dataclasses import dataclass +import torch +from torch.types import _int, _size + + +@dataclass(init=False) +class MPIConfig: + active: bool + rank: int = 0 + size: int = 0 + + def __init__(self): + try: + self.size = int(os.environ['OMPI_COMM_WORLD_SIZE']) + self.rank = int(os.environ['OMPI_COMM_WORLD_RANK']) + self.active = True + except KeyError: + self.active = False + + +def roll(input: torch.Tensor, shifts: Union[_int, _size], dims: Union[_int, _size]=(), mpi_config=None) -> torch.Tensor: + if mpi_config is None: + return torch.roll(input, shifts, dims) + + assert shifts.abs().max() <= 1 + + indices, target_ranks = mpi_config.outgoing(input, shifts) + indices, source_ranks = mpi_config.incoming(input, shifts) + + +class MPIConfig: + diff --git a/lettuce/domdec.py b/lettuce/domdec.py new file mode 100644 index 00000000..82a7c61a --- /dev/null +++ b/lettuce/domdec.py @@ -0,0 +1,24 @@ + + + +class DomainDecomposition: + def __init__(self, flow, mask): + pass + + def refine(self, refinement_level: int): + pass + + def set_device(self, ): + def communicate(self): + pass + + +class NoDomainDecomposition(DomainDecomposition): + def __init__(self): + def communicate(self): + pass + +class StreamAndCollide: + def __init__(self, streaming, collision): + pass + diff --git a/lettuce/flows/flow.py b/lettuce/flows/flow.py new file mode 100644 index 00000000..2c4870cd --- /dev/null +++ b/lettuce/flows/flow.py @@ -0,0 +1,103 @@ + +from copy import deepcopy +from typing import Sequence +import numpy as np +from numpy import typing as npt +import torch +from typing import Tuple + + +from ..util import LettuceException +from ..lattices import Lattice +from ..boundary import Boundary + + +class Flow: + """ + + Attributes + ---------- + boundaries : Sequence[Boundary] + """ + def __init__(self): + self.grid = NotImplemented + self.units = NotImplemented + self.initial_solution = NotImplemented + + @property + def boundaries(self) -> Sequence[Boundary]: + return [] + + def compute_initial_f(self, lattice: Lattice) -> torch.Tensor: + grid = self.grid + p, u = self.initial_solution(grid) + if not list(p.shape) == [1] + list(grid[0].shape): + raise LettuceException( + f"Wrong dimension of initial pressure field. " + f"Expected {[1] + list(grid[0].shape)}, " + f"but got {list(p.shape)}." + ) + if not list(u.shape) == [lattice.D] + list(grid[0].shape): + raise LettuceException( + "Wrong dimension of initial velocity field." + f"Expected {[lattice.D] + list(grid[0].shape)}, " + f"but got {list(u.shape)}." + ) + u = lattice.convert_to_tensor(self.units.convert_velocity_to_lu(u)) + rho = lattice.convert_to_tensor(self.units.convert_pressure_pu_to_density_lu(p)) + return lattice.equilibrium(rho, lattice.convert_to_tensor(u)) + + def compute_masks(self, lattice: Lattice) -> Tuple[torch.Tensor, torch.Tensor]: + grid = self.grid + grid_shape = grid[0].shape + f_shape = [lattice.Q, *grid_shape] + no_stream_mask = torch.zeros(f_shape, device=lattice.device, dtype=torch.bool) + no_collision_mask = torch.zeros(grid_shape, device=lattice.device, dtype=torch.bool) + + # Apply boundaries + # boundaries = deepcopy(self.boundaries) # store locally to keep the flow free from the boundary state + for boundary in self.boundaries: + boundary.update_mask(lattice, self.grid) + if hasattr(boundary, "make_no_collision_mask"): + no_collision_mask = no_collision_mask | boundary.make_no_collision_mask(f_shape) + if hasattr(boundary, "make_no_stream_mask"): + no_stream_mask = no_stream_mask | boundary.make_no_stream_mask(f_shape) + + return no_stream_mask, no_collision_mask + + + + +class DomainDecomposedFlow: + def __init__(self, flow: Flow, masks: Sequence[npt.NDArray[bool]]): + grid = flow.grid + if not all(mask.shape == grid.shape for mask in masks): + raise ValueError(f"At least one mask shape did not match grid shape ({grid.shape})") + self.flow = flow + self.masks = masks + + + + flow = TaylorGreenVortex3D(...) + + # manual decomposition of the flow domain into rectangular/hexagonal domains + mask0 = flow.grid.x < 0.5 + mask1 = flow.grid.x >= 0.5 + + # set up a distributed flow object + decomposed = DomainDecomposedFlow(flow, masks=(mask0, mask1)) + + # send part of the domain to a device + decomposed.set_device(0, "cuda:0") + + # refine the domain if needed (this is important to do here; big flows will not fit on one node) + decomposed.refine_domain(0, refinement_level=4) + + # send part of the domain to a different device + decomposed.set_device(1, "cuda:1") + + # refine this part more coarsely + decomposed.refine_domain(1, refinement_level=3) + + # set up the simulation with the decomposed flow + simulation = Simulation(decomposed, ...) diff --git a/lettuce/lattices.py b/lettuce/lattices.py index 26376c54..9a47b088 100644 --- a/lettuce/lattices.py +++ b/lettuce/lattices.py @@ -60,6 +60,7 @@ def convert_to_numpy(cls, tensor): def rho(self, f): """density""" + return f[None, ...].sum(dim=0) return torch.sum(f, dim=0)[None, ...] def j(self, f): diff --git a/lettuce/simulation.py b/lettuce/simulation.py index caf4e8f4..5279d0cc 100644 --- a/lettuce/simulation.py +++ b/lettuce/simulation.py @@ -14,6 +14,14 @@ __all__ = ["Simulation"] +class DomainSimulation: + def __init__(self, flow, domain, collision, streaming): + self.f = flow.compute_initial_f() + + + + + class Simulation: """High-level API for simulations. @@ -31,36 +39,12 @@ def __init__(self, flow, lattice, collision, streaming): self.streaming = streaming self.i = 0 - grid = flow.grid - p, u = flow.initial_solution(grid) - assert list(p.shape) == [1] + list(grid[0].shape), \ - LettuceException(f"Wrong dimension of initial pressure field. " - f"Expected {[1] + list(grid[0].shape)}, " - f"but got {list(p.shape)}.") - assert list(u.shape) == [lattice.D] + list(grid[0].shape), \ - LettuceException("Wrong dimension of initial velocity field." - f"Expected {[lattice.D] + list(grid[0].shape)}, " - f"but got {list(u.shape)}.") - u = lattice.convert_to_tensor(flow.units.convert_velocity_to_lu(u)) - rho = lattice.convert_to_tensor(flow.units.convert_pressure_pu_to_density_lu(p)) - self.f = lattice.equilibrium(rho, lattice.convert_to_tensor(u)) - self.reporters = [] # Define masks, where the collision or streaming are not applied - x = flow.grid - self.no_collision_mask = lattice.convert_to_tensor(np.zeros_like(x[0], dtype=bool)) - no_stream_mask = lattice.convert_to_tensor(np.zeros(self.f.shape, dtype=bool)) - - # Apply boundaries - self._boundaries = deepcopy(self.flow.boundaries) # store locally to keep the flow free from the boundary state - for boundary in self._boundaries: - if hasattr(boundary, "make_no_collision_mask"): - self.no_collision_mask = self.no_collision_mask | boundary.make_no_collision_mask(self.f.shape) - if hasattr(boundary, "make_no_stream_mask"): - no_stream_mask = no_stream_mask | boundary.make_no_stream_mask(self.f.shape) - if no_stream_mask.any(): - self.streaming.no_stream_mask = no_stream_mask + self.f = flow.compute_initial_f(self.lattice) + no_stream_mask, self.no_collision_mask = flow.compute_masks(self.lattice) + self.streaming.no_stream_mask = no_stream_mask if no_stream_mask.any() else None def step(self, num_steps): """Take num_steps stream-and-collision steps and return performance in MLUPS.""" @@ -69,11 +53,13 @@ def step(self, num_steps): self._report() for _ in range(num_steps): self.i += 1 - self.f = self.streaming(self.f) - # Perform the collision routine everywhere, expect where the no_collision_mask is true - self.f = torch.where(self.no_collision_mask, self.f, self.collision(self.f)) - for boundary in self._boundaries: - self.f = boundary(self.f) + for local_domains: + self.f = self.streaming(self.f) + # Perform the collision routine everywhere, expect where the no_collision_mask is true + self.f = torch.where(self.no_collision_mask, self.f, self.collision(self.f)) + for boundary in self._boundaries: + self.f = boundary(self.f) + self.communicate() self._report() end = timer() seconds = end - start