Module quagmire.mesh.trimesh
For unstructured data in Cartesian coordinates.

TriMesh implements the following functionality:
- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Supply a PETSc DM object (created from quagmire.tools.meshtools) to initialise the object.
Expand source code
# Copyright 2016-2020 Louis Moresi, Ben Mather, Romain Beucher
#
# This file is part of Quagmire.
#
# Quagmire is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or any later version.
#
# Quagmire is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Quagmire. If not, see <http://www.gnu.org/licenses/>.
"""
For unstructured data in Cartesian coordinates.
<img src="https://raw.githubusercontent.com/underworldcode/quagmire/dev/docs/images/quagmire-flowchart-trimesh.png" style="width: 321px; float:right">
`TriMesh` implements the following functionality:
- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Supply a `PETSc DM` object (created from `quagmire.tools.meshtools`) to initialise the object.
"""
import numpy as np
from mpi4py import MPI
import sys,petsc4py
petsc4py.init(sys.argv)
from petsc4py import PETSc
# comm = MPI.COMM_WORLD
from time import perf_counter
from .commonmesh import CommonMesh as _CommonMesh
try: range = xrange
except: pass
from quagmire import function as fn
from quagmire.function import LazyEvaluation as _LazyEvaluation
class TriMesh(_CommonMesh):
"""
Build spatial data structures on an __unstructured Cartesian mesh__.
Use `TriMesh` for:
- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Each of these data structures are built on top of a `PETSc DM` object
(created from `quagmire.tools.meshtools`).
Parameters
----------
DM : PETSc DMPlex object
Build this unstructured Cartesian mesh object using one of:
- `quagmire.tools.meshtools.create_DMPlex`
- `quagmire.tools.meshtools.create_DMPlex_from_points`
- `quagmire.tools.meshtools.create_DMPlex_from_box`
verbose : bool
Flag toggles verbose output
*args : optional arguments
**kwargs : optional keyword arguments
Attributes
----------
tri : stripy Triangulation object
Cartesian mesh object generated by `stripy`
npoints : int
Number of points (n) in the mesh
pointwise_area : Quagmire MeshVariable
`quagmire.mesh.basemesh.MeshVariable` of point-wise area
mask : Quagmire MeshVariable
`quagmire.mesh.basemesh.MeshVariable` to denote points on the boundary
data : array of floats, shape (n,2)
Cartesian mesh coordinates in x,y directions
coords : array of floats, shape (n,2)
Same as `data`
neighbour_cloud : array of ints, shape(n,25)
array of nearest node neighbours by distance
near_neighbour_mask : array of bools, shape(n,25)
mask immediate node neighbours in `neighbour_cloud` (They may not be there !! )
timings : dict
Timing information for each Quagmire routine
"""
## This is a count of all the instances of the class that are launched
## so that we have a way to name / identify them
__count = 0
@classmethod
def _count(cls):
TriMesh.__count += 1
return TriMesh.__count
@property
def id(self):
return self.__id
def __init__(self, dm, verbose=True, *args, **kwargs):
import stripy
from scipy.spatial import cKDTree as _cKDTree
# initialise base mesh class
super(TriMesh, self).__init__(dm, verbose)
self.__id = "trimesh_{}".format(self._count())
# Delaunay triangulation
t = perf_counter()
coords = self.dm.getCoordinatesLocal().array.reshape(-1,2)
# minX, minY = coords.min(axis=0)
# maxX, maxY = coords.max(axis=0)
# length_scale = np.sqrt((maxX - minX)*(maxY - minY)/coords.shape[0])
# # coords += np.random.random(coords.shape) * 0.0001 * length_scale # This should be aware of the point spacing (small perturbation)
self.tri = stripy.Triangulation(coords[:,0], coords[:,1], permute=True)
self.npoints = self.tri.npoints
self.timings['triangulation'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank==0 and self.verbose:
print(("{} - Delaunay triangulation {}s".format(self.dm.comm.rank, perf_counter()-t)))
# Calculate weigths and pointwise area
t = perf_counter()
self.area, self.weight = self.calculate_area_weights()
self.pointwise_area = self.add_variable(name="area")
self.pointwise_area.data = self.area
self.pointwise_area.lock()
self.timings['area weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank==0 and self.verbose:
print(("{} - Calculate node weights and area {}s".format(self.dm.comm.rank, perf_counter()-t)))
# Find boundary points
t = perf_counter()
self.bmask = self.get_boundary()
self.mask = self.add_variable(name="Mask")
self.mask.data = self.bmask.astype(PETSc.ScalarType)
self.mask.lock()
self.timings['find boundaries'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank==0 and self.verbose:
print(("{} - Find boundaries {}s".format(self.dm.comm.rank, perf_counter()-t)))
# cKDTree
t = perf_counter()
self.cKDTree = _cKDTree(self.tri.points, balanced_tree=False)
self.timings['cKDTree'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank==0 and self.verbose:
print(("{} - cKDTree {}s".format(self.dm.comm.rank, perf_counter()-t)))
# Find neighbours
t = perf_counter()
self.construct_neighbour_cloud()
self.timings['construct neighbour cloud'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
t1 = perf_counter()
self.construct_natural_neighbour_cloud()
self.timings['construct natural neighbour cloud'] = [perf_counter()-t1, self.log.getCPUTime(), self.log.getFlops()]
if self.rank==0 and self.verbose:
print("{} - Construct neighbour cloud arrays {}s, ({}s + {}s)".format(self.dm.comm.rank, perf_counter()-t,
self.timings['construct neighbour cloud'][0],
self.timings['construct natural neighbour cloud'][0] ))
# sync smoothing operator
t = perf_counter()
self._construct_rbf_weights()
self.timings['construct rbf weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank==0 and self.verbose:
print(("{} - Construct rbf weights {}s".format(self.dm.comm.rank, perf_counter()-t)))
self.root = False
self._coords = self.tri.points
self._data = self._coords
self.interpolate = self.tri.interpolate
# functions / parameters that are required for compatibility among FlatMesh types
self._derivative_grad_cartesian = self.derivative_grad
self._radius = 1.0
@property
def data(self):
"""
Cartesian coordinates of local mesh points.
`data` is of shape (npoints, 2)
Being a `TriMesh` object, `self.data` are identical to `self.coords`
"""
return self._data
@property
def coords(self):
"""
Cartesian coordinates of local mesh points
`coords` is of shape (npoints, 2)
Being a `TriMesh` object, `self.coords` are identical to `self.data`
"""
return self._coords
def calculate_area_weights(self):
"""
Calculate pointwise area and weigths
Returns
-------
area : array of floats, shape (n,)
point-wise area
weight : array of ints, shape (n,)
point-wise weight
Notes
-----
This calls a fortran 90 routine which computes the weight and area
for each point in the mesh.
"""
from quagmire._fortran import ntriw
area, weight = ntriw(self.tri.x, self.tri.y, self.tri.simplices.T+1)
return area, weight
def node_neighbours(self, point):
"""
Returns a list of neighbour nodes for a given point in the
Delaunay triangulation.
Parameters
----------
point : int
point on the mesh
Returns
-------
neighbours : list
list of neighbours that are connected by line segments
to the specified `point`
"""
return self.natural_neighbours[point,1:self.natural_neighbours_count[point]]
def derivatives(self, PHI, nit=10, tol=1e-8):
return self.derivative_grad(PHI, nit=10, tol=1e-8)
def derivative_grad(self, PHI, nit=10, tol=1e-8):
"""
Compute derivatives of PHI in the x, y directions.
This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
---------
PHI : ndarray of floats, shape (n,)
compute the derivative of this array
nit : int optional (default: 10)
number of iterations to reach convergence
tol : float optional (default: 1e-8)
convergence is reached when this tolerance is met
Returns
-------
PHIx : ndarray of floats, shape(n,)
first partial derivative of PHI in x direction
PHIy : ndarray of floats, shape(n,)
first partial derivative of PHI in y direction
"""
u_x, u_y = self.tri.gradient(PHI, nit, tol)
grads = np.ndarray((u_x.size, 2))
grads[:, 0] = u_x
grads[:, 1] = u_y
return grads
def derivative_div(self, PHIx, PHIy, **kwargs):
"""
Compute second order derivative from flux fields PHIx, PHIy
We evaluate the gradient on these fields using the `derivative_grad` method.
Arguments
---------
PHIx : ndarray of floats, shape (n,)
array of first partial derivatives in x direction
PHIy : ndarray of floats, shape (n,)
array of first partial derivatives in y direction
kwargs : optional keyword-argument specifiers
keyword arguments to be passed onto derivative_grad
e.g. nit=5, tol=1e-3
Returns
-------
del2PHI : ndarray of floats, shape (n,)
second derivative of PHI
"""
u_xx = self.derivative_grad(PHIx, **kwargs)[0]
u_yy = self.derivative_grad(PHIy, **kwargs)[1]
return u_xx + u_yy
def construct_natural_neighbour_cloud(self):
"""
Find the natural neighbours for each node in the triangulation and store in a
numpy array for efficient lookup.
"""
# # maximum size of nn array node-by-node (it's almost certainly +1 because
# # it is only +2 if the neighbours do not form a closed loop around the node)
# max_neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints).max() + 2
# s = self.tri.simplices
# nns = -1 * np.ones((self.npoints, max_neighbours), dtype=int)
# nnm = np.zeros((self.npoints, max_neighbours), dtype=bool)
# nn = np.empty((self.npoints), dtype=int)
# for node in range(0, self.npoints):
# w = np.where(s==node)
# l = np.unique(s[w[0]])
# # l = np.roll(l,-np.argwhere(l == node)[0])
# nn[node] = l.shape[0]
# nns[node,0:nn[node]] = l
# nnm[node,0:nn[node]] = True
# self.natural_neighbours = nns
# self.natural_neighbours_count = nn
# self.natural_neighbours_mask = nnm
# Find all the segments in the triangulation and sort the array
# (note this differs from the stripy routine because is treates m-n and n-m as distinct)
lst = self.tri.lst
lend = self.tri.lend
lptr = self.tri.lptr
segments_array = np.empty((len(lptr),2),dtype=np.int)
segments_array[:,0] = np.abs(lst[:]) - 1
segments_array[:,1] = np.abs(lst[lptr[:]-1]) - 1
valid = np.logical_and(segments_array[:,0] >= 0, segments_array[:,1] >= 0)
segments = segments_array[valid,:]
deshuffled = self.tri._deshuffle_simplices(segments)
smsi = np.argsort(deshuffled[:,0])
sms = np.zeros_like(deshuffled)
sms[np.indices((deshuffled.shape[0],)),:] = deshuffled[smsi,:]
natural_neighbours_count = np.bincount(sms[:,0])
iend = np.cumsum(natural_neighbours_count)
istart = np.zeros_like(iend)
istart[1:] = iend[:-1]
natural_neighbours = np.full((self.npoints, natural_neighbours_count.max()+1), -1, dtype=np.int)
for j in range(0,self.npoints):
natural_neighbours[j, 0] = j
natural_neighbours[j, 1:natural_neighbours_count[j]+1] = sms[istart[j]:istart[j]+natural_neighbours_count[j],1]
natural_neighbours_count += 1
self.natural_neighbours = natural_neighbours
self.natural_neighbours_count = natural_neighbours_count
self.natural_neighbours_mask = natural_neighbours != -1
return
def construct_neighbour_cloud(self, size=25):
"""
Find neighbours from distance cKDTree.
Parameters
----------
size : int
Number of neighbours to search for
Notes
-----
Use this method to search for neighbours that are not
necessarily immediate node neighbours (i.e. neighbours
connected by a line segment). Extended node neighbours
should be captured by the search depending on how large
`size` is set to.
"""
nndist, nncloud = self.cKDTree.query(self.tri.points, k=size)
self.neighbour_cloud = nncloud
self.neighbour_cloud_distances = nndist
neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints)
self.near_neighbours = neighbours + 2
self.extended_neighbours = np.full_like(neighbours, size)
# self.near_neighbour_mask = np.zeros_like(nncloud, dtype=np.bool)
# for node in range(0,self.npoints):
# self.near_neighbour_mask[node, 0:self.near_neighbours[node]] = True
# row, col = np.nonzero(self.tri.simplices == node)
# natural_neighbours = list(np.unique(np.hstack(self.tri.simplices[row])))
# # move self node to start of the list
# natural_neighbours.remove(node)
# natural_neighbours.insert(0, node)
# # identify distance neighbours from cloud and join lists
# distance_neighbours = list(set(nncloud[node]) - set(natural_neighbours))
# node_neighbours = np.hstack([natural_neighbours, distance_neighbours])
# # size of node_nncloud will be >= ncols
# nncloud[node] = node_neighbours[0:size]
# ## Nothing is done about the nndist array, yet...
return
def local_area_smoothing(self, data, its=1, centre_weight=0.75):
"""
Local area smoothing using radial-basis function smoothing kernel
Parameters
----------
data : array of floats, shape (n,)
field variable to be smoothed
its : int
number of iterations (default: 1)
centre_weight : float
weight to apply to centre nodes (default: 0.75)
other nodes are weighted by (1 - `centre_weight`)
Returns
-------
sm : array of floats, shape (n,)
smoothed field variable
"""
smooth_data = data.copy()
smooth_data_old = data.copy()
for i in range(0, its):
smooth_data_old[:] = smooth_data
smooth_data = centre_weight*smooth_data_old + \
(1.0 - centre_weight)*self.rbf_smoother(smooth_data)
return smooth_data
def _construct_rbf_weights(self, delta=None):
if delta == None:
delta = self.neighbour_cloud_distances[:, 1].mean()
self.delta = delta
self.gaussian_dist_w = self._rbf_weights(delta)
return
def _rbf_weights(self, delta=None):
neighbour_cloud_distances = self.neighbour_cloud_distances
if delta == None:
delta = self.neighbour_cloud_distances[:, 1].mean()
# Initialise the interpolants
gaussian_dist_w = np.zeros_like(neighbour_cloud_distances)
gaussian_dist_w[:,:] = np.exp(-np.power(neighbour_cloud_distances[:,:]/delta, 2.0))
gaussian_dist_w[:,:] /= gaussian_dist_w.sum(axis=1).reshape(-1,1)
return gaussian_dist_w
def rbf_smoother(self, vector, iterations=1, delta=None):
"""
Smoothing using a radial-basis function smoothing kernel
Arguments
---------
vector : array of floats, shape (n,)
field variable to be smoothed
iterations : int
number of iterations to smooth vector
delta : float / array of floats shape (n,)
distance weights to apply the Gaussian interpolants
Returns
-------
smooth_vec : array of floats, shape (n,)
smoothed version of input vector
"""
if type(delta) != type(None):
self._construct_rbf_weights(delta)
vector = self.sync(vector)
for i in range(0, iterations):
# print self.dm.comm.rank, ": RBF ",vector.max(), vector.min()
vector_smoothed = (vector[self.neighbour_cloud[:,:]] * self.gaussian_dist_w[:,:]).sum(axis=1)
vector = self.sync(vector_smoothed)
return vector
def build_rbf_smoother(self, delta, iterations=1):
trimesh_self = self
class _rbf_smoother_object(object):
def __init__(inner_self, delta, iterations=1):
if delta == None:
trimesh_self.lvec.setArray(trimesh_self.neighbour_cloud_distances[:, 1])
trimesh_self.dm.localToGlobal(trimesh_self.lvec, trimesh_self.gvec)
delta = trimesh_self.gvec.sum() / trimesh_self.gvec.getSize()
inner_self._mesh = trimesh_self
inner_self.delta = delta
inner_self.iterations = iterations
inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta)
return
def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1):
import quagmire
smooth_node_values = lazyFn.evaluate(inner_self._mesh)
for i in range(0, iterations):
smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1)
smooth_node_values = inner_self._mesh.sync(smooth_node_values)
return smooth_node_values
def smooth_fn(inner_self, meshVar, iterations=None):
if iterations is None:
iterations = inner_self.iterations
def smoother_fn(*args, **kwargs):
import quagmire
smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations)
if len(args) == 1 and args[0] == meshVar._mesh:
return smooth_node_values
elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]):
mesh = args[0]
return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs)
else:
xi = np.atleast_1d(args[0])
yi = np.atleast_1d(args[1])
i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs)
return i
newLazyFn = _LazyEvaluation()
newLazyFn.evaluate = smoother_fn
newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations)
return newLazyFn
return _rbf_smoother_object(delta, iterations)
Classes
class TriMesh (dm, verbose=True, *args, **kwargs)-
Build spatial data structures on an unstructured Cartesian mesh.
Use
TriMeshfor:- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Each of these data structures are built on top of a
PETSc DMobject (created fromquagmire.tools.meshtools).Parameters
DM:PETSc DMPlex object-
Build this unstructured Cartesian mesh object using one of:
verbose:bool- Flag toggles verbose output
*args:optional arguments**kwargs:optional keyword arguments
Attributes
tri:stripy Triangulation object- Cartesian mesh object generated by
stripy npoints:int- Number of points (n) in the mesh
pointwise_area:Quagmire MeshVariableMeshVariableof point-wise areamask:Quagmire MeshVariableMeshVariableto denote points on the boundarydata:arrayoffloats, shape (n,2)- Cartesian mesh coordinates in x,y directions
coords:arrayoffloats, shape (n,2)- Same as
data neighbour_cloud:arrayofints, shape(n,25)- array of nearest node neighbours by distance
near_neighbour_mask:arrayofbools, shape(n,25)- mask immediate node neighbours in
neighbour_cloud(They may not be there !! ) timings:dict- Timing information for each Quagmire routine
Expand source code
class TriMesh(_CommonMesh): """ Build spatial data structures on an __unstructured Cartesian mesh__. Use `TriMesh` for: - calculating spatial derivatives - identifying node neighbour relationships - interpolation / extrapolation - smoothing operators - importing and saving mesh information Each of these data structures are built on top of a `PETSc DM` object (created from `quagmire.tools.meshtools`). Parameters ---------- DM : PETSc DMPlex object Build this unstructured Cartesian mesh object using one of: - `quagmire.tools.meshtools.create_DMPlex` - `quagmire.tools.meshtools.create_DMPlex_from_points` - `quagmire.tools.meshtools.create_DMPlex_from_box` verbose : bool Flag toggles verbose output *args : optional arguments **kwargs : optional keyword arguments Attributes ---------- tri : stripy Triangulation object Cartesian mesh object generated by `stripy` npoints : int Number of points (n) in the mesh pointwise_area : Quagmire MeshVariable `quagmire.mesh.basemesh.MeshVariable` of point-wise area mask : Quagmire MeshVariable `quagmire.mesh.basemesh.MeshVariable` to denote points on the boundary data : array of floats, shape (n,2) Cartesian mesh coordinates in x,y directions coords : array of floats, shape (n,2) Same as `data` neighbour_cloud : array of ints, shape(n,25) array of nearest node neighbours by distance near_neighbour_mask : array of bools, shape(n,25) mask immediate node neighbours in `neighbour_cloud` (They may not be there !! ) timings : dict Timing information for each Quagmire routine """ ## This is a count of all the instances of the class that are launched ## so that we have a way to name / identify them __count = 0 @classmethod def _count(cls): TriMesh.__count += 1 return TriMesh.__count @property def id(self): return self.__id def __init__(self, dm, verbose=True, *args, **kwargs): import stripy from scipy.spatial import cKDTree as _cKDTree # initialise base mesh class super(TriMesh, self).__init__(dm, verbose) self.__id = "trimesh_{}".format(self._count()) # Delaunay triangulation t = perf_counter() coords = self.dm.getCoordinatesLocal().array.reshape(-1,2) # minX, minY = coords.min(axis=0) # maxX, maxY = coords.max(axis=0) # length_scale = np.sqrt((maxX - minX)*(maxY - minY)/coords.shape[0]) # # coords += np.random.random(coords.shape) * 0.0001 * length_scale # This should be aware of the point spacing (small perturbation) self.tri = stripy.Triangulation(coords[:,0], coords[:,1], permute=True) self.npoints = self.tri.npoints self.timings['triangulation'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank==0 and self.verbose: print(("{} - Delaunay triangulation {}s".format(self.dm.comm.rank, perf_counter()-t))) # Calculate weigths and pointwise area t = perf_counter() self.area, self.weight = self.calculate_area_weights() self.pointwise_area = self.add_variable(name="area") self.pointwise_area.data = self.area self.pointwise_area.lock() self.timings['area weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank==0 and self.verbose: print(("{} - Calculate node weights and area {}s".format(self.dm.comm.rank, perf_counter()-t))) # Find boundary points t = perf_counter() self.bmask = self.get_boundary() self.mask = self.add_variable(name="Mask") self.mask.data = self.bmask.astype(PETSc.ScalarType) self.mask.lock() self.timings['find boundaries'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank==0 and self.verbose: print(("{} - Find boundaries {}s".format(self.dm.comm.rank, perf_counter()-t))) # cKDTree t = perf_counter() self.cKDTree = _cKDTree(self.tri.points, balanced_tree=False) self.timings['cKDTree'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank==0 and self.verbose: print(("{} - cKDTree {}s".format(self.dm.comm.rank, perf_counter()-t))) # Find neighbours t = perf_counter() self.construct_neighbour_cloud() self.timings['construct neighbour cloud'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] t1 = perf_counter() self.construct_natural_neighbour_cloud() self.timings['construct natural neighbour cloud'] = [perf_counter()-t1, self.log.getCPUTime(), self.log.getFlops()] if self.rank==0 and self.verbose: print("{} - Construct neighbour cloud arrays {}s, ({}s + {}s)".format(self.dm.comm.rank, perf_counter()-t, self.timings['construct neighbour cloud'][0], self.timings['construct natural neighbour cloud'][0] )) # sync smoothing operator t = perf_counter() self._construct_rbf_weights() self.timings['construct rbf weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank==0 and self.verbose: print(("{} - Construct rbf weights {}s".format(self.dm.comm.rank, perf_counter()-t))) self.root = False self._coords = self.tri.points self._data = self._coords self.interpolate = self.tri.interpolate # functions / parameters that are required for compatibility among FlatMesh types self._derivative_grad_cartesian = self.derivative_grad self._radius = 1.0 @property def data(self): """ Cartesian coordinates of local mesh points. `data` is of shape (npoints, 2) Being a `TriMesh` object, `self.data` are identical to `self.coords` """ return self._data @property def coords(self): """ Cartesian coordinates of local mesh points `coords` is of shape (npoints, 2) Being a `TriMesh` object, `self.coords` are identical to `self.data` """ return self._coords def calculate_area_weights(self): """ Calculate pointwise area and weigths Returns ------- area : array of floats, shape (n,) point-wise area weight : array of ints, shape (n,) point-wise weight Notes ----- This calls a fortran 90 routine which computes the weight and area for each point in the mesh. """ from quagmire._fortran import ntriw area, weight = ntriw(self.tri.x, self.tri.y, self.tri.simplices.T+1) return area, weight def node_neighbours(self, point): """ Returns a list of neighbour nodes for a given point in the Delaunay triangulation. Parameters ---------- point : int point on the mesh Returns ------- neighbours : list list of neighbours that are connected by line segments to the specified `point` """ return self.natural_neighbours[point,1:self.natural_neighbours_count[point]] def derivatives(self, PHI, nit=10, tol=1e-8): return self.derivative_grad(PHI, nit=10, tol=1e-8) def derivative_grad(self, PHI, nit=10, tol=1e-8): """ Compute derivatives of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ u_x, u_y = self.tri.gradient(PHI, nit, tol) grads = np.ndarray((u_x.size, 2)) grads[:, 0] = u_x grads[:, 1] = u_y return grads def derivative_div(self, PHIx, PHIy, **kwargs): """ Compute second order derivative from flux fields PHIx, PHIy We evaluate the gradient on these fields using the `derivative_grad` method. Arguments --------- PHIx : ndarray of floats, shape (n,) array of first partial derivatives in x direction PHIy : ndarray of floats, shape (n,) array of first partial derivatives in y direction kwargs : optional keyword-argument specifiers keyword arguments to be passed onto derivative_grad e.g. nit=5, tol=1e-3 Returns ------- del2PHI : ndarray of floats, shape (n,) second derivative of PHI """ u_xx = self.derivative_grad(PHIx, **kwargs)[0] u_yy = self.derivative_grad(PHIy, **kwargs)[1] return u_xx + u_yy def construct_natural_neighbour_cloud(self): """ Find the natural neighbours for each node in the triangulation and store in a numpy array for efficient lookup. """ # # maximum size of nn array node-by-node (it's almost certainly +1 because # # it is only +2 if the neighbours do not form a closed loop around the node) # max_neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints).max() + 2 # s = self.tri.simplices # nns = -1 * np.ones((self.npoints, max_neighbours), dtype=int) # nnm = np.zeros((self.npoints, max_neighbours), dtype=bool) # nn = np.empty((self.npoints), dtype=int) # for node in range(0, self.npoints): # w = np.where(s==node) # l = np.unique(s[w[0]]) # # l = np.roll(l,-np.argwhere(l == node)[0]) # nn[node] = l.shape[0] # nns[node,0:nn[node]] = l # nnm[node,0:nn[node]] = True # self.natural_neighbours = nns # self.natural_neighbours_count = nn # self.natural_neighbours_mask = nnm # Find all the segments in the triangulation and sort the array # (note this differs from the stripy routine because is treates m-n and n-m as distinct) lst = self.tri.lst lend = self.tri.lend lptr = self.tri.lptr segments_array = np.empty((len(lptr),2),dtype=np.int) segments_array[:,0] = np.abs(lst[:]) - 1 segments_array[:,1] = np.abs(lst[lptr[:]-1]) - 1 valid = np.logical_and(segments_array[:,0] >= 0, segments_array[:,1] >= 0) segments = segments_array[valid,:] deshuffled = self.tri._deshuffle_simplices(segments) smsi = np.argsort(deshuffled[:,0]) sms = np.zeros_like(deshuffled) sms[np.indices((deshuffled.shape[0],)),:] = deshuffled[smsi,:] natural_neighbours_count = np.bincount(sms[:,0]) iend = np.cumsum(natural_neighbours_count) istart = np.zeros_like(iend) istart[1:] = iend[:-1] natural_neighbours = np.full((self.npoints, natural_neighbours_count.max()+1), -1, dtype=np.int) for j in range(0,self.npoints): natural_neighbours[j, 0] = j natural_neighbours[j, 1:natural_neighbours_count[j]+1] = sms[istart[j]:istart[j]+natural_neighbours_count[j],1] natural_neighbours_count += 1 self.natural_neighbours = natural_neighbours self.natural_neighbours_count = natural_neighbours_count self.natural_neighbours_mask = natural_neighbours != -1 return def construct_neighbour_cloud(self, size=25): """ Find neighbours from distance cKDTree. Parameters ---------- size : int Number of neighbours to search for Notes ----- Use this method to search for neighbours that are not necessarily immediate node neighbours (i.e. neighbours connected by a line segment). Extended node neighbours should be captured by the search depending on how large `size` is set to. """ nndist, nncloud = self.cKDTree.query(self.tri.points, k=size) self.neighbour_cloud = nncloud self.neighbour_cloud_distances = nndist neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints) self.near_neighbours = neighbours + 2 self.extended_neighbours = np.full_like(neighbours, size) # self.near_neighbour_mask = np.zeros_like(nncloud, dtype=np.bool) # for node in range(0,self.npoints): # self.near_neighbour_mask[node, 0:self.near_neighbours[node]] = True # row, col = np.nonzero(self.tri.simplices == node) # natural_neighbours = list(np.unique(np.hstack(self.tri.simplices[row]))) # # move self node to start of the list # natural_neighbours.remove(node) # natural_neighbours.insert(0, node) # # identify distance neighbours from cloud and join lists # distance_neighbours = list(set(nncloud[node]) - set(natural_neighbours)) # node_neighbours = np.hstack([natural_neighbours, distance_neighbours]) # # size of node_nncloud will be >= ncols # nncloud[node] = node_neighbours[0:size] # ## Nothing is done about the nndist array, yet... return def local_area_smoothing(self, data, its=1, centre_weight=0.75): """ Local area smoothing using radial-basis function smoothing kernel Parameters ---------- data : array of floats, shape (n,) field variable to be smoothed its : int number of iterations (default: 1) centre_weight : float weight to apply to centre nodes (default: 0.75) other nodes are weighted by (1 - `centre_weight`) Returns ------- sm : array of floats, shape (n,) smoothed field variable """ smooth_data = data.copy() smooth_data_old = data.copy() for i in range(0, its): smooth_data_old[:] = smooth_data smooth_data = centre_weight*smooth_data_old + \ (1.0 - centre_weight)*self.rbf_smoother(smooth_data) return smooth_data def _construct_rbf_weights(self, delta=None): if delta == None: delta = self.neighbour_cloud_distances[:, 1].mean() self.delta = delta self.gaussian_dist_w = self._rbf_weights(delta) return def _rbf_weights(self, delta=None): neighbour_cloud_distances = self.neighbour_cloud_distances if delta == None: delta = self.neighbour_cloud_distances[:, 1].mean() # Initialise the interpolants gaussian_dist_w = np.zeros_like(neighbour_cloud_distances) gaussian_dist_w[:,:] = np.exp(-np.power(neighbour_cloud_distances[:,:]/delta, 2.0)) gaussian_dist_w[:,:] /= gaussian_dist_w.sum(axis=1).reshape(-1,1) return gaussian_dist_w def rbf_smoother(self, vector, iterations=1, delta=None): """ Smoothing using a radial-basis function smoothing kernel Arguments --------- vector : array of floats, shape (n,) field variable to be smoothed iterations : int number of iterations to smooth vector delta : float / array of floats shape (n,) distance weights to apply the Gaussian interpolants Returns ------- smooth_vec : array of floats, shape (n,) smoothed version of input vector """ if type(delta) != type(None): self._construct_rbf_weights(delta) vector = self.sync(vector) for i in range(0, iterations): # print self.dm.comm.rank, ": RBF ",vector.max(), vector.min() vector_smoothed = (vector[self.neighbour_cloud[:,:]] * self.gaussian_dist_w[:,:]).sum(axis=1) vector = self.sync(vector_smoothed) return vector def build_rbf_smoother(self, delta, iterations=1): trimesh_self = self class _rbf_smoother_object(object): def __init__(inner_self, delta, iterations=1): if delta == None: trimesh_self.lvec.setArray(trimesh_self.neighbour_cloud_distances[:, 1]) trimesh_self.dm.localToGlobal(trimesh_self.lvec, trimesh_self.gvec) delta = trimesh_self.gvec.sum() / trimesh_self.gvec.getSize() inner_self._mesh = trimesh_self inner_self.delta = delta inner_self.iterations = iterations inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta) return def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1): import quagmire smooth_node_values = lazyFn.evaluate(inner_self._mesh) for i in range(0, iterations): smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1) smooth_node_values = inner_self._mesh.sync(smooth_node_values) return smooth_node_values def smooth_fn(inner_self, meshVar, iterations=None): if iterations is None: iterations = inner_self.iterations def smoother_fn(*args, **kwargs): import quagmire smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations) if len(args) == 1 and args[0] == meshVar._mesh: return smooth_node_values elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]): mesh = args[0] return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs) else: xi = np.atleast_1d(args[0]) yi = np.atleast_1d(args[1]) i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs) return i newLazyFn = _LazyEvaluation() newLazyFn.evaluate = smoother_fn newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations) return newLazyFn return _rbf_smoother_object(delta, iterations)Ancestors
Instance variables
var coords-
Cartesian coordinates of local mesh points
coordsis of shape (npoints, 2)Being a
TriMeshobject,self.coordsare identical toself.dataExpand source code
@property def coords(self): """ Cartesian coordinates of local mesh points `coords` is of shape (npoints, 2) Being a `TriMesh` object, `self.coords` are identical to `self.data` """ return self._coords var data-
Cartesian coordinates of local mesh points.
datais of shape (npoints, 2)Being a
TriMeshobject,self.dataare identical toself.coordsExpand source code
@property def data(self): """ Cartesian coordinates of local mesh points. `data` is of shape (npoints, 2) Being a `TriMesh` object, `self.data` are identical to `self.coords` """ return self._data var id-
Expand source code
@property def id(self): return self.__id
Methods
def build_rbf_smoother(self, delta, iterations=1)-
Expand source code
def build_rbf_smoother(self, delta, iterations=1): trimesh_self = self class _rbf_smoother_object(object): def __init__(inner_self, delta, iterations=1): if delta == None: trimesh_self.lvec.setArray(trimesh_self.neighbour_cloud_distances[:, 1]) trimesh_self.dm.localToGlobal(trimesh_self.lvec, trimesh_self.gvec) delta = trimesh_self.gvec.sum() / trimesh_self.gvec.getSize() inner_self._mesh = trimesh_self inner_self.delta = delta inner_self.iterations = iterations inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta) return def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1): import quagmire smooth_node_values = lazyFn.evaluate(inner_self._mesh) for i in range(0, iterations): smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1) smooth_node_values = inner_self._mesh.sync(smooth_node_values) return smooth_node_values def smooth_fn(inner_self, meshVar, iterations=None): if iterations is None: iterations = inner_self.iterations def smoother_fn(*args, **kwargs): import quagmire smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations) if len(args) == 1 and args[0] == meshVar._mesh: return smooth_node_values elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]): mesh = args[0] return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs) else: xi = np.atleast_1d(args[0]) yi = np.atleast_1d(args[1]) i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs) return i newLazyFn = _LazyEvaluation() newLazyFn.evaluate = smoother_fn newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations) return newLazyFn return _rbf_smoother_object(delta, iterations) def calculate_area_weights(self)-
Calculate pointwise area and weigths
Returns
area:arrayoffloats, shape (n,)- point-wise area
weight:arrayofints, shape (n,)- point-wise weight
Notes
This calls a fortran 90 routine which computes the weight and area for each point in the mesh.
Expand source code
def calculate_area_weights(self): """ Calculate pointwise area and weigths Returns ------- area : array of floats, shape (n,) point-wise area weight : array of ints, shape (n,) point-wise weight Notes ----- This calls a fortran 90 routine which computes the weight and area for each point in the mesh. """ from quagmire._fortran import ntriw area, weight = ntriw(self.tri.x, self.tri.y, self.tri.simplices.T+1) return area, weight def construct_natural_neighbour_cloud(self)-
Find the natural neighbours for each node in the triangulation and store in a numpy array for efficient lookup.
Expand source code
def construct_natural_neighbour_cloud(self): """ Find the natural neighbours for each node in the triangulation and store in a numpy array for efficient lookup. """ # # maximum size of nn array node-by-node (it's almost certainly +1 because # # it is only +2 if the neighbours do not form a closed loop around the node) # max_neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints).max() + 2 # s = self.tri.simplices # nns = -1 * np.ones((self.npoints, max_neighbours), dtype=int) # nnm = np.zeros((self.npoints, max_neighbours), dtype=bool) # nn = np.empty((self.npoints), dtype=int) # for node in range(0, self.npoints): # w = np.where(s==node) # l = np.unique(s[w[0]]) # # l = np.roll(l,-np.argwhere(l == node)[0]) # nn[node] = l.shape[0] # nns[node,0:nn[node]] = l # nnm[node,0:nn[node]] = True # self.natural_neighbours = nns # self.natural_neighbours_count = nn # self.natural_neighbours_mask = nnm # Find all the segments in the triangulation and sort the array # (note this differs from the stripy routine because is treates m-n and n-m as distinct) lst = self.tri.lst lend = self.tri.lend lptr = self.tri.lptr segments_array = np.empty((len(lptr),2),dtype=np.int) segments_array[:,0] = np.abs(lst[:]) - 1 segments_array[:,1] = np.abs(lst[lptr[:]-1]) - 1 valid = np.logical_and(segments_array[:,0] >= 0, segments_array[:,1] >= 0) segments = segments_array[valid,:] deshuffled = self.tri._deshuffle_simplices(segments) smsi = np.argsort(deshuffled[:,0]) sms = np.zeros_like(deshuffled) sms[np.indices((deshuffled.shape[0],)),:] = deshuffled[smsi,:] natural_neighbours_count = np.bincount(sms[:,0]) iend = np.cumsum(natural_neighbours_count) istart = np.zeros_like(iend) istart[1:] = iend[:-1] natural_neighbours = np.full((self.npoints, natural_neighbours_count.max()+1), -1, dtype=np.int) for j in range(0,self.npoints): natural_neighbours[j, 0] = j natural_neighbours[j, 1:natural_neighbours_count[j]+1] = sms[istart[j]:istart[j]+natural_neighbours_count[j],1] natural_neighbours_count += 1 self.natural_neighbours = natural_neighbours self.natural_neighbours_count = natural_neighbours_count self.natural_neighbours_mask = natural_neighbours != -1 return def construct_neighbour_cloud(self, size=25)-
Find neighbours from distance cKDTree.
Parameters
size:int- Number of neighbours to search for
Notes
Use this method to search for neighbours that are not necessarily immediate node neighbours (i.e. neighbours connected by a line segment). Extended node neighbours should be captured by the search depending on how large
sizeis set to.Expand source code
def construct_neighbour_cloud(self, size=25): """ Find neighbours from distance cKDTree. Parameters ---------- size : int Number of neighbours to search for Notes ----- Use this method to search for neighbours that are not necessarily immediate node neighbours (i.e. neighbours connected by a line segment). Extended node neighbours should be captured by the search depending on how large `size` is set to. """ nndist, nncloud = self.cKDTree.query(self.tri.points, k=size) self.neighbour_cloud = nncloud self.neighbour_cloud_distances = nndist neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints) self.near_neighbours = neighbours + 2 self.extended_neighbours = np.full_like(neighbours, size) # self.near_neighbour_mask = np.zeros_like(nncloud, dtype=np.bool) # for node in range(0,self.npoints): # self.near_neighbour_mask[node, 0:self.near_neighbours[node]] = True # row, col = np.nonzero(self.tri.simplices == node) # natural_neighbours = list(np.unique(np.hstack(self.tri.simplices[row]))) # # move self node to start of the list # natural_neighbours.remove(node) # natural_neighbours.insert(0, node) # # identify distance neighbours from cloud and join lists # distance_neighbours = list(set(nncloud[node]) - set(natural_neighbours)) # node_neighbours = np.hstack([natural_neighbours, distance_neighbours]) # # size of node_nncloud will be >= ncols # nncloud[node] = node_neighbours[0:size] # ## Nothing is done about the nndist array, yet... return def derivative_div(self, PHIx, PHIy, **kwargs)-
Compute second order derivative from flux fields PHIx, PHIy We evaluate the gradient on these fields using the
derivative_gradmethod.Arguments
PHIx:ndarrayoffloats, shape (n,)- array of first partial derivatives in x direction
PHIy:ndarrayoffloats, shape (n,)- array of first partial derivatives in y direction
kwargs:optional keyword-argument specifiers- keyword arguments to be passed onto derivative_grad e.g. nit=5, tol=1e-3
Returns
del2PHI:ndarrayoffloats, shape (n,)- second derivative of PHI
Expand source code
def derivative_div(self, PHIx, PHIy, **kwargs): """ Compute second order derivative from flux fields PHIx, PHIy We evaluate the gradient on these fields using the `derivative_grad` method. Arguments --------- PHIx : ndarray of floats, shape (n,) array of first partial derivatives in x direction PHIy : ndarray of floats, shape (n,) array of first partial derivatives in y direction kwargs : optional keyword-argument specifiers keyword arguments to be passed onto derivative_grad e.g. nit=5, tol=1e-3 Returns ------- del2PHI : ndarray of floats, shape (n,) second derivative of PHI """ u_xx = self.derivative_grad(PHIx, **kwargs)[0] u_yy = self.derivative_grad(PHIy, **kwargs)[1] return u_xx + u_yy def derivative_grad(self, PHI, nit=10, tol=1e-08)-
Compute derivatives of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
PHI:ndarrayoffloats, shape (n,)- compute the derivative of this array
nit:int optional (default: 10)- number of iterations to reach convergence
tol:float optional (default: 1e-8)- convergence is reached when this tolerance is met
Returns
PHIx:ndarrayoffloats, shape(n,)- first partial derivative of PHI in x direction
PHIy:ndarrayoffloats, shape(n,)- first partial derivative of PHI in y direction
Expand source code
def derivative_grad(self, PHI, nit=10, tol=1e-8): """ Compute derivatives of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ u_x, u_y = self.tri.gradient(PHI, nit, tol) grads = np.ndarray((u_x.size, 2)) grads[:, 0] = u_x grads[:, 1] = u_y return grads def derivatives(self, PHI, nit=10, tol=1e-08)-
Expand source code
def derivatives(self, PHI, nit=10, tol=1e-8): return self.derivative_grad(PHI, nit=10, tol=1e-8) def local_area_smoothing(self, data, its=1, centre_weight=0.75)-
Local area smoothing using radial-basis function smoothing kernel
Parameters
data:arrayoffloats, shape (n,)- field variable to be smoothed
its:int- number of iterations (default: 1)
centre_weight:float- weight to apply to centre nodes (default: 0.75)
other nodes are weighted by (1 -
centre_weight)
Returns
sm:arrayoffloats, shape (n,)- smoothed field variable
Expand source code
def local_area_smoothing(self, data, its=1, centre_weight=0.75): """ Local area smoothing using radial-basis function smoothing kernel Parameters ---------- data : array of floats, shape (n,) field variable to be smoothed its : int number of iterations (default: 1) centre_weight : float weight to apply to centre nodes (default: 0.75) other nodes are weighted by (1 - `centre_weight`) Returns ------- sm : array of floats, shape (n,) smoothed field variable """ smooth_data = data.copy() smooth_data_old = data.copy() for i in range(0, its): smooth_data_old[:] = smooth_data smooth_data = centre_weight*smooth_data_old + \ (1.0 - centre_weight)*self.rbf_smoother(smooth_data) return smooth_data def node_neighbours(self, point)-
Returns a list of neighbour nodes for a given point in the Delaunay triangulation.
Parameters
point:int- point on the mesh
Returns
neighbours:list- list of neighbours that are connected by line segments
to the specified
point
Expand source code
def node_neighbours(self, point): """ Returns a list of neighbour nodes for a given point in the Delaunay triangulation. Parameters ---------- point : int point on the mesh Returns ------- neighbours : list list of neighbours that are connected by line segments to the specified `point` """ return self.natural_neighbours[point,1:self.natural_neighbours_count[point]] def rbf_smoother(self, vector, iterations=1, delta=None)-
Smoothing using a radial-basis function smoothing kernel
Arguments
vector:arrayoffloats, shape (n,)- field variable to be smoothed
iterations:int- number of iterations to smooth vector
delta:float / arrayoffloats shape (n,)- distance weights to apply the Gaussian interpolants
Returns
smooth_vec:arrayoffloats, shape (n,)- smoothed version of input vector
Expand source code
def rbf_smoother(self, vector, iterations=1, delta=None): """ Smoothing using a radial-basis function smoothing kernel Arguments --------- vector : array of floats, shape (n,) field variable to be smoothed iterations : int number of iterations to smooth vector delta : float / array of floats shape (n,) distance weights to apply the Gaussian interpolants Returns ------- smooth_vec : array of floats, shape (n,) smoothed version of input vector """ if type(delta) != type(None): self._construct_rbf_weights(delta) vector = self.sync(vector) for i in range(0, iterations): # print self.dm.comm.rank, ": RBF ",vector.max(), vector.min() vector_smoothed = (vector[self.neighbour_cloud[:,:]] * self.gaussian_dist_w[:,:]).sum(axis=1) vector = self.sync(vector_smoothed) return vector
Inherited members