Module quagmire.mesh.strimesh
For unstructured data on the sphere.

sTriMesh implements the following functionality:
- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Supply a PETSc DM object (created from quagmire.tools.meshtools) to initialise the object.
Expand source code
# Copyright 2016-2020 Louis Moresi, Ben Mather, Romain Beucher
#
# This file is part of Quagmire.
#
# Quagmire is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or any later version.
#
# Quagmire is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Quagmire. If not, see <http://www.gnu.org/licenses/>.
"""
For unstructured data on the sphere.
<img src="https://raw.githubusercontent.com/underworldcode/quagmire/dev/docs/images/quagmire-flowchart-strimesh.png" style="width: 321px; float:right">
`sTriMesh` implements the following functionality:
- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Supply a `PETSc DM` object (created from `quagmire.tools.meshtools`) to initialise the object.
"""
import numpy as np
from mpi4py import MPI
import sys,petsc4py
petsc4py.init(sys.argv)
from petsc4py import PETSc
# comm = MPI.COMM_WORLD
from time import perf_counter
from .commonmesh import CommonMesh as _CommonMesh
from scipy.spatial import cKDTree as _cKDTree
try: range = xrange
except: pass
from quagmire import function as fn
from quagmire.function import LazyEvaluation as _LazyEvaluation
class sTriMesh(_CommonMesh):
"""
Build spatial data structures on an __unstructured spherical mesh__.
Use `sTriMesh` for:
- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Each of these data structures are built on top of a `PETSc DM` object
(created from `quagmire.tools.meshtools`).
Parameters
----------
DM : PETSc DMPlex object
Build this unstructured spherical mesh object using one of:
- `quagmire.tools.meshtools.create_spherical_DMPlex`
- `quagmire.tools.meshtools.create_DMPlex_from_spherical_points`
verbose : bool
Flag toggles verbose output
*args : optional arguments
**kwargs : optional keyword arguments
Attributes
----------
tri : stripy Triangulation object
Cartesian mesh object generated by `stripy`
npoints : int
Number of points (n) in the mesh
pointwise_area : Quagmire MeshVariable
`quagmire.mesh.basemesh.MeshVariable` of point-wise area
mask : Quagmire MeshVariable
`quagmire.mesh.basemesh.MeshVariable` to denote points on the boundary
data : array of floats, shape (n,2)
Cartesian mesh coordinates in x,y directions
coords : array of floats, shape (n,2)
Same as `data`
neighbour_cloud : array of ints, shape(n,25)
array of nearest node neighbours by distance
near_neighbour_mask : array of bools, shape(n,25)
mask immediate node neighbours in `neighbour_cloud`
timings : dict
Timing information for each Quagmire routine
"""
__count = 0
@classmethod
def _count(cls):
sTriMesh.__count += 1
return sTriMesh.__count
@property
def id(self):
return self.__id
def __init__(self, dm, r1=6384.4e3, r2=6352.8e3, verbose=True, *args, **kwargs):
import stripy
# initialise base mesh class
super(sTriMesh, self).__init__(dm, verbose)
self.__id = "strimesh_{}".format(self._count())
# Delaunay triangulation
t = perf_counter()
coords = dm.getCoordinatesLocal().array.reshape(-1,3)
minX, minY, minZ = coords.min(axis=0)
maxX, maxY, maxZ = coords.max(axis=0)
length_scale = np.sqrt((maxX - minX)*(maxY - minY)/coords.shape[0])
# coords += np.random.random(coords.shape) * 0.0001 * length_scale # This should be aware of the point spacing (small perturbation)
# r = np.sqrt(coords[:,0]**2 + coords[:,1]**2 + coords[:,2]**2) # should just equal 1
# r = 1.0
# lons = np.arctan2(coords[:,1], coords[:,0])
# lats = np.arcsin(coords[:,2]/r)
rlons, rlats = stripy.spherical.xyz2lonlat(coords[:,0], coords[:,1], coords[:,2])
lons = np.degrees(rlons)
lats = np.degrees(rlats)
self.tri = stripy.sTriangulation(rlons, rlats, permute=True)
self.npoints = self.tri.npoints
self.timings['triangulation'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - Delaunay triangulation {}s".format(self.dm.comm.rank, perf_counter()-t))
# Calculate geocentric radius
self._radius = geocentric_radius(lats, r1, r2)
self._coords = np.c_[lons, lats]
self._data = self.tri.points*self._radius.reshape(-1,1)
# Calculate weigths and pointwise area
t = perf_counter()
self.area, self.weight = self.calculate_area_weights()
self.pointwise_area = self.add_variable(name="area")
self.pointwise_area.data = self.area
self.pointwise_area.lock()
self.timings['area weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - Calculate node weights and area {}s".format(self.dm.comm.rank, perf_counter()-t))
# Find boundary points
t = perf_counter()
self.bmask = self.get_boundary()
self.mask = self.add_variable(name="Mask")
self.mask.data = self.bmask.astype(PETSc.ScalarType)
self.mask.lock()
self.timings['find boundaries'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - Find boundaries {}s".format(self.dm.comm.rank, perf_counter()-t))
# cKDTree
t = perf_counter()
self.cKDTree = _cKDTree(self.data, balanced_tree=False)
self.timings['cKDTree'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - cKDTree {}s".format(self.dm.comm.rank, perf_counter()-t))
# Find neighbours
t = perf_counter()
self.construct_neighbour_cloud()
self.timings['construct neighbour cloud'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
t1 = perf_counter()
self.construct_natural_neighbour_cloud()
self.timings['construct natural neighbour cloud'] = [perf_counter()-t1, self.log.getCPUTime(), self.log.getFlops()]
if self.rank==0 and self.verbose:
print("{} - Construct neighbour cloud arrays {}s, ({}s + {}s)".format(self.dm.comm.rank, perf_counter()-t,
self.timings['construct neighbour cloud'][0],
self.timings['construct natural neighbour cloud'][0] ))
# RBF smoothing operator
t = perf_counter()
self._construct_rbf_weights()
self.timings['construct rbf weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - Construct rbf weights {}s".format(self.dm.comm.rank, perf_counter()-t))
self.root = False
# We place these properties here to prevent users from mucking up the coordinates.
# Radius is the only property to have a setter only because freedom in the vertical axis
# doesn't mess up the graph, and is quite useful for visualisation in lavavu / paraview.
## FUTURE: deforming the mesh should be via a context manager.
@property
def data(self):
"""
Cartesian coordinates of local mesh points.
`data` is of shape (npoints, 3)
These points are projected to the radius of the sphere.
If `self.radius` is set to 1.0 then these points are on the unit sphere and are
identical to `self.tri.points`.
"""
return self._data
@property
def coords(self):
"""
Spherical coordinates of local mesh points in degrees lon / lat.
`coords` is of shape (npoints, 2)
"""
return self._coords
@property
def radius(self):
"""
The radius of the sphere.
Use `geocentric_radius` to compute the radius with distortion between the poles
and the equator, otherwise Quagmire uses Earth values by default. i.e.
```
radius = geocentric_radius(r1=6384.4e3, r2=6352.8e3)
```
Setting a new value of radius updates the point-wise area calculation.
"""
return self._radius
@radius.setter
def radius(self, value):
self._radius = value
# update mesh coordinates and rebuild k-d tree
self._data = self.tri.points*np.array(self._radius).reshape(-1,1)
self._update_DM_coordinates()
self.cKDTree = _cKDTree(self.data, balanced_tree=False)
self.construct_neighbour_cloud()
# re-evalutate mesh area
self.area, self.weight = self.calculate_area_weights()
self.pointwise_area.unlock()
self.pointwise_area.data = self.area
self.pointwise_area.lock()
def _update_DM_coordinates(self):
""" Update the coordinates on the DM """
# get global coordinate vector from DM
global_coord_vec = self.dm.getCoordinates()
global_coord_vec.setArray(self.data.ravel())
self.dm.setCoordinates(global_coord_vec)
def interpolate(self, lons, lats, zdata, order=1, grad=None):
"""
Base class to handle nearest neighbour, linear, and cubic interpolation.
Given a triangulation of a set of nodes on the unit sphere, along with data
values at the nodes, this method interpolates (or extrapolates) the value
at a given longitude and latitude.
Parameters
----------
lons : float / array of floats, shape (l,)
longitudinal coordinate(s) on the sphere
lats : float / array of floats, shape (l,)
latitudinal coordinate(s) on the sphere
zdata : array of floats, shape (n,)
value at each point in the triangulation
must be the same size of the mesh
order : int (default=1)
order of the interpolatory function used
- 0 = nearest-neighbour
- 1 = linear
- 3 = cubic
Returns
-------
zi : float / array of floats, shape (l,)
interpolated value(s) at (lons, lats)
err : int / array of ints, shape (l,)
whether interpolation (0), extrapolation (1) or error (other)
"""
rlons = np.radians(lons)
rlats = np.radians(lats)
return self.tri.interpolate(rlons, rlats, zdata, order, grad)
def calculate_area_weights(self):
"""
Calculate pointwise weights and area
Returns
-------
area : array of floats, shape (n,)
point-wise area
weights : array of ints, shape(n,)
weighting for each point
Notes
-----
This calls a fortran 90 routine which computes the weight and area
for each point in the mesh using the geocentric radius of the sphere
at the equator `r1` and the poles `r2` (defaults to Earth values:
6384.4km and 6352.8km, respectively).
"""
from quagmire._fortran import ntriw_s
R = self._radius
tri_area = self.tri.areas()
# find surface area and weights on the unit sphere
area, weight = ntriw_s(self.npoints, self.tri.simplices.T+1, tri_area)
# project to the radius of the sphere
area *= R**2
return area, weight
def node_neighbours(self, point):
"""
Returns a list of neighbour nodes for a given point in the
Delaunay triangulation.
Parameters
----------
point : int
point on the mesh
Returns
-------
neighbours : list
list of neighbours that are connected by line segments
to the specified `point`
"""
return self.natural_neighbours[point,1:self.natural_neighbours_count[point]]
def derivatives(self, PHI, nit=10, tol=1e-8):
"""
Compute derivatives of PHI in the x, y directions.
This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
---------
PHI : ndarray of floats, shape (n,)
compute the derivative of this array
nit : int optional (default: 10)
number of iterations to reach convergence
tol : float optional (default: 1e-8)
convergence is reached when this tolerance is met
Returns
-------
PHIx : ndarray of floats, shape(n,)
first partial derivative of PHI in x direction
PHIy : ndarray of floats, shape(n,)
first partial derivative of PHI in y direction
"""
PHIx, PHIy = self.tri.derivatives_lonlat(PHI, nit, tol)
grads = np.ndarray((PHIx.size, 2))
grads[:, 0] = np.radians(PHIx)
grads[:, 1] = np.radians(PHIy)
return grads
def derivative_grad(self, PHI, nit=10, tol=1e-8):
"""
Compute gradients of PHI in the x, y directions.
This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
---------
PHI : ndarray of floats, shape (n,)
compute the derivative of this array
nit : int optional (default: 10)
number of iterations to reach convergence
tol : float optional (default: 1e-8)
convergence is reached when this tolerance is met
Returns
-------
PHIx : ndarray of floats, shape(n,)
first partial derivative of PHI in x direction
PHIy : ndarray of floats, shape(n,)
first partial derivative of PHI in y direction
"""
PHIx, PHIy = self.tri.gradient_lonlat(PHI, nit, tol)
grads = np.ndarray((PHIx.size, 2))
grads[:, 0] = np.radians(PHIx)
grads[:, 1] = np.radians(PHIy)
return grads
# required for compatibility among FlatMesh types
def _derivative_grad_cartesian(self, *args, **kwargs):
u_x, u_y, u_z = self.tri.gradient_xyz(*args, **kwargs)
grads = np.ndarray((u_x.size, 3))
grads[:, 0] = u_x
grads[:, 1] = u_y
grads[:, 2] = u_z
return grads
def derivative_div(self, PHIx, PHIy, PHIz, **kwargs):
"""
Compute second order derivative from flux fields PHIx, PHIy, PHIz
We evaluate the gradient on these fields using the derivative-grad method.
Arguments
---------
PHIx : ndarray of floats, shape (n,)
array of first partial derivatives in x direction
PHIy : ndarray of floats, shape (n,)
array of first partial derivatives in y direction
PHIz : ndarray of floats, shape (n,)
array of first partial derivatives in z direction
kwargs : optional keyword-argument specifiers
keyword arguments to be passed onto derivative_grad
e.g. nit=5, tol=1e-3
Returns
-------
del2PHI : ndarray of floats, shape (n,)
second derivative of PHI
"""
u_xx, u_xy, u_zz = self.tri.gradient_xyz(PHIx, **kwargs)
u_yx, u_yy, u_zz = self.tri.gradient_xyz(PHIy, **kwargs)
u_zx, u_zy, u_zz = self.tri.gradient_xyz(PHIz, **kwargs)
return (u_xx + u_yy + u_zz)/self._radius
def construct_natural_neighbour_cloud(self):
"""
Find the natural neighbours for each node in the triangulation and store in a
numpy array for efficient lookup.
"""
# # maximum size of nn array node-by-node (it's almost certainly +1 because
# # it is only +2 if the neighbours do not form a closed loop around the node)
# max_neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints).max() + 2
# s = self.tri.simplices
# nns = -1 * np.ones((self.npoints, max_neighbours), dtype=int)
# nnm = np.zeros((self.npoints, max_neighbours), dtype=bool)
# nn = np.empty((self.npoints), dtype=int)
# for node in range(0, self.npoints):
# w = np.where(s==node)
# l = np.unique(s[w[0]])
# # l = np.roll(l,-np.argwhere(l == node)[0])
# nn[node] = l.shape[0]
# nns[node,0:nn[node]] = l
# nnm[node,0:nn[node]] = True
# self.natural_neighbours = nns
# self.natural_neighbours_count = nn
# self.natural_neighbours_mask = nnm
# Find all the segments in the triangulation and sort the array
# (note this differs from the stripy routine because is treates m-n and n-m as distinct)
lst = self.tri.lst
lend = self.tri.lend
lptr = self.tri.lptr
segments_array = np.empty((len(lptr),2),dtype=np.int)
segments_array[:,0] = np.abs(lst[:]) - 1
segments_array[:,1] = np.abs(lst[lptr[:]-1]) - 1
valid = np.logical_and(segments_array[:,0] >= 0, segments_array[:,1] >= 0)
segments = segments_array[valid,:]
deshuffled = self.tri._deshuffle_simplices(segments)
smsi = np.argsort(deshuffled[:,0])
sms = np.zeros_like(deshuffled)
sms[np.indices((deshuffled.shape[0],)),:] = deshuffled[smsi,:]
natural_neighbours_count = np.bincount(sms[:,0])
iend = np.cumsum(natural_neighbours_count)
istart = np.zeros_like(iend)
istart[1:] = iend[:-1]
natural_neighbours = np.full((self.npoints, natural_neighbours_count.max()+1), -1, dtype=np.int)
for j in range(0,self.npoints):
natural_neighbours[j, 0] = j
natural_neighbours[j, 1:natural_neighbours_count[j]+1] = sms[istart[j]:istart[j]+natural_neighbours_count[j],1]
natural_neighbours_count += 1
self.natural_neighbours = natural_neighbours
self.natural_neighbours_count = natural_neighbours_count
self.natural_neighbours_mask = natural_neighbours != -1
return
def construct_neighbour_cloud(self, size=25):
"""
Find neighbours from distance cKDTree.
Parameters
----------
size : int
Number of neighbours to search for
Notes
-----
Use this method to search for neighbours that are not
necessarily immediate node neighbours (i.e. neighbours
connected by a line segment). Extended node neighbours
should be captured by the search depending on how large
`size` is set to.
"""
nndist, nncloud = self.cKDTree.query(self.data, k=size)
self.neighbour_cloud = nncloud
# find the arc distances
for n in range(1, size):
nndist[:,n] = distance_on_sphere(self.data, self.data[nncloud[:,n]], self.radius)
self.neighbour_cloud_distances = nndist
neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints)
self.near_neighbours = neighbours + 2
self.extended_neighbours = np.full_like(neighbours, size)
def local_area_smoothing(self, data, its=1, centre_weight=0.75):
"""
Local area smoothing using radial-basis function smoothing kernel
Parameters
----------
data : array of floats, shape (n,)
field variable to be smoothed
its : int
number of iterations (default: 1)
centre_weight : float
weight to apply to centre nodes (default: 0.75)
other nodes are weighted by (1 - `centre_weight`)
Returns
-------
sm : array of floats, shape (n,)
smoothed field variable
"""
smooth_data = data.copy()
smooth_data_old = data.copy()
for i in range(0, its):
smooth_data_old[:] = smooth_data
smooth_data = centre_weight*smooth_data_old + \
(1.0 - centre_weight)*self.rbf_smoother(smooth_data)
return smooth_data
def _construct_rbf_weights(self, delta=None):
if delta == None:
delta = self.neighbour_cloud_distances[:, 1].mean()
self.delta = delta
self.gaussian_dist_w = self._rbf_weights(delta)
return
def _rbf_weights(self, delta=None):
neighbour_cloud_distances = self.neighbour_cloud_distances
if delta == None:
delta = self.neighbour_cloud_distances[:, 1].mean()
# Initialise the interpolants
gaussian_dist_w = np.zeros_like(neighbour_cloud_distances)
gaussian_dist_w[:,:] = np.exp(-np.power(neighbour_cloud_distances[:,:]/delta, 2.0))
gaussian_dist_w[:,:] /= gaussian_dist_w.sum(axis=1).reshape(-1,1)
return gaussian_dist_w
def rbf_smoother(self, vector, iterations=1, delta=None):
"""
Smoothing using a radial-basis function smoothing kernel
Arguments
---------
vector : array of floats, shape (n,)
field variable to be smoothed
iterations : int
number of iterations to smooth vector
delta : float / array of floats shape (n,)
distance weights to apply the Gaussian interpolants
Returns
-------
smooth_vec : array of floats, shape (n,)
smoothed version of input vector
"""
if type(delta) != type(None):
self._construct_rbf_weights(delta)
vector = self.sync(vector)
for i in range(0, iterations):
# print self.dm.comm.rank, ": RBF ",vector.max(), vector.min()
vector_smoothed = (vector[self.neighbour_cloud[:,:]] * self.gaussian_dist_w[:,:]).sum(axis=1)
vector = self.sync(vector_smoothed)
return vector
def build_rbf_smoother(self, delta, iterations=1):
strimesh_self = self
class _rbf_smoother_object(object):
def __init__(inner_self, delta, iterations=1):
if delta == None:
strimesh_self.lvec.setArray(strimesh_self.neighbour_cloud_distances[:, 1])
strimesh_self.dm.localToGlobal(strimesh_self.lvec, strimesh_self.gvec)
delta = strimesh_self.gvec.sum() / strimesh_self.gvec.getSize()
inner_self._mesh = strimesh_self
inner_self.delta = delta
inner_self.iterations = iterations
inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta)
return
def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1):
import quagmire
smooth_node_values = lazyFn.evaluate(inner_self._mesh)
for i in range(0, iterations):
smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1)
smooth_node_values = inner_self._mesh.sync(smooth_node_values)
return smooth_node_values
def smooth_fn(inner_self, meshVar, iterations=None):
if iterations is None:
iterations = inner_self.iterations
def smoother_fn(*args, **kwargs):
import quagmire
smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations)
if len(args) == 1 and args[0] == meshVar._mesh:
return smooth_node_values
elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]):
mesh = args[0]
return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs)
else:
xi = np.atleast_1d(args[0])
yi = np.atleast_1d(args[1])
i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs)
return i
newLazyFn = _LazyEvaluation()
newLazyFn.evaluate = smoother_fn
newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations)
return newLazyFn
return _rbf_smoother_object(delta, iterations)
def build_rbf_smoother(self, delta, iterations=1):
strimesh_self = self
class _rbf_smoother_object(object):
def __init__(inner_self, delta, iterations=1):
if delta == None:
delta = strimesh_self.neighbour_cloud_distances[:, 1].mean() ## NOT SAFE IN PARALLEL !!!
inner_self._mesh = strimesh_self
inner_self.delta = delta
inner_self.iterations = iterations
inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta)
return
def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1):
import quagmire
smooth_node_values = lazyFn.evaluate(inner_self._mesh)
for i in range(0, iterations):
smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1)
smooth_node_values = inner_self._mesh.sync(smooth_node_values)
return smooth_node_values
def smooth_fn(inner_self, meshVar, iterations=None):
import quagmire
if iterations is None:
iterations = inner_self.iterations
def smoother_fn(*args, **kwargs):
smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations)
if len(args) == 1 and args[0] == meshVar._mesh:
return smooth_node_values
elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]):
mesh = args[0]
return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs)
else:
xi = np.atleast_1d(args[0])
yi = np.atleast_1d(args[1])
i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs)
return i
newLazyFn = _LazyEvaluation()
newLazyFn.evaluate = smoother_fn
newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations)
return newLazyFn
return _rbf_smoother_object(delta, iterations)
def geocentric_radius(lat, r1=6384.4e3, r2=6352.8e3):
"""
Calculate the radius of an oblate spheroid (like the earth)
Parameters
----------
lat : array of floats
latitudinal coordinates in degrees
r1 : float
radius at the equator (in metres)
r2 : float
radius at the poles (in metres)
Returns
-------
r : array of floats
radius at provided latitudes `lat` in metres
"""
rlat = np.radians(lat)
coslat = np.cos(rlat)
sinlat = np.sin(rlat)
num = (r1**2*coslat)**2 + (r2**2*sinlat)**2
den = (r1*coslat)**2 + (r2*sinlat)**2
return np.sqrt(num/den)
def distance_on_sphere(A, B, radius=None):
"""
Find the distance on the sphere between two sets of points (A and B)
If the radius of the sphere is not provided, then the radius is
estimated from A.
Parameters
----------
A : ndarray
array of points with axis 0 or 1 being the same as B
B : ndarray
array of points with axis 0 or 1 being the same as A
radius : float or ndarray
radius of the sphere
Returns
-------
distance : ndarray
array of spherical distances the same shape as A or B
depending on which is larger
"""
A = np.atleast_2d(A)
B = np.atleast_2d(B)
norm_cross = np.linalg.norm(np.cross(A, B), axis=1)
dot = (A*B).sum(axis=1)
if radius is None:
radius = np.linalg.norm(A, axis=1)
return np.arctan(norm_cross/dot) * radius
Functions
def distance_on_sphere(A, B, radius=None)-
Find the distance on the sphere between two sets of points (A and B) If the radius of the sphere is not provided, then the radius is estimated from A.
Parameters
A:ndarray- array of points with axis 0 or 1 being the same as B
B:ndarray- array of points with axis 0 or 1 being the same as A
radius:floatorndarray- radius of the sphere
Returns
distance:ndarray- array of spherical distances the same shape as A or B depending on which is larger
Expand source code
def distance_on_sphere(A, B, radius=None): """ Find the distance on the sphere between two sets of points (A and B) If the radius of the sphere is not provided, then the radius is estimated from A. Parameters ---------- A : ndarray array of points with axis 0 or 1 being the same as B B : ndarray array of points with axis 0 or 1 being the same as A radius : float or ndarray radius of the sphere Returns ------- distance : ndarray array of spherical distances the same shape as A or B depending on which is larger """ A = np.atleast_2d(A) B = np.atleast_2d(B) norm_cross = np.linalg.norm(np.cross(A, B), axis=1) dot = (A*B).sum(axis=1) if radius is None: radius = np.linalg.norm(A, axis=1) return np.arctan(norm_cross/dot) * radius def geocentric_radius(lat, r1=6384400.0, r2=6352800.0)-
Calculate the radius of an oblate spheroid (like the earth)
Parameters
lat:arrayoffloats- latitudinal coordinates in degrees
r1:float- radius at the equator (in metres)
r2:float- radius at the poles (in metres)
Returns
r:arrayoffloats- radius at provided latitudes
latin metres
Expand source code
def geocentric_radius(lat, r1=6384.4e3, r2=6352.8e3): """ Calculate the radius of an oblate spheroid (like the earth) Parameters ---------- lat : array of floats latitudinal coordinates in degrees r1 : float radius at the equator (in metres) r2 : float radius at the poles (in metres) Returns ------- r : array of floats radius at provided latitudes `lat` in metres """ rlat = np.radians(lat) coslat = np.cos(rlat) sinlat = np.sin(rlat) num = (r1**2*coslat)**2 + (r2**2*sinlat)**2 den = (r1*coslat)**2 + (r2*sinlat)**2 return np.sqrt(num/den)
Classes
class sTriMesh (dm, r1=6384400.0, r2=6352800.0, verbose=True, *args, **kwargs)-
Build spatial data structures on an unstructured spherical mesh.
Use
sTriMeshfor:- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Each of these data structures are built on top of a
PETSc DMobject (created fromquagmire.tools.meshtools).Parameters
DM:PETSc DMPlex object-
Build this unstructured spherical mesh object using one of:
verbose:bool- Flag toggles verbose output
*args:optional arguments**kwargs:optional keyword arguments
Attributes
tri:stripy Triangulation object- Cartesian mesh object generated by
stripy npoints:int- Number of points (n) in the mesh
pointwise_area:Quagmire MeshVariableMeshVariableof point-wise areamask:Quagmire MeshVariableMeshVariableto denote points on the boundarydata:arrayoffloats, shape (n,2)- Cartesian mesh coordinates in x,y directions
coords:arrayoffloats, shape (n,2)- Same as
data neighbour_cloud:arrayofints, shape(n,25)- array of nearest node neighbours by distance
near_neighbour_mask:arrayofbools, shape(n,25)- mask immediate node neighbours in
neighbour_cloud timings:dict- Timing information for each Quagmire routine
Expand source code
class sTriMesh(_CommonMesh): """ Build spatial data structures on an __unstructured spherical mesh__. Use `sTriMesh` for: - calculating spatial derivatives - identifying node neighbour relationships - interpolation / extrapolation - smoothing operators - importing and saving mesh information Each of these data structures are built on top of a `PETSc DM` object (created from `quagmire.tools.meshtools`). Parameters ---------- DM : PETSc DMPlex object Build this unstructured spherical mesh object using one of: - `quagmire.tools.meshtools.create_spherical_DMPlex` - `quagmire.tools.meshtools.create_DMPlex_from_spherical_points` verbose : bool Flag toggles verbose output *args : optional arguments **kwargs : optional keyword arguments Attributes ---------- tri : stripy Triangulation object Cartesian mesh object generated by `stripy` npoints : int Number of points (n) in the mesh pointwise_area : Quagmire MeshVariable `quagmire.mesh.basemesh.MeshVariable` of point-wise area mask : Quagmire MeshVariable `quagmire.mesh.basemesh.MeshVariable` to denote points on the boundary data : array of floats, shape (n,2) Cartesian mesh coordinates in x,y directions coords : array of floats, shape (n,2) Same as `data` neighbour_cloud : array of ints, shape(n,25) array of nearest node neighbours by distance near_neighbour_mask : array of bools, shape(n,25) mask immediate node neighbours in `neighbour_cloud` timings : dict Timing information for each Quagmire routine """ __count = 0 @classmethod def _count(cls): sTriMesh.__count += 1 return sTriMesh.__count @property def id(self): return self.__id def __init__(self, dm, r1=6384.4e3, r2=6352.8e3, verbose=True, *args, **kwargs): import stripy # initialise base mesh class super(sTriMesh, self).__init__(dm, verbose) self.__id = "strimesh_{}".format(self._count()) # Delaunay triangulation t = perf_counter() coords = dm.getCoordinatesLocal().array.reshape(-1,3) minX, minY, minZ = coords.min(axis=0) maxX, maxY, maxZ = coords.max(axis=0) length_scale = np.sqrt((maxX - minX)*(maxY - minY)/coords.shape[0]) # coords += np.random.random(coords.shape) * 0.0001 * length_scale # This should be aware of the point spacing (small perturbation) # r = np.sqrt(coords[:,0]**2 + coords[:,1]**2 + coords[:,2]**2) # should just equal 1 # r = 1.0 # lons = np.arctan2(coords[:,1], coords[:,0]) # lats = np.arcsin(coords[:,2]/r) rlons, rlats = stripy.spherical.xyz2lonlat(coords[:,0], coords[:,1], coords[:,2]) lons = np.degrees(rlons) lats = np.degrees(rlats) self.tri = stripy.sTriangulation(rlons, rlats, permute=True) self.npoints = self.tri.npoints self.timings['triangulation'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - Delaunay triangulation {}s".format(self.dm.comm.rank, perf_counter()-t)) # Calculate geocentric radius self._radius = geocentric_radius(lats, r1, r2) self._coords = np.c_[lons, lats] self._data = self.tri.points*self._radius.reshape(-1,1) # Calculate weigths and pointwise area t = perf_counter() self.area, self.weight = self.calculate_area_weights() self.pointwise_area = self.add_variable(name="area") self.pointwise_area.data = self.area self.pointwise_area.lock() self.timings['area weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - Calculate node weights and area {}s".format(self.dm.comm.rank, perf_counter()-t)) # Find boundary points t = perf_counter() self.bmask = self.get_boundary() self.mask = self.add_variable(name="Mask") self.mask.data = self.bmask.astype(PETSc.ScalarType) self.mask.lock() self.timings['find boundaries'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - Find boundaries {}s".format(self.dm.comm.rank, perf_counter()-t)) # cKDTree t = perf_counter() self.cKDTree = _cKDTree(self.data, balanced_tree=False) self.timings['cKDTree'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - cKDTree {}s".format(self.dm.comm.rank, perf_counter()-t)) # Find neighbours t = perf_counter() self.construct_neighbour_cloud() self.timings['construct neighbour cloud'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] t1 = perf_counter() self.construct_natural_neighbour_cloud() self.timings['construct natural neighbour cloud'] = [perf_counter()-t1, self.log.getCPUTime(), self.log.getFlops()] if self.rank==0 and self.verbose: print("{} - Construct neighbour cloud arrays {}s, ({}s + {}s)".format(self.dm.comm.rank, perf_counter()-t, self.timings['construct neighbour cloud'][0], self.timings['construct natural neighbour cloud'][0] )) # RBF smoothing operator t = perf_counter() self._construct_rbf_weights() self.timings['construct rbf weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - Construct rbf weights {}s".format(self.dm.comm.rank, perf_counter()-t)) self.root = False # We place these properties here to prevent users from mucking up the coordinates. # Radius is the only property to have a setter only because freedom in the vertical axis # doesn't mess up the graph, and is quite useful for visualisation in lavavu / paraview. ## FUTURE: deforming the mesh should be via a context manager. @property def data(self): """ Cartesian coordinates of local mesh points. `data` is of shape (npoints, 3) These points are projected to the radius of the sphere. If `self.radius` is set to 1.0 then these points are on the unit sphere and are identical to `self.tri.points`. """ return self._data @property def coords(self): """ Spherical coordinates of local mesh points in degrees lon / lat. `coords` is of shape (npoints, 2) """ return self._coords @property def radius(self): """ The radius of the sphere. Use `geocentric_radius` to compute the radius with distortion between the poles and the equator, otherwise Quagmire uses Earth values by default. i.e. ``` radius = geocentric_radius(r1=6384.4e3, r2=6352.8e3) ``` Setting a new value of radius updates the point-wise area calculation. """ return self._radius @radius.setter def radius(self, value): self._radius = value # update mesh coordinates and rebuild k-d tree self._data = self.tri.points*np.array(self._radius).reshape(-1,1) self._update_DM_coordinates() self.cKDTree = _cKDTree(self.data, balanced_tree=False) self.construct_neighbour_cloud() # re-evalutate mesh area self.area, self.weight = self.calculate_area_weights() self.pointwise_area.unlock() self.pointwise_area.data = self.area self.pointwise_area.lock() def _update_DM_coordinates(self): """ Update the coordinates on the DM """ # get global coordinate vector from DM global_coord_vec = self.dm.getCoordinates() global_coord_vec.setArray(self.data.ravel()) self.dm.setCoordinates(global_coord_vec) def interpolate(self, lons, lats, zdata, order=1, grad=None): """ Base class to handle nearest neighbour, linear, and cubic interpolation. Given a triangulation of a set of nodes on the unit sphere, along with data values at the nodes, this method interpolates (or extrapolates) the value at a given longitude and latitude. Parameters ---------- lons : float / array of floats, shape (l,) longitudinal coordinate(s) on the sphere lats : float / array of floats, shape (l,) latitudinal coordinate(s) on the sphere zdata : array of floats, shape (n,) value at each point in the triangulation must be the same size of the mesh order : int (default=1) order of the interpolatory function used - 0 = nearest-neighbour - 1 = linear - 3 = cubic Returns ------- zi : float / array of floats, shape (l,) interpolated value(s) at (lons, lats) err : int / array of ints, shape (l,) whether interpolation (0), extrapolation (1) or error (other) """ rlons = np.radians(lons) rlats = np.radians(lats) return self.tri.interpolate(rlons, rlats, zdata, order, grad) def calculate_area_weights(self): """ Calculate pointwise weights and area Returns ------- area : array of floats, shape (n,) point-wise area weights : array of ints, shape(n,) weighting for each point Notes ----- This calls a fortran 90 routine which computes the weight and area for each point in the mesh using the geocentric radius of the sphere at the equator `r1` and the poles `r2` (defaults to Earth values: 6384.4km and 6352.8km, respectively). """ from quagmire._fortran import ntriw_s R = self._radius tri_area = self.tri.areas() # find surface area and weights on the unit sphere area, weight = ntriw_s(self.npoints, self.tri.simplices.T+1, tri_area) # project to the radius of the sphere area *= R**2 return area, weight def node_neighbours(self, point): """ Returns a list of neighbour nodes for a given point in the Delaunay triangulation. Parameters ---------- point : int point on the mesh Returns ------- neighbours : list list of neighbours that are connected by line segments to the specified `point` """ return self.natural_neighbours[point,1:self.natural_neighbours_count[point]] def derivatives(self, PHI, nit=10, tol=1e-8): """ Compute derivatives of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ PHIx, PHIy = self.tri.derivatives_lonlat(PHI, nit, tol) grads = np.ndarray((PHIx.size, 2)) grads[:, 0] = np.radians(PHIx) grads[:, 1] = np.radians(PHIy) return grads def derivative_grad(self, PHI, nit=10, tol=1e-8): """ Compute gradients of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ PHIx, PHIy = self.tri.gradient_lonlat(PHI, nit, tol) grads = np.ndarray((PHIx.size, 2)) grads[:, 0] = np.radians(PHIx) grads[:, 1] = np.radians(PHIy) return grads # required for compatibility among FlatMesh types def _derivative_grad_cartesian(self, *args, **kwargs): u_x, u_y, u_z = self.tri.gradient_xyz(*args, **kwargs) grads = np.ndarray((u_x.size, 3)) grads[:, 0] = u_x grads[:, 1] = u_y grads[:, 2] = u_z return grads def derivative_div(self, PHIx, PHIy, PHIz, **kwargs): """ Compute second order derivative from flux fields PHIx, PHIy, PHIz We evaluate the gradient on these fields using the derivative-grad method. Arguments --------- PHIx : ndarray of floats, shape (n,) array of first partial derivatives in x direction PHIy : ndarray of floats, shape (n,) array of first partial derivatives in y direction PHIz : ndarray of floats, shape (n,) array of first partial derivatives in z direction kwargs : optional keyword-argument specifiers keyword arguments to be passed onto derivative_grad e.g. nit=5, tol=1e-3 Returns ------- del2PHI : ndarray of floats, shape (n,) second derivative of PHI """ u_xx, u_xy, u_zz = self.tri.gradient_xyz(PHIx, **kwargs) u_yx, u_yy, u_zz = self.tri.gradient_xyz(PHIy, **kwargs) u_zx, u_zy, u_zz = self.tri.gradient_xyz(PHIz, **kwargs) return (u_xx + u_yy + u_zz)/self._radius def construct_natural_neighbour_cloud(self): """ Find the natural neighbours for each node in the triangulation and store in a numpy array for efficient lookup. """ # # maximum size of nn array node-by-node (it's almost certainly +1 because # # it is only +2 if the neighbours do not form a closed loop around the node) # max_neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints).max() + 2 # s = self.tri.simplices # nns = -1 * np.ones((self.npoints, max_neighbours), dtype=int) # nnm = np.zeros((self.npoints, max_neighbours), dtype=bool) # nn = np.empty((self.npoints), dtype=int) # for node in range(0, self.npoints): # w = np.where(s==node) # l = np.unique(s[w[0]]) # # l = np.roll(l,-np.argwhere(l == node)[0]) # nn[node] = l.shape[0] # nns[node,0:nn[node]] = l # nnm[node,0:nn[node]] = True # self.natural_neighbours = nns # self.natural_neighbours_count = nn # self.natural_neighbours_mask = nnm # Find all the segments in the triangulation and sort the array # (note this differs from the stripy routine because is treates m-n and n-m as distinct) lst = self.tri.lst lend = self.tri.lend lptr = self.tri.lptr segments_array = np.empty((len(lptr),2),dtype=np.int) segments_array[:,0] = np.abs(lst[:]) - 1 segments_array[:,1] = np.abs(lst[lptr[:]-1]) - 1 valid = np.logical_and(segments_array[:,0] >= 0, segments_array[:,1] >= 0) segments = segments_array[valid,:] deshuffled = self.tri._deshuffle_simplices(segments) smsi = np.argsort(deshuffled[:,0]) sms = np.zeros_like(deshuffled) sms[np.indices((deshuffled.shape[0],)),:] = deshuffled[smsi,:] natural_neighbours_count = np.bincount(sms[:,0]) iend = np.cumsum(natural_neighbours_count) istart = np.zeros_like(iend) istart[1:] = iend[:-1] natural_neighbours = np.full((self.npoints, natural_neighbours_count.max()+1), -1, dtype=np.int) for j in range(0,self.npoints): natural_neighbours[j, 0] = j natural_neighbours[j, 1:natural_neighbours_count[j]+1] = sms[istart[j]:istart[j]+natural_neighbours_count[j],1] natural_neighbours_count += 1 self.natural_neighbours = natural_neighbours self.natural_neighbours_count = natural_neighbours_count self.natural_neighbours_mask = natural_neighbours != -1 return def construct_neighbour_cloud(self, size=25): """ Find neighbours from distance cKDTree. Parameters ---------- size : int Number of neighbours to search for Notes ----- Use this method to search for neighbours that are not necessarily immediate node neighbours (i.e. neighbours connected by a line segment). Extended node neighbours should be captured by the search depending on how large `size` is set to. """ nndist, nncloud = self.cKDTree.query(self.data, k=size) self.neighbour_cloud = nncloud # find the arc distances for n in range(1, size): nndist[:,n] = distance_on_sphere(self.data, self.data[nncloud[:,n]], self.radius) self.neighbour_cloud_distances = nndist neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints) self.near_neighbours = neighbours + 2 self.extended_neighbours = np.full_like(neighbours, size) def local_area_smoothing(self, data, its=1, centre_weight=0.75): """ Local area smoothing using radial-basis function smoothing kernel Parameters ---------- data : array of floats, shape (n,) field variable to be smoothed its : int number of iterations (default: 1) centre_weight : float weight to apply to centre nodes (default: 0.75) other nodes are weighted by (1 - `centre_weight`) Returns ------- sm : array of floats, shape (n,) smoothed field variable """ smooth_data = data.copy() smooth_data_old = data.copy() for i in range(0, its): smooth_data_old[:] = smooth_data smooth_data = centre_weight*smooth_data_old + \ (1.0 - centre_weight)*self.rbf_smoother(smooth_data) return smooth_data def _construct_rbf_weights(self, delta=None): if delta == None: delta = self.neighbour_cloud_distances[:, 1].mean() self.delta = delta self.gaussian_dist_w = self._rbf_weights(delta) return def _rbf_weights(self, delta=None): neighbour_cloud_distances = self.neighbour_cloud_distances if delta == None: delta = self.neighbour_cloud_distances[:, 1].mean() # Initialise the interpolants gaussian_dist_w = np.zeros_like(neighbour_cloud_distances) gaussian_dist_w[:,:] = np.exp(-np.power(neighbour_cloud_distances[:,:]/delta, 2.0)) gaussian_dist_w[:,:] /= gaussian_dist_w.sum(axis=1).reshape(-1,1) return gaussian_dist_w def rbf_smoother(self, vector, iterations=1, delta=None): """ Smoothing using a radial-basis function smoothing kernel Arguments --------- vector : array of floats, shape (n,) field variable to be smoothed iterations : int number of iterations to smooth vector delta : float / array of floats shape (n,) distance weights to apply the Gaussian interpolants Returns ------- smooth_vec : array of floats, shape (n,) smoothed version of input vector """ if type(delta) != type(None): self._construct_rbf_weights(delta) vector = self.sync(vector) for i in range(0, iterations): # print self.dm.comm.rank, ": RBF ",vector.max(), vector.min() vector_smoothed = (vector[self.neighbour_cloud[:,:]] * self.gaussian_dist_w[:,:]).sum(axis=1) vector = self.sync(vector_smoothed) return vector def build_rbf_smoother(self, delta, iterations=1): strimesh_self = self class _rbf_smoother_object(object): def __init__(inner_self, delta, iterations=1): if delta == None: strimesh_self.lvec.setArray(strimesh_self.neighbour_cloud_distances[:, 1]) strimesh_self.dm.localToGlobal(strimesh_self.lvec, strimesh_self.gvec) delta = strimesh_self.gvec.sum() / strimesh_self.gvec.getSize() inner_self._mesh = strimesh_self inner_self.delta = delta inner_self.iterations = iterations inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta) return def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1): import quagmire smooth_node_values = lazyFn.evaluate(inner_self._mesh) for i in range(0, iterations): smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1) smooth_node_values = inner_self._mesh.sync(smooth_node_values) return smooth_node_values def smooth_fn(inner_self, meshVar, iterations=None): if iterations is None: iterations = inner_self.iterations def smoother_fn(*args, **kwargs): import quagmire smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations) if len(args) == 1 and args[0] == meshVar._mesh: return smooth_node_values elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]): mesh = args[0] return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs) else: xi = np.atleast_1d(args[0]) yi = np.atleast_1d(args[1]) i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs) return i newLazyFn = _LazyEvaluation() newLazyFn.evaluate = smoother_fn newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations) return newLazyFn return _rbf_smoother_object(delta, iterations) def build_rbf_smoother(self, delta, iterations=1): strimesh_self = self class _rbf_smoother_object(object): def __init__(inner_self, delta, iterations=1): if delta == None: delta = strimesh_self.neighbour_cloud_distances[:, 1].mean() ## NOT SAFE IN PARALLEL !!! inner_self._mesh = strimesh_self inner_self.delta = delta inner_self.iterations = iterations inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta) return def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1): import quagmire smooth_node_values = lazyFn.evaluate(inner_self._mesh) for i in range(0, iterations): smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1) smooth_node_values = inner_self._mesh.sync(smooth_node_values) return smooth_node_values def smooth_fn(inner_self, meshVar, iterations=None): import quagmire if iterations is None: iterations = inner_self.iterations def smoother_fn(*args, **kwargs): smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations) if len(args) == 1 and args[0] == meshVar._mesh: return smooth_node_values elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]): mesh = args[0] return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs) else: xi = np.atleast_1d(args[0]) yi = np.atleast_1d(args[1]) i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs) return i newLazyFn = _LazyEvaluation() newLazyFn.evaluate = smoother_fn newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations) return newLazyFn return _rbf_smoother_object(delta, iterations)Ancestors
Instance variables
var coords-
Spherical coordinates of local mesh points in degrees lon / lat.
coordsis of shape (npoints, 2)Expand source code
@property def coords(self): """ Spherical coordinates of local mesh points in degrees lon / lat. `coords` is of shape (npoints, 2) """ return self._coords var data-
Cartesian coordinates of local mesh points.
datais of shape (npoints, 3)These points are projected to the radius of the sphere. If
self.radiusis set to 1.0 then these points are on the unit sphere and are identical toself.tri.points.Expand source code
@property def data(self): """ Cartesian coordinates of local mesh points. `data` is of shape (npoints, 3) These points are projected to the radius of the sphere. If `self.radius` is set to 1.0 then these points are on the unit sphere and are identical to `self.tri.points`. """ return self._data var id-
Expand source code
@property def id(self): return self.__id var radius-
The radius of the sphere.
Use
geocentric_radius()to compute the radius with distortion between the poles and the equator, otherwise Quagmire uses Earth values by default. i.e.radius = geocentric_radius(r1=6384.4e3, r2=6352.8e3)Setting a new value of radius updates the point-wise area calculation.
Expand source code
@property def radius(self): """ The radius of the sphere. Use `geocentric_radius` to compute the radius with distortion between the poles and the equator, otherwise Quagmire uses Earth values by default. i.e. ``` radius = geocentric_radius(r1=6384.4e3, r2=6352.8e3) ``` Setting a new value of radius updates the point-wise area calculation. """ return self._radius
Methods
def build_rbf_smoother(self, delta, iterations=1)-
Expand source code
def build_rbf_smoother(self, delta, iterations=1): strimesh_self = self class _rbf_smoother_object(object): def __init__(inner_self, delta, iterations=1): if delta == None: delta = strimesh_self.neighbour_cloud_distances[:, 1].mean() ## NOT SAFE IN PARALLEL !!! inner_self._mesh = strimesh_self inner_self.delta = delta inner_self.iterations = iterations inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta) return def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1): import quagmire smooth_node_values = lazyFn.evaluate(inner_self._mesh) for i in range(0, iterations): smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1) smooth_node_values = inner_self._mesh.sync(smooth_node_values) return smooth_node_values def smooth_fn(inner_self, meshVar, iterations=None): import quagmire if iterations is None: iterations = inner_self.iterations def smoother_fn(*args, **kwargs): smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations) if len(args) == 1 and args[0] == meshVar._mesh: return smooth_node_values elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]): mesh = args[0] return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs) else: xi = np.atleast_1d(args[0]) yi = np.atleast_1d(args[1]) i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs) return i newLazyFn = _LazyEvaluation() newLazyFn.evaluate = smoother_fn newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations) return newLazyFn return _rbf_smoother_object(delta, iterations) def calculate_area_weights(self)-
Calculate pointwise weights and area
Returns
area:arrayoffloats, shape (n,)- point-wise area
weights:arrayofints, shape(n,)- weighting for each point
Notes
This calls a fortran 90 routine which computes the weight and area for each point in the mesh using the geocentric radius of the sphere at the equator
r1and the polesr2(defaults to Earth values: 6384.4km and 6352.8km, respectively).Expand source code
def calculate_area_weights(self): """ Calculate pointwise weights and area Returns ------- area : array of floats, shape (n,) point-wise area weights : array of ints, shape(n,) weighting for each point Notes ----- This calls a fortran 90 routine which computes the weight and area for each point in the mesh using the geocentric radius of the sphere at the equator `r1` and the poles `r2` (defaults to Earth values: 6384.4km and 6352.8km, respectively). """ from quagmire._fortran import ntriw_s R = self._radius tri_area = self.tri.areas() # find surface area and weights on the unit sphere area, weight = ntriw_s(self.npoints, self.tri.simplices.T+1, tri_area) # project to the radius of the sphere area *= R**2 return area, weight def construct_natural_neighbour_cloud(self)-
Find the natural neighbours for each node in the triangulation and store in a numpy array for efficient lookup.
Expand source code
def construct_natural_neighbour_cloud(self): """ Find the natural neighbours for each node in the triangulation and store in a numpy array for efficient lookup. """ # # maximum size of nn array node-by-node (it's almost certainly +1 because # # it is only +2 if the neighbours do not form a closed loop around the node) # max_neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints).max() + 2 # s = self.tri.simplices # nns = -1 * np.ones((self.npoints, max_neighbours), dtype=int) # nnm = np.zeros((self.npoints, max_neighbours), dtype=bool) # nn = np.empty((self.npoints), dtype=int) # for node in range(0, self.npoints): # w = np.where(s==node) # l = np.unique(s[w[0]]) # # l = np.roll(l,-np.argwhere(l == node)[0]) # nn[node] = l.shape[0] # nns[node,0:nn[node]] = l # nnm[node,0:nn[node]] = True # self.natural_neighbours = nns # self.natural_neighbours_count = nn # self.natural_neighbours_mask = nnm # Find all the segments in the triangulation and sort the array # (note this differs from the stripy routine because is treates m-n and n-m as distinct) lst = self.tri.lst lend = self.tri.lend lptr = self.tri.lptr segments_array = np.empty((len(lptr),2),dtype=np.int) segments_array[:,0] = np.abs(lst[:]) - 1 segments_array[:,1] = np.abs(lst[lptr[:]-1]) - 1 valid = np.logical_and(segments_array[:,0] >= 0, segments_array[:,1] >= 0) segments = segments_array[valid,:] deshuffled = self.tri._deshuffle_simplices(segments) smsi = np.argsort(deshuffled[:,0]) sms = np.zeros_like(deshuffled) sms[np.indices((deshuffled.shape[0],)),:] = deshuffled[smsi,:] natural_neighbours_count = np.bincount(sms[:,0]) iend = np.cumsum(natural_neighbours_count) istart = np.zeros_like(iend) istart[1:] = iend[:-1] natural_neighbours = np.full((self.npoints, natural_neighbours_count.max()+1), -1, dtype=np.int) for j in range(0,self.npoints): natural_neighbours[j, 0] = j natural_neighbours[j, 1:natural_neighbours_count[j]+1] = sms[istart[j]:istart[j]+natural_neighbours_count[j],1] natural_neighbours_count += 1 self.natural_neighbours = natural_neighbours self.natural_neighbours_count = natural_neighbours_count self.natural_neighbours_mask = natural_neighbours != -1 return def construct_neighbour_cloud(self, size=25)-
Find neighbours from distance cKDTree.
Parameters
size:int- Number of neighbours to search for
Notes
Use this method to search for neighbours that are not necessarily immediate node neighbours (i.e. neighbours connected by a line segment). Extended node neighbours should be captured by the search depending on how large
sizeis set to.Expand source code
def construct_neighbour_cloud(self, size=25): """ Find neighbours from distance cKDTree. Parameters ---------- size : int Number of neighbours to search for Notes ----- Use this method to search for neighbours that are not necessarily immediate node neighbours (i.e. neighbours connected by a line segment). Extended node neighbours should be captured by the search depending on how large `size` is set to. """ nndist, nncloud = self.cKDTree.query(self.data, k=size) self.neighbour_cloud = nncloud # find the arc distances for n in range(1, size): nndist[:,n] = distance_on_sphere(self.data, self.data[nncloud[:,n]], self.radius) self.neighbour_cloud_distances = nndist neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints) self.near_neighbours = neighbours + 2 self.extended_neighbours = np.full_like(neighbours, size) def derivative_div(self, PHIx, PHIy, PHIz, **kwargs)-
Compute second order derivative from flux fields PHIx, PHIy, PHIz We evaluate the gradient on these fields using the derivative-grad method.
Arguments
PHIx:ndarrayoffloats, shape (n,)- array of first partial derivatives in x direction
PHIy:ndarrayoffloats, shape (n,)- array of first partial derivatives in y direction
PHIz:ndarrayoffloats, shape (n,)- array of first partial derivatives in z direction
kwargs:optional keyword-argument specifiers- keyword arguments to be passed onto derivative_grad e.g. nit=5, tol=1e-3
Returns
del2PHI:ndarrayoffloats, shape (n,)- second derivative of PHI
Expand source code
def derivative_div(self, PHIx, PHIy, PHIz, **kwargs): """ Compute second order derivative from flux fields PHIx, PHIy, PHIz We evaluate the gradient on these fields using the derivative-grad method. Arguments --------- PHIx : ndarray of floats, shape (n,) array of first partial derivatives in x direction PHIy : ndarray of floats, shape (n,) array of first partial derivatives in y direction PHIz : ndarray of floats, shape (n,) array of first partial derivatives in z direction kwargs : optional keyword-argument specifiers keyword arguments to be passed onto derivative_grad e.g. nit=5, tol=1e-3 Returns ------- del2PHI : ndarray of floats, shape (n,) second derivative of PHI """ u_xx, u_xy, u_zz = self.tri.gradient_xyz(PHIx, **kwargs) u_yx, u_yy, u_zz = self.tri.gradient_xyz(PHIy, **kwargs) u_zx, u_zy, u_zz = self.tri.gradient_xyz(PHIz, **kwargs) return (u_xx + u_yy + u_zz)/self._radius def derivative_grad(self, PHI, nit=10, tol=1e-08)-
Compute gradients of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
PHI:ndarrayoffloats, shape (n,)- compute the derivative of this array
nit:int optional (default: 10)- number of iterations to reach convergence
tol:float optional (default: 1e-8)- convergence is reached when this tolerance is met
Returns
PHIx:ndarrayoffloats, shape(n,)- first partial derivative of PHI in x direction
PHIy:ndarrayoffloats, shape(n,)- first partial derivative of PHI in y direction
Expand source code
def derivative_grad(self, PHI, nit=10, tol=1e-8): """ Compute gradients of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ PHIx, PHIy = self.tri.gradient_lonlat(PHI, nit, tol) grads = np.ndarray((PHIx.size, 2)) grads[:, 0] = np.radians(PHIx) grads[:, 1] = np.radians(PHIy) return grads def derivatives(self, PHI, nit=10, tol=1e-08)-
Compute derivatives of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
PHI:ndarrayoffloats, shape (n,)- compute the derivative of this array
nit:int optional (default: 10)- number of iterations to reach convergence
tol:float optional (default: 1e-8)- convergence is reached when this tolerance is met
Returns
PHIx:ndarrayoffloats, shape(n,)- first partial derivative of PHI in x direction
PHIy:ndarrayoffloats, shape(n,)- first partial derivative of PHI in y direction
Expand source code
def derivatives(self, PHI, nit=10, tol=1e-8): """ Compute derivatives of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ PHIx, PHIy = self.tri.derivatives_lonlat(PHI, nit, tol) grads = np.ndarray((PHIx.size, 2)) grads[:, 0] = np.radians(PHIx) grads[:, 1] = np.radians(PHIy) return grads def interpolate(self, lons, lats, zdata, order=1, grad=None)-
Base class to handle nearest neighbour, linear, and cubic interpolation. Given a triangulation of a set of nodes on the unit sphere, along with data values at the nodes, this method interpolates (or extrapolates) the value at a given longitude and latitude.
Parameters
lons:float / arrayoffloats, shape (l,)- longitudinal coordinate(s) on the sphere
lats:float / arrayoffloats, shape (l,)- latitudinal coordinate(s) on the sphere
zdata:arrayoffloats, shape (n,)- value at each point in the triangulation must be the same size of the mesh
order:int (default=1)- order of the interpolatory function used - 0 = nearest-neighbour - 1 = linear - 3 = cubic
Returns
zi:float / arrayoffloats, shape (l,)- interpolated value(s) at (lons, lats)
err:int / arrayofints, shape (l,)- whether interpolation (0), extrapolation (1) or error (other)
Expand source code
def interpolate(self, lons, lats, zdata, order=1, grad=None): """ Base class to handle nearest neighbour, linear, and cubic interpolation. Given a triangulation of a set of nodes on the unit sphere, along with data values at the nodes, this method interpolates (or extrapolates) the value at a given longitude and latitude. Parameters ---------- lons : float / array of floats, shape (l,) longitudinal coordinate(s) on the sphere lats : float / array of floats, shape (l,) latitudinal coordinate(s) on the sphere zdata : array of floats, shape (n,) value at each point in the triangulation must be the same size of the mesh order : int (default=1) order of the interpolatory function used - 0 = nearest-neighbour - 1 = linear - 3 = cubic Returns ------- zi : float / array of floats, shape (l,) interpolated value(s) at (lons, lats) err : int / array of ints, shape (l,) whether interpolation (0), extrapolation (1) or error (other) """ rlons = np.radians(lons) rlats = np.radians(lats) return self.tri.interpolate(rlons, rlats, zdata, order, grad) def local_area_smoothing(self, data, its=1, centre_weight=0.75)-
Local area smoothing using radial-basis function smoothing kernel
Parameters
data:arrayoffloats, shape (n,)- field variable to be smoothed
its:int- number of iterations (default: 1)
centre_weight:float- weight to apply to centre nodes (default: 0.75)
other nodes are weighted by (1 -
centre_weight)
Returns
sm:arrayoffloats, shape (n,)- smoothed field variable
Expand source code
def local_area_smoothing(self, data, its=1, centre_weight=0.75): """ Local area smoothing using radial-basis function smoothing kernel Parameters ---------- data : array of floats, shape (n,) field variable to be smoothed its : int number of iterations (default: 1) centre_weight : float weight to apply to centre nodes (default: 0.75) other nodes are weighted by (1 - `centre_weight`) Returns ------- sm : array of floats, shape (n,) smoothed field variable """ smooth_data = data.copy() smooth_data_old = data.copy() for i in range(0, its): smooth_data_old[:] = smooth_data smooth_data = centre_weight*smooth_data_old + \ (1.0 - centre_weight)*self.rbf_smoother(smooth_data) return smooth_data def node_neighbours(self, point)-
Returns a list of neighbour nodes for a given point in the Delaunay triangulation.
Parameters
point:int- point on the mesh
Returns
neighbours:list- list of neighbours that are connected by line segments
to the specified
point
Expand source code
def node_neighbours(self, point): """ Returns a list of neighbour nodes for a given point in the Delaunay triangulation. Parameters ---------- point : int point on the mesh Returns ------- neighbours : list list of neighbours that are connected by line segments to the specified `point` """ return self.natural_neighbours[point,1:self.natural_neighbours_count[point]] def rbf_smoother(self, vector, iterations=1, delta=None)-
Smoothing using a radial-basis function smoothing kernel
Arguments
vector:arrayoffloats, shape (n,)- field variable to be smoothed
iterations:int- number of iterations to smooth vector
delta:float / arrayoffloats shape (n,)- distance weights to apply the Gaussian interpolants
Returns
smooth_vec:arrayoffloats, shape (n,)- smoothed version of input vector
Expand source code
def rbf_smoother(self, vector, iterations=1, delta=None): """ Smoothing using a radial-basis function smoothing kernel Arguments --------- vector : array of floats, shape (n,) field variable to be smoothed iterations : int number of iterations to smooth vector delta : float / array of floats shape (n,) distance weights to apply the Gaussian interpolants Returns ------- smooth_vec : array of floats, shape (n,) smoothed version of input vector """ if type(delta) != type(None): self._construct_rbf_weights(delta) vector = self.sync(vector) for i in range(0, iterations): # print self.dm.comm.rank, ": RBF ",vector.max(), vector.min() vector_smoothed = (vector[self.neighbour_cloud[:,:]] * self.gaussian_dist_w[:,:]).sum(axis=1) vector = self.sync(vector_smoothed) return vector
Inherited members