Source code for fdsreader.bndf.obstruction

import math
import os

# Unfortunately, this is necessary due to a cyclic reference. "Mesh" is only needed for static type hints anyway
from typing import TYPE_CHECKING, Dict, List, Sequence, Tuple, Union

import numpy as np
from typing_extensions import Literal

import fdsreader.utils.fortran_data as fdtype
from fdsreader import settings
from fdsreader.utils import Dimension, Extent, Quantity

if TYPE_CHECKING:
    from fdsreader.fds_classes import Mesh


[docs] class Patch: """Container for the actual data which is stored as rectangular plane with specific orientation and extent. :ivar dimension: :class:`Dimension` object containing information about steps in each dimension. :ivar extent: :class:`Extent` object containing 3-dimensional extent information. :ivar orientation: The direction the patch is facing (x={-1;1}, y={-2;2}, z={-3;3}). :ivar cell_centered: Indicates whether centered positioning for data is used. :ivar _n_t: Total number of time steps for which output data has been written. """ def __init__( self, file_path: str, dimension: Dimension, extent: Extent, orientation: int, cell_centered: bool, patch_offset: int, initial_offset: int, n_t: int, mesh: "Mesh", ): self.file_path = file_path self.dimension = dimension self.extent = extent self.orientation = orientation self.cell_centered = cell_centered self._patch_offset = patch_offset self._initial_offset = initial_offset self._time_offset = -1 self._n_t = n_t self.mesh = mesh self._boundary_parent: Boundary = None
[docs] def n_t(self, count_duplicates=True) -> int: """Get the number of timesteps for which data was output. :param count_duplicates: If true, return the total number of data points, even if there is duplicate data for a timestep. Duplicate data might be output when restarting the simulation in between the simulation run. """ if count_duplicates: return self._n_t return np.unique(self._boundary_parent.times).size
@property def shape(self) -> Tuple: """Convenience function to calculate the shape of the array containing data for this patch.""" return self.dimension.shape(self.cell_centered) @property def size(self) -> int: """Convenience function to calculate the number of data points in the array for this patch.""" return self.dimension.size(self.cell_centered) def _post_init(self, time_offset: int): """Fully initialize the patch as soon as the number of timesteps is known.""" self._time_offset = time_offset
[docs] def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[Literal["x", "y", "z"], np.ndarray]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ # orientation = ('x', 'y', 'z')[self.orientation - 1] if self.orientation != 0 else '' # coords = {'x': set(), 'y': set(), 'z': set()} coords: Dict[Literal["x", "y", "z"], np.ndarray] = {} for dim in ("x", "y", "z"): co = self.mesh.coordinates[dim].copy() # In case the slice is cell-centered, we will shift the coordinates by half a cell # and remove the last coordinate if self.cell_centered and not ignore_cell_centered: co = co[:-1] co += abs(co[1] - co[0]) / 2 coords[dim] = co[np.where(np.logical_and(co >= self.extent[dim][0], co <= self.extent[dim][1]))] if coords[dim].size == 0: coords[dim] = np.array([co[np.argmin(np.abs(co - self.extent[dim][0]))]]) return coords
@property def data(self): """Method to load the quantity data for a single patch.""" if not hasattr(self, "_data"): dtype_data = fdtype.new((("f", self.dimension.size(cell_centered=False)),)) if self._n_t == -1: self._n_t = (os.stat(self.file_path).st_size - self._initial_offset) // self._time_offset self._data = np.empty((self.n_t(count_duplicates=True),) + self.shape) with open(self.file_path, "rb") as infile: for t in range(self.n_t(count_duplicates=True)): infile.seek(self._initial_offset + self._patch_offset + t * self._time_offset) data = np.fromfile(infile, dtype_data, 1)[0][1].reshape( self.dimension.shape(cell_centered=False), order="F" ) if self.cell_centered: self._data[t, :] = data[:-1, :-1] else: self._data[t, :] = data unique_times_indices = np.unique(self._boundary_parent.times, return_index=True)[1] self._data = self._data[unique_times_indices] return self._data
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory.""" if hasattr(self, "_data"): del self._data
def __repr__(self, *args, **kwargs): return f"Patch(shape={self.shape}, orientation={self.orientation}, extent={self.extent})"
[docs] class Boundary: """Container for boundary data specific to one quantity. :ivar quantity: Quantity object containing information about the quantity calculated for this :class:`Obstruction` with the corresponding short_name and unit. :ivar times: Numpy array containing all times for which data has been recorded. :ivar cell_centered: Indicates whether centered positioning for data is used. :ivar lower_bounds: Dictionary with lower bounds for each timestep with meshes as keys. :ivar upper_bounds: Dictionary with upper bounds for each timestep with meshes as keys. :ivar n_t: Total number of time steps for which output data has been written. """ def __init__( self, quantity: Quantity, cell_centered: bool, times: Sequence[float], patches: List[Patch], lower_bounds: np.ndarray, upper_bounds: np.ndarray, ): self.quantity = quantity self.cell_centered = cell_centered self._patches = patches self.times = times self.lower_bounds = lower_bounds self.upper_bounds = upper_bounds
[docs] def n_t(self, count_duplicates=True) -> int: """Get the number of timesteps for which data was output. :param count_duplicates: If true, return the total number of data points, even if there is duplicate data for a timestep. Duplicate data might be output when restarting the simulation in between the simulation run. """ return self._patches[0].n_t(count_duplicates=count_duplicates)
@property def orientations(self): """Return all orientations for which there is data available.""" return [p.orientation for p in self._patches]
[docs] def get_nearest_timestep(self, time: float) -> int: """Calculates the nearest timestep for which data has been output for this obstruction.""" idx = np.searchsorted(self.times, time, side="left") if time > 0 and ( idx == len(self.times) or math.fabs(time - self.times[idx - 1]) < math.fabs(time - self.times[idx]) ): return idx - 1 else: return idx
@property def data(self) -> Dict[int, Patch]: """The :class:`Patch` in each direction (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=y).""" return {p.orientation: p for p in self._patches}
[docs] def vmin(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Minimum value of all patches at any time. :param orientation: Optionally filter by patches with a specific orientation. """ if orientation == 0: curr_min = np.min(self.lower_bounds) if curr_min == np.float32(+1e33): return float(min(np.min(p.data) for p in self._patches)) return float(curr_min) else: return float(np.min(self.data[orientation].data))
[docs] def vmax(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Maximum value of all patches at any time. :param orientation: Optionally filter by patches with a specific orientation. """ if orientation == 0: curr_max = np.max(self.upper_bounds) if curr_max == np.float32(-1e33): return float(max(np.max(p.data) for p in self._patches)) return float(curr_max) else: return float(np.max(self.data[orientation].data))
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory.""" for p in self._patches: p.clear_cache()
def __repr__(self): return f"Boundary(Quantity={self.quantity}, Patches={len(self._patches)})"
[docs] class SubObstruction: """An :class:`Obstruction` consists of 1 or more SubObstructions which can be hidden at specific points in time. :ivar extent: :class:`Extent` object containing 3-dimensional extent information. :ivar bound_indices: Indices used to define obstruction bounds in terms of mesh locations. :ivar side_surfaces: Tuple of six :class:`Surface` s for each side of the cuboid. :ivar hide_times: List with points in time from when on the SubObstruction will be hidden. :ivar show_times: List with points in time from when on the SubObstruction will be shown. """ def __init__( self, side_surfaces: Tuple, bound_indices: Tuple[int, int, int, int, int, int], extent: Extent, mesh: "Mesh" ): self.extent = extent self.side_surfaces = side_surfaces self.bound_indices = { "x": (bound_indices[0], bound_indices[1]), "y": (bound_indices[2], bound_indices[3]), "z": (bound_indices[4], bound_indices[5]), } self.mesh = mesh self._boundary_data: Dict[int, Boundary] = dict() self.hide_times = list() self.show_times = list() def _add_patches( self, bid: int, cell_centered: bool, quantity: str, short_name: str, unit: str, patches: List[Patch], times: Sequence[float], lower_bounds: np.ndarray, upper_bounds: np.ndarray, ): if bid not in self._boundary_data: self._boundary_data[bid] = Boundary( Quantity(quantity, short_name, unit), cell_centered, times, patches, lower_bounds, upper_bounds ) # Add reference to parent boundary class in patches for patch in patches: patch._boundary_parent = self._boundary_data[bid] if not settings.LAZY_LOAD: _ = self._boundary_data[bid].data @property def orientations(self): """Return all orientations for which there is data available.""" if self.has_boundary_data: return next(iter(self._boundary_data.values())).orientations return []
[docs] def get_coordinates( self, ignore_cell_centered: bool = False ) -> Dict[int, Dict[Literal["x", "y", "z"], np.ndarray]]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ if self.has_boundary_data: return { orientation: patch.get_coordinates(ignore_cell_centered) for orientation, patch in next(iter(self._boundary_data.values())).data.items() } return {}
[docs] def get_nearest_index(self, dimension: Literal["x", "y", "z"], orientation: int, value: float) -> int: """Get the nearest mesh coordinate index in a specific dimension for a specific orientation.""" if self.has_boundary_data: coords = self.get_coordinates()[orientation][dimension] idx = np.searchsorted(coords, value, side="left") if idx > 0 and (idx == coords.size or math.fabs(value - coords[idx - 1]) < math.fabs(value - coords[idx])): return idx - 1 else: return idx return np.nan
@property def has_boundary_data(self): return len(self._boundary_data) != 0
[docs] def get_data(self, quantity: Union[str, Quantity]): if isinstance(quantity, Quantity): quantity = quantity.name return next( b for b in self._boundary_data.values() if b.quantity.name.lower() == quantity.lower() or b.quantity.short_name.lower() == quantity.lower() )
def __getitem__(self, item): if isinstance(item, int): return self._boundary_data[item] return self.get_data(item) def _hide(self, time: float): self.hide_times.append(time) self.hide_times.sort() def _show(self, time: float): self.show_times.append(time) self.show_times.sort()
[docs] def n_t(self, count_duplicates=True) -> int: """Returns the number of timesteps for which boundary data is available. :param count_duplicates: If true, return the total number of data points, even if there is duplicate data for a timestep. Duplicate data might be output when restarting the simulation in between the simulation run. """ if self.has_boundary_data: return next(iter(self._boundary_data.values())).n_t(count_duplicates=count_duplicates) return 0
@property def times(self): """Return all timesteps for which boundary data is available, if any.""" if self.has_boundary_data: return next(iter(self._boundary_data.values())).times return np.array([])
[docs] def get_visible_times(self, times: Sequence[float]) -> np.ndarray: """Returns a ndarray filtering all time steps when the SubObstruction is visible/not hidden.""" ret = list() hidden = False for time in times: if time in self.show_times: hidden = False if time in self.hide_times: hidden = True if not hidden: ret.append(time) return np.array(ret)
[docs] def vmin( self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, ) -> float: """Minimum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: return self.get_data(quantity).vmin(orientation) return np.nan
[docs] def vmax( self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, ) -> float: """Maximum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: return self.get_data(quantity).vmax(orientation) return np.nan
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory.""" for bndf in self._boundary_data.values(): bndf.clear_cache()
def __repr__(self): return f"SubObstruction(Extent={self.extent})"
[docs] class Obstruction: """A box-shaped obstruction with specific surfaces (materials) on each side. :ivar id: ID of the obstruction. :ivar color_index: Type of coloring used to color obstruction. -1 - default color -2 - invisible -3 - use red, green, blue and alpha (rgba attribute) n>0 - use n’th color table entry :ivar block_type: Defines how the obstruction is drawn. -1 - use surface to obtain blocktype 0 - regular block 2 - outline :ivar texture_origin: Origin position of the texture provided by the surface. When the texture does have a pattern, for example windows or bricks, the texture_origin specifies where the pattern should begin. :ivar rgba: Optional color of the obstruction in form of a 4-element tuple (ranging from 0.0 to 1.0). """ def __init__( self, oid: str, color_index: int, block_type: int, texture_origin: Tuple[float, float, float], rgba: Union[Tuple[()], Tuple[float, float, float, float]] = (), ): self.id = oid self.color_index = color_index self.block_type = block_type self.texture_origin = texture_origin if len(rgba) != 0: self.rgba = rgba self._subobstructions: Dict[str, SubObstruction] = dict() @property def bounding_box(self) -> Extent: """:class:`Extent` object representing the bounding box around the Obstruction.""" extents = [sub.extent for sub in self._subobstructions.values()] return Extent( min(extents, key=lambda e: e.x_start).x_start, max(extents, key=lambda e: e.x_end).x_end, min(extents, key=lambda e: e.y_start).y_start, max(extents, key=lambda e: e.y_end).y_end, min(extents, key=lambda e: e.z_start).z_start, max(extents, key=lambda e: e.z_end).z_end, ) @property def orientations(self): """Return all orientations for which there is data available.""" if self.has_boundary_data: orientations = set() for subobst in self._subobstructions.values(): orientations.update(subobst.orientations) return sorted(list(orientations)) return []
[docs] def n_t(self, count_duplicates=True) -> int: """Returns the number of timesteps for which boundary data is available. :param count_duplicates: If true, return the total number of data points, even if there is duplicate data for a timestep. Duplicate data might be output when restarting the simulation in between the simulation run. """ for subobst in self._subobstructions.values(): if subobst.has_boundary_data: return subobst.n_t(count_duplicates=count_duplicates) return 0
@property def times(self): """Return all timesteps for which boundary data is available, if any.""" for subobst in self._subobstructions.values(): if subobst.has_boundary_data: return subobst.times return np.array([])
[docs] def get_visible_times(self, times: Sequence[float]): """Returns an ndarray filtering all time steps when theSubObstruction is visible/not hidden.""" for subobst in self._subobstructions.values(): return subobst.get_visible_times(times) return np.array([])
[docs] def get_coordinates( self, ignore_cell_centered: bool = False ) -> Dict[int, Dict[Literal["x", "y", "z"], np.ndarray]]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ if self.has_boundary_data: all_coords = dict() for orientation_int in self.orientations: orientation = ("x", "y", "z")[abs(orientation_int) - 1] coords = {"x": set(), "y": set(), "z": set()} for dim in ("x", "y", "z"): if orientation == dim: bounding_box_index = 0 if orientation_int < 0 else 1 coords[dim] = np.array([self.bounding_box[dim][bounding_box_index]]) continue min_delta = math.inf for subobst in self._subobstructions.values(): subobst_coords = subobst.get_coordinates(ignore_cell_centered) if orientation_int not in subobst_coords: continue co = subobst_coords[orientation_int][dim] min_delta = min(min_delta, co[1] - co[0]) coords[dim].update(co) coords[dim] = np.array(sorted(list(coords[dim]))) to_keep = np.full(coords[dim].shape, True) for i in range(len(coords[dim]) - 1): if abs(coords[dim][i] - coords[dim][i + 1]) < min_delta / 2: to_keep[i + 1] = False coords[dim] = coords[dim][to_keep] if len(coords[dim]) == 0: bounding_box_index = 0 if orientation_int < 0 else 1 single_coordinate = self.bounding_box[dim][bounding_box_index] nearest_coordinate = np.inf for subobst in self._subobstructions.values(): mesh = subobst.mesh mesh_coords = mesh.coordinates[dim] idx = np.searchsorted(mesh_coords, single_coordinate, side="left") if idx > 0 and ( idx == mesh_coords.size or math.fabs(single_coordinate - mesh_coords[idx - 1]) < math.fabs(single_coordinate - mesh_coords[idx]) ): idx = idx + 1 if mesh_coords[idx] - single_coordinate < nearest_coordinate - single_coordinate: nearest_coordinate = mesh_coords[idx] coords[dim] = np.array([nearest_coordinate]) all_coords[orientation_int] = coords return all_coords return dict()
[docs] def get_nearest_index(self, dimension: Literal["x", "y", "z"], orientation: int, value: float) -> int: """Get the nearest mesh coordinate index in a specific dimension for a specific orientation.""" if self.has_boundary_data: coords = self.get_coordinates()[orientation][dimension] idx = np.searchsorted(coords, value, side="left") if idx > 0 and (idx == coords.size or math.fabs(value - coords[idx - 1]) < math.fabs(value - coords[idx])): return idx - 1 else: return idx return np.nan
@property def quantities(self) -> List[Quantity]: """Get a list of all quantities for which boundary data exists.""" if self.has_boundary_data: qs = set() for subobst in self._subobstructions.values(): for b in subobst._boundary_data.values(): qs.add(b.quantity) return list(qs) return [] @property def meshes(self) -> List["Mesh"]: """Returns a list of all meshes this slice cuts through.""" return [subobst.mesh for subobst in self._subobstructions.values()]
[docs] def filter_by_orientation(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> List[SubObstruction]: """Filter all SubObstructions by a specific orientation. All returned SubObstructions will contain boundary data in the specified orientation. """ if self.has_boundary_data: return [subobst for subobst in self._subobstructions.values() if orientation in subobst.orientations] return []
[docs] def get_boundary_data( self, quantity: Union[Quantity, str], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0 ) -> Dict[str, Boundary]: """Gets the boundary data for a specific quantity of all SubObstructions. :param quantity: The quantity to filter by. :param orientation: Optionally filter by a specific orientation as well (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=z). A value of 0 indicates to no filter. """ if isinstance(quantity, Quantity): quantity = quantity.name ret = { subobst.mesh.id: subobst.get_data(quantity) for subobst in self._subobstructions.values() if subobst.has_boundary_data } if orientation == 0: return ret return {mesh: bndf for mesh, bndf in ret.items() if orientation in bndf.data.keys()}
[docs] def get_nearest_timestep(self, time: float, visible_only: bool = False) -> int: """Calculates the nearest timestep for which data has been output for this obstruction.""" if self.has_boundary_data: times = self.get_visible_times(self.times) if visible_only else self.times idx = np.searchsorted(times, time, side="left") if time > 0 and (idx == len(times) or math.fabs(time - times[idx - 1]) < math.fabs(time - times[idx])): return idx - 1 else: return idx return np.nan
[docs] def get_nearest_patch(self, x: float = None, y: float = None, z: float = None): """Gets the patch of the :class:`SubObstruction` that has the least distance to the given point. If there are multiple patches with the same distance, a random one will be selected. """ if self.has_boundary_data: d_min = np.finfo(float).max patches_with_dist = list() for subobst in self._subobstructions.values(): for patch in subobst.get_data(self.quantities[0])._patches: dx = max(patch.extent.x_start - x, 0, x - patch.extent.x_end) if x is not None else 0 dy = max(patch.extent.y_start - y, 0, y - patch.extent.y_end) if y is not None else 0 dz = max(patch.extent.z_start - z, 0, z - patch.extent.z_end) if z is not None else 0 d = np.sqrt(dx * dx + dy * dy + dz * dz) if d < d_min: d_min = d patches_with_dist = [(patch, dx, dy, dz)] elif math.isclose(d, d_min, rel_tol=1e-9, abs_tol=0.0): patches_with_dist.append((patch, dx, dy, dz)) patches_min = [p[0] for p in patches_with_dist] if x is not None and len(patches_with_dist) > 1: patches_with_dist.sort(key=lambda p: p[1]) patches_min = [p[0] for p in patches_with_dist] elif y is not None and len(patches_with_dist) > 1: patches_with_dist.sort(key=lambda p: p[2]) patches_min = [p[0] for p in patches_with_dist] elif z is not None and len(patches_with_dist) > 1: patches_with_dist.sort(key=lambda p: p[3]) patches_min = [p[0] for p in patches_with_dist] if len(patches_min) > 0: return patches_min[0] return None
@property def has_boundary_data(self): """Whether boundary data has been output in the simulation.""" return any(subobst.has_boundary_data for subobst in self._subobstructions.values())
[docs] def get_global_boundary_data_arrays( self, quantity: Union[str, Quantity] ) -> Dict[int, Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]: """Creates a global numpy ndarray from all subobstruction's boundary data for each orientation. :param quantity: The quantity's name or short name for which to load the global arrays. """ if not self.has_boundary_data: return dict() for subobst in self._subobstructions.values(): if subobst.has_boundary_data: cell_centered = next(iter(subobst._boundary_data.values())).cell_centered result = dict() for orientation_int in self.orientations: subobst_sets = [list(), list()] dim = ["x", "y", "z"][abs(orientation_int) - 1] random_subobst = next( subobst for subobst in self._subobstructions.values() if orientation_int in subobst.get_coordinates() ) base_coord = random_subobst.get_coordinates(ignore_cell_centered=False)[orientation_int][dim][0] for subobst in self._subobstructions.values(): if not subobst.has_boundary_data: continue coords = subobst.get_coordinates(ignore_cell_centered=False) if orientation_int not in coords: continue if coords[orientation_int][dim][0] == base_coord: subobst_sets[0].append(subobst) else: subobst_sets[1].append(subobst) if len(subobst_sets[1]) == 0: subobst_sets.pop(1) first_result_grid = None for subobsts in subobst_sets: coord_min = {"x": math.inf, "y": math.inf, "z": math.inf} coord_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf} for dim in ("x", "y", "z"): for subobst in subobsts: patch_extent = subobst.get_data(quantity).data[orientation_int].extent co = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int][dim] co = co[ np.where( np.logical_and( co >= patch_extent[dim][0], co <= patch_extent[dim][1], ) ) ] coord_min[dim] = min(co[0], coord_min[dim]) coord_max[dim] = max(co[-1], coord_max[dim]) # The global grid will use the finest mesh as base and duplicate values of the coarser meshes # Therefore we first find the finest mesh and calculate the step size in each dimension step_sizes_min = { "x": coord_max["x"] - coord_min["x"], "y": coord_max["y"] - coord_min["y"], "z": coord_max["z"] - coord_min["z"], } step_sizes_max = {"x": 0, "y": 0, "z": 0} steps = dict() global_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf} for dim in ("x", "y", "z"): for subobst in subobsts: subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int] if len(subobst_coords[dim]) <= 1: step_size = 0 else: step_size = subobst_coords[dim][1] - subobst_coords[dim][0] step_sizes_min[dim] = min(step_size, step_sizes_min[dim]) step_sizes_max[dim] = max(step_size, step_sizes_max[dim]) global_max[dim] = max(subobst_coords[dim][-1], global_max[dim]) for dim in ("x", "y", "z"): if step_sizes_min[dim] == 0: step_sizes_min[dim] = math.inf steps[dim] = 1 else: steps[dim] = max(int(round((coord_max[dim] - coord_min[dim]) / step_sizes_min[dim])), 1) + ( 0 if cell_centered else 1 ) grid = np.full((self.n_t(count_duplicates=False), steps["x"], steps["y"], steps["z"]), np.nan) start_coordinates = {"x": coord_min["x"], "y": coord_min["y"], "z": coord_min["z"]} start_idx = dict() end_idx = dict() for subobst in subobsts: patch_data = np.expand_dims( subobst.get_data(quantity).data[orientation_int].data, axis=abs(orientation_int) ) subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int] for axis in (0, 1, 2): dim = ("x", "y", "z")[axis] if axis == abs(orientation_int) - 1: start_idx[dim] = 0 end_idx[dim] = 1 continue n_repeat = max( int(round((subobst_coords[dim][1] - subobst_coords[dim][0]) / step_sizes_min[dim])), 1 ) start_idx[dim] = int( round((subobst_coords[dim][0] - start_coordinates[dim]) / step_sizes_min[dim]) ) end_idx[dim] = int( round((subobst_coords[dim][-1] - start_coordinates[dim]) / step_sizes_min[dim]) ) # We ignore border points unless they are actually on the border of the simulation space as all # other border points actually appear twice, as the subobstructions overlap. This only # applies for face_centered data, as cell_centered data will not overlap. if not cell_centered: if axis != abs(orientation_int) - 1: reduced_shape = list(patch_data.shape) reduced_shape[axis + 1] -= 1 reduced_data_slices = tuple(slice(s) for s in reduced_shape) patch_data = patch_data[reduced_data_slices] # Temporarily save border points to add them back to the array again later if subobst_coords[dim][-1] == global_max[dim]: end_idx[dim] += 1 temp_data_slices = [slice(s) for s in patch_data.shape] temp_data_slices[axis + 1] = slice(patch_data.shape[axis + 1] - 1, None) temp_data = patch_data[tuple(temp_data_slices)] if n_repeat > 1: patch_data = np.repeat(patch_data, n_repeat, axis=axis + 1) # Add border points back again if needed if not cell_centered and subobst_coords[dim][-1] == global_max[dim]: patch_data = np.concatenate((patch_data, temp_data), axis=axis + 1) grid[ :, start_idx["x"] : end_idx["x"], start_idx["y"] : end_idx["y"], start_idx["z"] : end_idx["z"] ] = patch_data.reshape( ( self.n_t(count_duplicates=False), end_idx["x"] - start_idx["x"], end_idx["y"] - start_idx["y"], end_idx["z"] - start_idx["z"], ) ) # Remove empty dimensions, but make sure to note remove the time dimension if there is only a single # timestep grid = np.squeeze(grid) if len(grid.shape) == 2: grid = grid[np.newaxis, :, :] if first_result_grid is not None: result[orientation_int] = (first_result_grid, grid) else: result[orientation_int] = grid return result
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory.""" for s in self._subobstructions.values(): s.clear_cache()
[docs] def vmin( self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, ) -> float: """Minimum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: return min(s.vmin(quantity, orientation) for s in self._subobstructions.values()) else: return np.nan
[docs] def vmax( self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, ) -> float: """Maximum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: return max(s.vmax(quantity, orientation) for s in self._subobstructions.values()) return np.nan
def __getitem__(self, key: Union[int, str, "Mesh"]): """Gets either the nth :class:`SubObstruction` or the one with the given mesh-id.""" if isinstance(key, int): return tuple(self._subobstructions.values())[key] elif isinstance(key, str): return self._subobstructions[key] return self._subobstructions[key.id] def __eq__(self, other): return self.id == other.id def __repr__(self, *args, **kwargs): return ( f"Obstruction(id={self.id}, Bounding-Box={self.bounding_box}, " f"SubObstructions={len(self._subobstructions)}" + (f", Quantities={[q.short_name for q in self.quantities]}" if self.has_boundary_data else "") + ")" )