import math
import os
# Unfortunately, this is necessary due to a cyclic reference. "Mesh" is only needed for static type hints anyway
from typing import TYPE_CHECKING, Dict, List, Sequence, Tuple, Union
import numpy as np
from typing_extensions import Literal
import fdsreader.utils.fortran_data as fdtype
from fdsreader import settings
from fdsreader.utils import Dimension, Extent, Quantity
if TYPE_CHECKING:
from fdsreader.fds_classes import Mesh
[docs]
class Patch:
"""Container for the actual data which is stored as rectangular plane with specific orientation
and extent.
:ivar dimension: :class:`Dimension` object containing information about steps in each dimension.
:ivar extent: :class:`Extent` object containing 3-dimensional extent information.
:ivar orientation: The direction the patch is facing (x={-1;1}, y={-2;2}, z={-3;3}).
:ivar cell_centered: Indicates whether centered positioning for data is used.
:ivar _n_t: Total number of time steps for which output data has been written.
"""
def __init__(
self,
file_path: str,
dimension: Dimension,
extent: Extent,
orientation: int,
cell_centered: bool,
patch_offset: int,
initial_offset: int,
n_t: int,
mesh: "Mesh",
):
self.file_path = file_path
self.dimension = dimension
self.extent = extent
self.orientation = orientation
self.cell_centered = cell_centered
self._patch_offset = patch_offset
self._initial_offset = initial_offset
self._time_offset = -1
self._n_t = n_t
self.mesh = mesh
self._boundary_parent: Boundary = None
[docs]
def n_t(self, count_duplicates=True) -> int:
"""Get the number of timesteps for which data was output.
:param count_duplicates: If true, return the total number of data points, even if there is
duplicate data for a timestep. Duplicate data might be output when restarting the
simulation in between the simulation run.
"""
if count_duplicates:
return self._n_t
return np.unique(self._boundary_parent.times).size
@property
def shape(self) -> Tuple:
"""Convenience function to calculate the shape of the array containing data for this patch."""
return self.dimension.shape(self.cell_centered)
@property
def size(self) -> int:
"""Convenience function to calculate the number of data points in the array for this patch."""
return self.dimension.size(self.cell_centered)
def _post_init(self, time_offset: int):
"""Fully initialize the patch as soon as the number of timesteps is known."""
self._time_offset = time_offset
[docs]
def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[Literal["x", "y", "z"], np.ndarray]:
"""Returns a dictionary containing a numpy ndarray with coordinates for each dimension.
For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates.
:param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not.
"""
# orientation = ('x', 'y', 'z')[self.orientation - 1] if self.orientation != 0 else ''
# coords = {'x': set(), 'y': set(), 'z': set()}
coords: Dict[Literal["x", "y", "z"], np.ndarray] = {}
for dim in ("x", "y", "z"):
co = self.mesh.coordinates[dim].copy()
# In case the slice is cell-centered, we will shift the coordinates by half a cell
# and remove the last coordinate
if self.cell_centered and not ignore_cell_centered:
co = co[:-1]
co += abs(co[1] - co[0]) / 2
coords[dim] = co[np.where(np.logical_and(co >= self.extent[dim][0], co <= self.extent[dim][1]))]
if coords[dim].size == 0:
coords[dim] = np.array([co[np.argmin(np.abs(co - self.extent[dim][0]))]])
return coords
@property
def data(self):
"""Method to load the quantity data for a single patch."""
if not hasattr(self, "_data"):
dtype_data = fdtype.new((("f", self.dimension.size(cell_centered=False)),))
if self._n_t == -1:
self._n_t = (os.stat(self.file_path).st_size - self._initial_offset) // self._time_offset
self._data = np.empty((self.n_t(count_duplicates=True),) + self.shape)
with open(self.file_path, "rb") as infile:
for t in range(self.n_t(count_duplicates=True)):
infile.seek(self._initial_offset + self._patch_offset + t * self._time_offset)
data = np.fromfile(infile, dtype_data, 1)[0][1].reshape(
self.dimension.shape(cell_centered=False), order="F"
)
if self.cell_centered:
self._data[t, :] = data[:-1, :-1]
else:
self._data[t, :] = data
unique_times_indices = np.unique(self._boundary_parent.times, return_index=True)[1]
self._data = self._data[unique_times_indices]
return self._data
[docs]
def clear_cache(self):
"""Remove all data from the internal cache that has been loaded so far to free memory."""
if hasattr(self, "_data"):
del self._data
def __repr__(self, *args, **kwargs):
return f"Patch(shape={self.shape}, orientation={self.orientation}, extent={self.extent})"
[docs]
class Boundary:
"""Container for boundary data specific to one quantity.
:ivar quantity: Quantity object containing information about the quantity calculated for this
:class:`Obstruction` with the corresponding short_name and unit.
:ivar times: Numpy array containing all times for which data has been recorded.
:ivar cell_centered: Indicates whether centered positioning for data is used.
:ivar lower_bounds: Dictionary with lower bounds for each timestep with meshes as keys.
:ivar upper_bounds: Dictionary with upper bounds for each timestep with meshes as keys.
:ivar n_t: Total number of time steps for which output data has been written.
"""
def __init__(
self,
quantity: Quantity,
cell_centered: bool,
times: Sequence[float],
patches: List[Patch],
lower_bounds: np.ndarray,
upper_bounds: np.ndarray,
):
self.quantity = quantity
self.cell_centered = cell_centered
self._patches = patches
self.times = times
self.lower_bounds = lower_bounds
self.upper_bounds = upper_bounds
[docs]
def n_t(self, count_duplicates=True) -> int:
"""Get the number of timesteps for which data was output.
:param count_duplicates: If true, return the total number of data points, even if there is
duplicate data for a timestep. Duplicate data might be output when restarting the
simulation in between the simulation run.
"""
return self._patches[0].n_t(count_duplicates=count_duplicates)
@property
def orientations(self):
"""Return all orientations for which there is data available."""
return [p.orientation for p in self._patches]
[docs]
def get_nearest_timestep(self, time: float) -> int:
"""Calculates the nearest timestep for which data has been output for this obstruction."""
idx = np.searchsorted(self.times, time, side="left")
if time > 0 and (
idx == len(self.times) or math.fabs(time - self.times[idx - 1]) < math.fabs(time - self.times[idx])
):
return idx - 1
else:
return idx
@property
def data(self) -> Dict[int, Patch]:
"""The :class:`Patch` in each direction (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=y)."""
return {p.orientation: p for p in self._patches}
[docs]
def vmin(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float:
"""Minimum value of all patches at any time.
:param orientation: Optionally filter by patches with a specific orientation.
"""
if orientation == 0:
curr_min = np.min(self.lower_bounds)
if curr_min == np.float32(+1e33):
return float(min(np.min(p.data) for p in self._patches))
return float(curr_min)
else:
return float(np.min(self.data[orientation].data))
[docs]
def vmax(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float:
"""Maximum value of all patches at any time.
:param orientation: Optionally filter by patches with a specific orientation.
"""
if orientation == 0:
curr_max = np.max(self.upper_bounds)
if curr_max == np.float32(-1e33):
return float(max(np.max(p.data) for p in self._patches))
return float(curr_max)
else:
return float(np.max(self.data[orientation].data))
[docs]
def clear_cache(self):
"""Remove all data from the internal cache that has been loaded so far to free memory."""
for p in self._patches:
p.clear_cache()
def __repr__(self):
return f"Boundary(Quantity={self.quantity}, Patches={len(self._patches)})"
[docs]
class SubObstruction:
"""An :class:`Obstruction` consists of 1 or more SubObstructions which can be hidden at specific points in time.
:ivar extent: :class:`Extent` object containing 3-dimensional extent information.
:ivar bound_indices: Indices used to define obstruction bounds in terms of mesh locations.
:ivar side_surfaces: Tuple of six :class:`Surface` s for each side of the cuboid.
:ivar hide_times: List with points in time from when on the SubObstruction will be hidden.
:ivar show_times: List with points in time from when on the SubObstruction will be shown.
"""
def __init__(
self, side_surfaces: Tuple, bound_indices: Tuple[int, int, int, int, int, int], extent: Extent, mesh: "Mesh"
):
self.extent = extent
self.side_surfaces = side_surfaces
self.bound_indices = {
"x": (bound_indices[0], bound_indices[1]),
"y": (bound_indices[2], bound_indices[3]),
"z": (bound_indices[4], bound_indices[5]),
}
self.mesh = mesh
self._boundary_data: Dict[int, Boundary] = dict()
self.hide_times = list()
self.show_times = list()
def _add_patches(
self,
bid: int,
cell_centered: bool,
quantity: str,
short_name: str,
unit: str,
patches: List[Patch],
times: Sequence[float],
lower_bounds: np.ndarray,
upper_bounds: np.ndarray,
):
if bid not in self._boundary_data:
self._boundary_data[bid] = Boundary(
Quantity(quantity, short_name, unit), cell_centered, times, patches, lower_bounds, upper_bounds
)
# Add reference to parent boundary class in patches
for patch in patches:
patch._boundary_parent = self._boundary_data[bid]
if not settings.LAZY_LOAD:
_ = self._boundary_data[bid].data
@property
def orientations(self):
"""Return all orientations for which there is data available."""
if self.has_boundary_data:
return next(iter(self._boundary_data.values())).orientations
return []
[docs]
def get_coordinates(
self, ignore_cell_centered: bool = False
) -> Dict[int, Dict[Literal["x", "y", "z"], np.ndarray]]:
"""Returns a dictionary containing a numpy ndarray with coordinates for each dimension.
For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates.
:param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not.
"""
if self.has_boundary_data:
return {
orientation: patch.get_coordinates(ignore_cell_centered)
for orientation, patch in next(iter(self._boundary_data.values())).data.items()
}
return {}
[docs]
def get_nearest_index(self, dimension: Literal["x", "y", "z"], orientation: int, value: float) -> int:
"""Get the nearest mesh coordinate index in a specific dimension for a specific orientation."""
if self.has_boundary_data:
coords = self.get_coordinates()[orientation][dimension]
idx = np.searchsorted(coords, value, side="left")
if idx > 0 and (idx == coords.size or math.fabs(value - coords[idx - 1]) < math.fabs(value - coords[idx])):
return idx - 1
else:
return idx
return np.nan
@property
def has_boundary_data(self):
return len(self._boundary_data) != 0
[docs]
def get_data(self, quantity: Union[str, Quantity]):
if isinstance(quantity, Quantity):
quantity = quantity.name
return next(
b
for b in self._boundary_data.values()
if b.quantity.name.lower() == quantity.lower() or b.quantity.short_name.lower() == quantity.lower()
)
def __getitem__(self, item):
if isinstance(item, int):
return self._boundary_data[item]
return self.get_data(item)
def _hide(self, time: float):
self.hide_times.append(time)
self.hide_times.sort()
def _show(self, time: float):
self.show_times.append(time)
self.show_times.sort()
[docs]
def n_t(self, count_duplicates=True) -> int:
"""Returns the number of timesteps for which boundary data is available.
:param count_duplicates: If true, return the total number of data points, even if there is
duplicate data for a timestep. Duplicate data might be output when restarting the
simulation in between the simulation run.
"""
if self.has_boundary_data:
return next(iter(self._boundary_data.values())).n_t(count_duplicates=count_duplicates)
return 0
@property
def times(self):
"""Return all timesteps for which boundary data is available, if any."""
if self.has_boundary_data:
return next(iter(self._boundary_data.values())).times
return np.array([])
[docs]
def get_visible_times(self, times: Sequence[float]) -> np.ndarray:
"""Returns a ndarray filtering all time steps when the SubObstruction is visible/not hidden."""
ret = list()
hidden = False
for time in times:
if time in self.show_times:
hidden = False
if time in self.hide_times:
hidden = True
if not hidden:
ret.append(time)
return np.array(ret)
[docs]
def vmin(
self,
quantity: Union[str, Quantity],
orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0,
) -> float:
"""Minimum value of all patches at any time for a specific quantity.
:param orientation: Optionally filter by patches with a specific orientation.
"""
if self.has_boundary_data:
return self.get_data(quantity).vmin(orientation)
return np.nan
[docs]
def vmax(
self,
quantity: Union[str, Quantity],
orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0,
) -> float:
"""Maximum value of all patches at any time for a specific quantity.
:param orientation: Optionally filter by patches with a specific orientation.
"""
if self.has_boundary_data:
return self.get_data(quantity).vmax(orientation)
return np.nan
[docs]
def clear_cache(self):
"""Remove all data from the internal cache that has been loaded so far to free memory."""
for bndf in self._boundary_data.values():
bndf.clear_cache()
def __repr__(self):
return f"SubObstruction(Extent={self.extent})"
[docs]
class Obstruction:
"""A box-shaped obstruction with specific surfaces (materials) on each side.
:ivar id: ID of the obstruction.
:ivar color_index: Type of coloring used to color obstruction.
-1 - default color
-2 - invisible
-3 - use red, green, blue and alpha (rgba attribute)
n>0 - use n’th color table entry
:ivar block_type: Defines how the obstruction is drawn.
-1 - use surface to obtain blocktype
0 - regular block
2 - outline
:ivar texture_origin: Origin position of the texture provided by the surface. When the texture
does have a pattern, for example windows or bricks, the texture_origin specifies where the
pattern should begin.
:ivar rgba: Optional color of the obstruction in form of a 4-element tuple
(ranging from 0.0 to 1.0).
"""
def __init__(
self,
oid: str,
color_index: int,
block_type: int,
texture_origin: Tuple[float, float, float],
rgba: Union[Tuple[()], Tuple[float, float, float, float]] = (),
):
self.id = oid
self.color_index = color_index
self.block_type = block_type
self.texture_origin = texture_origin
if len(rgba) != 0:
self.rgba = rgba
self._subobstructions: Dict[str, SubObstruction] = dict()
@property
def bounding_box(self) -> Extent:
""":class:`Extent` object representing the bounding box around the Obstruction."""
extents = [sub.extent for sub in self._subobstructions.values()]
return Extent(
min(extents, key=lambda e: e.x_start).x_start,
max(extents, key=lambda e: e.x_end).x_end,
min(extents, key=lambda e: e.y_start).y_start,
max(extents, key=lambda e: e.y_end).y_end,
min(extents, key=lambda e: e.z_start).z_start,
max(extents, key=lambda e: e.z_end).z_end,
)
@property
def orientations(self):
"""Return all orientations for which there is data available."""
if self.has_boundary_data:
orientations = set()
for subobst in self._subobstructions.values():
orientations.update(subobst.orientations)
return sorted(list(orientations))
return []
[docs]
def n_t(self, count_duplicates=True) -> int:
"""Returns the number of timesteps for which boundary data is available.
:param count_duplicates: If true, return the total number of data points, even if there is
duplicate data for a timestep. Duplicate data might be output when restarting the
simulation in between the simulation run.
"""
for subobst in self._subobstructions.values():
if subobst.has_boundary_data:
return subobst.n_t(count_duplicates=count_duplicates)
return 0
@property
def times(self):
"""Return all timesteps for which boundary data is available, if any."""
for subobst in self._subobstructions.values():
if subobst.has_boundary_data:
return subobst.times
return np.array([])
[docs]
def get_visible_times(self, times: Sequence[float]):
"""Returns an ndarray filtering all time steps when theSubObstruction is visible/not hidden."""
for subobst in self._subobstructions.values():
return subobst.get_visible_times(times)
return np.array([])
[docs]
def get_coordinates(
self, ignore_cell_centered: bool = False
) -> Dict[int, Dict[Literal["x", "y", "z"], np.ndarray]]:
"""Returns a dictionary containing a numpy ndarray with coordinates for each dimension.
For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates.
:param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not.
"""
if self.has_boundary_data:
all_coords = dict()
for orientation_int in self.orientations:
orientation = ("x", "y", "z")[abs(orientation_int) - 1]
coords = {"x": set(), "y": set(), "z": set()}
for dim in ("x", "y", "z"):
if orientation == dim:
bounding_box_index = 0 if orientation_int < 0 else 1
coords[dim] = np.array([self.bounding_box[dim][bounding_box_index]])
continue
min_delta = math.inf
for subobst in self._subobstructions.values():
subobst_coords = subobst.get_coordinates(ignore_cell_centered)
if orientation_int not in subobst_coords:
continue
co = subobst_coords[orientation_int][dim]
min_delta = min(min_delta, co[1] - co[0])
coords[dim].update(co)
coords[dim] = np.array(sorted(list(coords[dim])))
to_keep = np.full(coords[dim].shape, True)
for i in range(len(coords[dim]) - 1):
if abs(coords[dim][i] - coords[dim][i + 1]) < min_delta / 2:
to_keep[i + 1] = False
coords[dim] = coords[dim][to_keep]
if len(coords[dim]) == 0:
bounding_box_index = 0 if orientation_int < 0 else 1
single_coordinate = self.bounding_box[dim][bounding_box_index]
nearest_coordinate = np.inf
for subobst in self._subobstructions.values():
mesh = subobst.mesh
mesh_coords = mesh.coordinates[dim]
idx = np.searchsorted(mesh_coords, single_coordinate, side="left")
if idx > 0 and (
idx == mesh_coords.size
or math.fabs(single_coordinate - mesh_coords[idx - 1])
< math.fabs(single_coordinate - mesh_coords[idx])
):
idx = idx + 1
if mesh_coords[idx] - single_coordinate < nearest_coordinate - single_coordinate:
nearest_coordinate = mesh_coords[idx]
coords[dim] = np.array([nearest_coordinate])
all_coords[orientation_int] = coords
return all_coords
return dict()
[docs]
def get_nearest_index(self, dimension: Literal["x", "y", "z"], orientation: int, value: float) -> int:
"""Get the nearest mesh coordinate index in a specific dimension for a specific orientation."""
if self.has_boundary_data:
coords = self.get_coordinates()[orientation][dimension]
idx = np.searchsorted(coords, value, side="left")
if idx > 0 and (idx == coords.size or math.fabs(value - coords[idx - 1]) < math.fabs(value - coords[idx])):
return idx - 1
else:
return idx
return np.nan
@property
def quantities(self) -> List[Quantity]:
"""Get a list of all quantities for which boundary data exists."""
if self.has_boundary_data:
qs = set()
for subobst in self._subobstructions.values():
for b in subobst._boundary_data.values():
qs.add(b.quantity)
return list(qs)
return []
@property
def meshes(self) -> List["Mesh"]:
"""Returns a list of all meshes this slice cuts through."""
return [subobst.mesh for subobst in self._subobstructions.values()]
[docs]
def filter_by_orientation(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> List[SubObstruction]:
"""Filter all SubObstructions by a specific orientation. All returned SubObstructions will contain boundary data
in the specified orientation.
"""
if self.has_boundary_data:
return [subobst for subobst in self._subobstructions.values() if orientation in subobst.orientations]
return []
[docs]
def get_boundary_data(
self, quantity: Union[Quantity, str], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0
) -> Dict[str, Boundary]:
"""Gets the boundary data for a specific quantity of all SubObstructions.
:param quantity: The quantity to filter by.
:param orientation: Optionally filter by a specific orientation as well (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=z).
A value of 0 indicates to no filter.
"""
if isinstance(quantity, Quantity):
quantity = quantity.name
ret = {
subobst.mesh.id: subobst.get_data(quantity)
for subobst in self._subobstructions.values()
if subobst.has_boundary_data
}
if orientation == 0:
return ret
return {mesh: bndf for mesh, bndf in ret.items() if orientation in bndf.data.keys()}
[docs]
def get_nearest_timestep(self, time: float, visible_only: bool = False) -> int:
"""Calculates the nearest timestep for which data has been output for this obstruction."""
if self.has_boundary_data:
times = self.get_visible_times(self.times) if visible_only else self.times
idx = np.searchsorted(times, time, side="left")
if time > 0 and (idx == len(times) or math.fabs(time - times[idx - 1]) < math.fabs(time - times[idx])):
return idx - 1
else:
return idx
return np.nan
[docs]
def get_nearest_patch(self, x: float = None, y: float = None, z: float = None):
"""Gets the patch of the :class:`SubObstruction` that has the least distance to the given point.
If there are multiple patches with the same distance, a random one will be selected.
"""
if self.has_boundary_data:
d_min = np.finfo(float).max
patches_with_dist = list()
for subobst in self._subobstructions.values():
for patch in subobst.get_data(self.quantities[0])._patches:
dx = max(patch.extent.x_start - x, 0, x - patch.extent.x_end) if x is not None else 0
dy = max(patch.extent.y_start - y, 0, y - patch.extent.y_end) if y is not None else 0
dz = max(patch.extent.z_start - z, 0, z - patch.extent.z_end) if z is not None else 0
d = np.sqrt(dx * dx + dy * dy + dz * dz)
if d < d_min:
d_min = d
patches_with_dist = [(patch, dx, dy, dz)]
elif math.isclose(d, d_min, rel_tol=1e-9, abs_tol=0.0):
patches_with_dist.append((patch, dx, dy, dz))
patches_min = [p[0] for p in patches_with_dist]
if x is not None and len(patches_with_dist) > 1:
patches_with_dist.sort(key=lambda p: p[1])
patches_min = [p[0] for p in patches_with_dist]
elif y is not None and len(patches_with_dist) > 1:
patches_with_dist.sort(key=lambda p: p[2])
patches_min = [p[0] for p in patches_with_dist]
elif z is not None and len(patches_with_dist) > 1:
patches_with_dist.sort(key=lambda p: p[3])
patches_min = [p[0] for p in patches_with_dist]
if len(patches_min) > 0:
return patches_min[0]
return None
@property
def has_boundary_data(self):
"""Whether boundary data has been output in the simulation."""
return any(subobst.has_boundary_data for subobst in self._subobstructions.values())
[docs]
def get_global_boundary_data_arrays(
self, quantity: Union[str, Quantity]
) -> Dict[int, Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]:
"""Creates a global numpy ndarray from all subobstruction's boundary data for each orientation.
:param quantity: The quantity's name or short name for which to load the global arrays.
"""
if not self.has_boundary_data:
return dict()
for subobst in self._subobstructions.values():
if subobst.has_boundary_data:
cell_centered = next(iter(subobst._boundary_data.values())).cell_centered
result = dict()
for orientation_int in self.orientations:
subobst_sets = [list(), list()]
dim = ["x", "y", "z"][abs(orientation_int) - 1]
random_subobst = next(
subobst for subobst in self._subobstructions.values() if orientation_int in subobst.get_coordinates()
)
base_coord = random_subobst.get_coordinates(ignore_cell_centered=False)[orientation_int][dim][0]
for subobst in self._subobstructions.values():
if not subobst.has_boundary_data:
continue
coords = subobst.get_coordinates(ignore_cell_centered=False)
if orientation_int not in coords:
continue
if coords[orientation_int][dim][0] == base_coord:
subobst_sets[0].append(subobst)
else:
subobst_sets[1].append(subobst)
if len(subobst_sets[1]) == 0:
subobst_sets.pop(1)
first_result_grid = None
for subobsts in subobst_sets:
coord_min = {"x": math.inf, "y": math.inf, "z": math.inf}
coord_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf}
for dim in ("x", "y", "z"):
for subobst in subobsts:
patch_extent = subobst.get_data(quantity).data[orientation_int].extent
co = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int][dim]
co = co[
np.where(
np.logical_and(
co >= patch_extent[dim][0],
co <= patch_extent[dim][1],
)
)
]
coord_min[dim] = min(co[0], coord_min[dim])
coord_max[dim] = max(co[-1], coord_max[dim])
# The global grid will use the finest mesh as base and duplicate values of the coarser meshes
# Therefore we first find the finest mesh and calculate the step size in each dimension
step_sizes_min = {
"x": coord_max["x"] - coord_min["x"],
"y": coord_max["y"] - coord_min["y"],
"z": coord_max["z"] - coord_min["z"],
}
step_sizes_max = {"x": 0, "y": 0, "z": 0}
steps = dict()
global_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf}
for dim in ("x", "y", "z"):
for subobst in subobsts:
subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int]
if len(subobst_coords[dim]) <= 1:
step_size = 0
else:
step_size = subobst_coords[dim][1] - subobst_coords[dim][0]
step_sizes_min[dim] = min(step_size, step_sizes_min[dim])
step_sizes_max[dim] = max(step_size, step_sizes_max[dim])
global_max[dim] = max(subobst_coords[dim][-1], global_max[dim])
for dim in ("x", "y", "z"):
if step_sizes_min[dim] == 0:
step_sizes_min[dim] = math.inf
steps[dim] = 1
else:
steps[dim] = max(int(round((coord_max[dim] - coord_min[dim]) / step_sizes_min[dim])), 1) + (
0 if cell_centered else 1
)
grid = np.full((self.n_t(count_duplicates=False), steps["x"], steps["y"], steps["z"]), np.nan)
start_coordinates = {"x": coord_min["x"], "y": coord_min["y"], "z": coord_min["z"]}
start_idx = dict()
end_idx = dict()
for subobst in subobsts:
patch_data = np.expand_dims(
subobst.get_data(quantity).data[orientation_int].data, axis=abs(orientation_int)
)
subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int]
for axis in (0, 1, 2):
dim = ("x", "y", "z")[axis]
if axis == abs(orientation_int) - 1:
start_idx[dim] = 0
end_idx[dim] = 1
continue
n_repeat = max(
int(round((subobst_coords[dim][1] - subobst_coords[dim][0]) / step_sizes_min[dim])), 1
)
start_idx[dim] = int(
round((subobst_coords[dim][0] - start_coordinates[dim]) / step_sizes_min[dim])
)
end_idx[dim] = int(
round((subobst_coords[dim][-1] - start_coordinates[dim]) / step_sizes_min[dim])
)
# We ignore border points unless they are actually on the border of the simulation space as all
# other border points actually appear twice, as the subobstructions overlap. This only
# applies for face_centered data, as cell_centered data will not overlap.
if not cell_centered:
if axis != abs(orientation_int) - 1:
reduced_shape = list(patch_data.shape)
reduced_shape[axis + 1] -= 1
reduced_data_slices = tuple(slice(s) for s in reduced_shape)
patch_data = patch_data[reduced_data_slices]
# Temporarily save border points to add them back to the array again later
if subobst_coords[dim][-1] == global_max[dim]:
end_idx[dim] += 1
temp_data_slices = [slice(s) for s in patch_data.shape]
temp_data_slices[axis + 1] = slice(patch_data.shape[axis + 1] - 1, None)
temp_data = patch_data[tuple(temp_data_slices)]
if n_repeat > 1:
patch_data = np.repeat(patch_data, n_repeat, axis=axis + 1)
# Add border points back again if needed
if not cell_centered and subobst_coords[dim][-1] == global_max[dim]:
patch_data = np.concatenate((patch_data, temp_data), axis=axis + 1)
grid[
:, start_idx["x"] : end_idx["x"], start_idx["y"] : end_idx["y"], start_idx["z"] : end_idx["z"]
] = patch_data.reshape(
(
self.n_t(count_duplicates=False),
end_idx["x"] - start_idx["x"],
end_idx["y"] - start_idx["y"],
end_idx["z"] - start_idx["z"],
)
)
# Remove empty dimensions, but make sure to note remove the time dimension if there is only a single
# timestep
grid = np.squeeze(grid)
if len(grid.shape) == 2:
grid = grid[np.newaxis, :, :]
if first_result_grid is not None:
result[orientation_int] = (first_result_grid, grid)
else:
result[orientation_int] = grid
return result
[docs]
def clear_cache(self):
"""Remove all data from the internal cache that has been loaded so far to free memory."""
for s in self._subobstructions.values():
s.clear_cache()
[docs]
def vmin(
self,
quantity: Union[str, Quantity],
orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0,
) -> float:
"""Minimum value of all patches at any time for a specific quantity.
:param orientation: Optionally filter by patches with a specific orientation.
"""
if self.has_boundary_data:
return min(s.vmin(quantity, orientation) for s in self._subobstructions.values())
else:
return np.nan
[docs]
def vmax(
self,
quantity: Union[str, Quantity],
orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0,
) -> float:
"""Maximum value of all patches at any time for a specific quantity.
:param orientation: Optionally filter by patches with a specific orientation.
"""
if self.has_boundary_data:
return max(s.vmax(quantity, orientation) for s in self._subobstructions.values())
return np.nan
def __getitem__(self, key: Union[int, str, "Mesh"]):
"""Gets either the nth :class:`SubObstruction` or the one with the given mesh-id."""
if isinstance(key, int):
return tuple(self._subobstructions.values())[key]
elif isinstance(key, str):
return self._subobstructions[key]
return self._subobstructions[key.id]
def __eq__(self, other):
return self.id == other.id
def __repr__(self, *args, **kwargs):
return (
f"Obstruction(id={self.id}, Bounding-Box={self.bounding_box}, "
f"SubObstructions={len(self._subobstructions)}"
+ (f", Quantities={[q.short_name for q in self.quantities]}" if self.has_boundary_data else "")
+ ")"
)