Module quagmire.mesh.strimesh
For unstructured data on the sphere.
sTriMesh
implements the following functionality:
- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Supply a PETSc DM
object (created from quagmire.tools.meshtools
) to initialise the object.
Expand source code
# Copyright 2016-2020 Louis Moresi, Ben Mather, Romain Beucher
#
# This file is part of Quagmire.
#
# Quagmire is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or any later version.
#
# Quagmire is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Quagmire. If not, see <http://www.gnu.org/licenses/>.
"""
For unstructured data on the sphere.
<img src="https://raw.githubusercontent.com/underworldcode/quagmire/dev/docs/images/quagmire-flowchart-strimesh.png" style="width: 321px; float:right">
`sTriMesh` implements the following functionality:
- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Supply a `PETSc DM` object (created from `quagmire.tools.meshtools`) to initialise the object.
"""
import numpy as np
from mpi4py import MPI
import sys,petsc4py
petsc4py.init(sys.argv)
from petsc4py import PETSc
# comm = MPI.COMM_WORLD
from time import perf_counter
from .commonmesh import CommonMesh as _CommonMesh
from scipy.spatial import cKDTree as _cKDTree
try: range = xrange
except: pass
from quagmire import function as fn
from quagmire.function import LazyEvaluation as _LazyEvaluation
class sTriMesh(_CommonMesh):
"""
Build spatial data structures on an __unstructured spherical mesh__.
Use `sTriMesh` for:
- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Each of these data structures are built on top of a `PETSc DM` object
(created from `quagmire.tools.meshtools`).
Parameters
----------
DM : PETSc DMPlex object
Build this unstructured spherical mesh object using one of:
- `quagmire.tools.meshtools.create_spherical_DMPlex`
- `quagmire.tools.meshtools.create_DMPlex_from_spherical_points`
verbose : bool
Flag toggles verbose output
*args : optional arguments
**kwargs : optional keyword arguments
Attributes
----------
tri : stripy Triangulation object
Cartesian mesh object generated by `stripy`
npoints : int
Number of points (n) in the mesh
pointwise_area : Quagmire MeshVariable
`quagmire.mesh.basemesh.MeshVariable` of point-wise area
mask : Quagmire MeshVariable
`quagmire.mesh.basemesh.MeshVariable` to denote points on the boundary
data : array of floats, shape (n,2)
Cartesian mesh coordinates in x,y directions
coords : array of floats, shape (n,2)
Same as `data`
neighbour_cloud : array of ints, shape(n,25)
array of nearest node neighbours by distance
near_neighbour_mask : array of bools, shape(n,25)
mask immediate node neighbours in `neighbour_cloud`
timings : dict
Timing information for each Quagmire routine
"""
__count = 0
@classmethod
def _count(cls):
sTriMesh.__count += 1
return sTriMesh.__count
@property
def id(self):
return self.__id
def __init__(self, dm, r1=6384.4e3, r2=6352.8e3, verbose=True, *args, **kwargs):
import stripy
# initialise base mesh class
super(sTriMesh, self).__init__(dm, verbose)
self.__id = "strimesh_{}".format(self._count())
# Delaunay triangulation
t = perf_counter()
coords = dm.getCoordinatesLocal().array.reshape(-1,3)
minX, minY, minZ = coords.min(axis=0)
maxX, maxY, maxZ = coords.max(axis=0)
length_scale = np.sqrt((maxX - minX)*(maxY - minY)/coords.shape[0])
# coords += np.random.random(coords.shape) * 0.0001 * length_scale # This should be aware of the point spacing (small perturbation)
# r = np.sqrt(coords[:,0]**2 + coords[:,1]**2 + coords[:,2]**2) # should just equal 1
# r = 1.0
# lons = np.arctan2(coords[:,1], coords[:,0])
# lats = np.arcsin(coords[:,2]/r)
rlons, rlats = stripy.spherical.xyz2lonlat(coords[:,0], coords[:,1], coords[:,2])
lons = np.degrees(rlons)
lats = np.degrees(rlats)
self.tri = stripy.sTriangulation(rlons, rlats, permute=True)
self.npoints = self.tri.npoints
self.timings['triangulation'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - Delaunay triangulation {}s".format(self.dm.comm.rank, perf_counter()-t))
# Calculate geocentric radius
self._radius = geocentric_radius(lats, r1, r2)
self._coords = np.c_[lons, lats]
self._data = self.tri.points*self._radius.reshape(-1,1)
# Calculate weigths and pointwise area
t = perf_counter()
self.area, self.weight = self.calculate_area_weights()
self.pointwise_area = self.add_variable(name="area")
self.pointwise_area.data = self.area
self.pointwise_area.lock()
self.timings['area weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - Calculate node weights and area {}s".format(self.dm.comm.rank, perf_counter()-t))
# Find boundary points
t = perf_counter()
self.bmask = self.get_boundary()
self.mask = self.add_variable(name="Mask")
self.mask.data = self.bmask.astype(PETSc.ScalarType)
self.mask.lock()
self.timings['find boundaries'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - Find boundaries {}s".format(self.dm.comm.rank, perf_counter()-t))
# cKDTree
t = perf_counter()
self.cKDTree = _cKDTree(self.data, balanced_tree=False)
self.timings['cKDTree'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - cKDTree {}s".format(self.dm.comm.rank, perf_counter()-t))
# Find neighbours
t = perf_counter()
self.construct_neighbour_cloud()
self.timings['construct neighbour cloud'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
t1 = perf_counter()
self.construct_natural_neighbour_cloud()
self.timings['construct natural neighbour cloud'] = [perf_counter()-t1, self.log.getCPUTime(), self.log.getFlops()]
if self.rank==0 and self.verbose:
print("{} - Construct neighbour cloud arrays {}s, ({}s + {}s)".format(self.dm.comm.rank, perf_counter()-t,
self.timings['construct neighbour cloud'][0],
self.timings['construct natural neighbour cloud'][0] ))
# RBF smoothing operator
t = perf_counter()
self._construct_rbf_weights()
self.timings['construct rbf weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()]
if self.rank == 0 and self.verbose:
print("{} - Construct rbf weights {}s".format(self.dm.comm.rank, perf_counter()-t))
self.root = False
# We place these properties here to prevent users from mucking up the coordinates.
# Radius is the only property to have a setter only because freedom in the vertical axis
# doesn't mess up the graph, and is quite useful for visualisation in lavavu / paraview.
## FUTURE: deforming the mesh should be via a context manager.
@property
def data(self):
"""
Cartesian coordinates of local mesh points.
`data` is of shape (npoints, 3)
These points are projected to the radius of the sphere.
If `self.radius` is set to 1.0 then these points are on the unit sphere and are
identical to `self.tri.points`.
"""
return self._data
@property
def coords(self):
"""
Spherical coordinates of local mesh points in degrees lon / lat.
`coords` is of shape (npoints, 2)
"""
return self._coords
@property
def radius(self):
"""
The radius of the sphere.
Use `geocentric_radius` to compute the radius with distortion between the poles
and the equator, otherwise Quagmire uses Earth values by default. i.e.
```
radius = geocentric_radius(r1=6384.4e3, r2=6352.8e3)
```
Setting a new value of radius updates the point-wise area calculation.
"""
return self._radius
@radius.setter
def radius(self, value):
self._radius = value
# update mesh coordinates and rebuild k-d tree
self._data = self.tri.points*np.array(self._radius).reshape(-1,1)
self._update_DM_coordinates()
self.cKDTree = _cKDTree(self.data, balanced_tree=False)
self.construct_neighbour_cloud()
# re-evalutate mesh area
self.area, self.weight = self.calculate_area_weights()
self.pointwise_area.unlock()
self.pointwise_area.data = self.area
self.pointwise_area.lock()
def _update_DM_coordinates(self):
""" Update the coordinates on the DM """
# get global coordinate vector from DM
global_coord_vec = self.dm.getCoordinates()
global_coord_vec.setArray(self.data.ravel())
self.dm.setCoordinates(global_coord_vec)
def interpolate(self, lons, lats, zdata, order=1, grad=None):
"""
Base class to handle nearest neighbour, linear, and cubic interpolation.
Given a triangulation of a set of nodes on the unit sphere, along with data
values at the nodes, this method interpolates (or extrapolates) the value
at a given longitude and latitude.
Parameters
----------
lons : float / array of floats, shape (l,)
longitudinal coordinate(s) on the sphere
lats : float / array of floats, shape (l,)
latitudinal coordinate(s) on the sphere
zdata : array of floats, shape (n,)
value at each point in the triangulation
must be the same size of the mesh
order : int (default=1)
order of the interpolatory function used
- 0 = nearest-neighbour
- 1 = linear
- 3 = cubic
Returns
-------
zi : float / array of floats, shape (l,)
interpolated value(s) at (lons, lats)
err : int / array of ints, shape (l,)
whether interpolation (0), extrapolation (1) or error (other)
"""
rlons = np.radians(lons)
rlats = np.radians(lats)
return self.tri.interpolate(rlons, rlats, zdata, order, grad)
def calculate_area_weights(self):
"""
Calculate pointwise weights and area
Returns
-------
area : array of floats, shape (n,)
point-wise area
weights : array of ints, shape(n,)
weighting for each point
Notes
-----
This calls a fortran 90 routine which computes the weight and area
for each point in the mesh using the geocentric radius of the sphere
at the equator `r1` and the poles `r2` (defaults to Earth values:
6384.4km and 6352.8km, respectively).
"""
from quagmire._fortran import ntriw_s
R = self._radius
tri_area = self.tri.areas()
# find surface area and weights on the unit sphere
area, weight = ntriw_s(self.npoints, self.tri.simplices.T+1, tri_area)
# project to the radius of the sphere
area *= R**2
return area, weight
def node_neighbours(self, point):
"""
Returns a list of neighbour nodes for a given point in the
Delaunay triangulation.
Parameters
----------
point : int
point on the mesh
Returns
-------
neighbours : list
list of neighbours that are connected by line segments
to the specified `point`
"""
return self.natural_neighbours[point,1:self.natural_neighbours_count[point]]
def derivatives(self, PHI, nit=10, tol=1e-8):
"""
Compute derivatives of PHI in the x, y directions.
This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
---------
PHI : ndarray of floats, shape (n,)
compute the derivative of this array
nit : int optional (default: 10)
number of iterations to reach convergence
tol : float optional (default: 1e-8)
convergence is reached when this tolerance is met
Returns
-------
PHIx : ndarray of floats, shape(n,)
first partial derivative of PHI in x direction
PHIy : ndarray of floats, shape(n,)
first partial derivative of PHI in y direction
"""
PHIx, PHIy = self.tri.derivatives_lonlat(PHI, nit, tol)
grads = np.ndarray((PHIx.size, 2))
grads[:, 0] = np.radians(PHIx)
grads[:, 1] = np.radians(PHIy)
return grads
def derivative_grad(self, PHI, nit=10, tol=1e-8):
"""
Compute gradients of PHI in the x, y directions.
This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
---------
PHI : ndarray of floats, shape (n,)
compute the derivative of this array
nit : int optional (default: 10)
number of iterations to reach convergence
tol : float optional (default: 1e-8)
convergence is reached when this tolerance is met
Returns
-------
PHIx : ndarray of floats, shape(n,)
first partial derivative of PHI in x direction
PHIy : ndarray of floats, shape(n,)
first partial derivative of PHI in y direction
"""
PHIx, PHIy = self.tri.gradient_lonlat(PHI, nit, tol)
grads = np.ndarray((PHIx.size, 2))
grads[:, 0] = np.radians(PHIx)
grads[:, 1] = np.radians(PHIy)
return grads
# required for compatibility among FlatMesh types
def _derivative_grad_cartesian(self, *args, **kwargs):
u_x, u_y, u_z = self.tri.gradient_xyz(*args, **kwargs)
grads = np.ndarray((u_x.size, 3))
grads[:, 0] = u_x
grads[:, 1] = u_y
grads[:, 2] = u_z
return grads
def derivative_div(self, PHIx, PHIy, PHIz, **kwargs):
"""
Compute second order derivative from flux fields PHIx, PHIy, PHIz
We evaluate the gradient on these fields using the derivative-grad method.
Arguments
---------
PHIx : ndarray of floats, shape (n,)
array of first partial derivatives in x direction
PHIy : ndarray of floats, shape (n,)
array of first partial derivatives in y direction
PHIz : ndarray of floats, shape (n,)
array of first partial derivatives in z direction
kwargs : optional keyword-argument specifiers
keyword arguments to be passed onto derivative_grad
e.g. nit=5, tol=1e-3
Returns
-------
del2PHI : ndarray of floats, shape (n,)
second derivative of PHI
"""
u_xx, u_xy, u_zz = self.tri.gradient_xyz(PHIx, **kwargs)
u_yx, u_yy, u_zz = self.tri.gradient_xyz(PHIy, **kwargs)
u_zx, u_zy, u_zz = self.tri.gradient_xyz(PHIz, **kwargs)
return (u_xx + u_yy + u_zz)/self._radius
def construct_natural_neighbour_cloud(self):
"""
Find the natural neighbours for each node in the triangulation and store in a
numpy array for efficient lookup.
"""
# # maximum size of nn array node-by-node (it's almost certainly +1 because
# # it is only +2 if the neighbours do not form a closed loop around the node)
# max_neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints).max() + 2
# s = self.tri.simplices
# nns = -1 * np.ones((self.npoints, max_neighbours), dtype=int)
# nnm = np.zeros((self.npoints, max_neighbours), dtype=bool)
# nn = np.empty((self.npoints), dtype=int)
# for node in range(0, self.npoints):
# w = np.where(s==node)
# l = np.unique(s[w[0]])
# # l = np.roll(l,-np.argwhere(l == node)[0])
# nn[node] = l.shape[0]
# nns[node,0:nn[node]] = l
# nnm[node,0:nn[node]] = True
# self.natural_neighbours = nns
# self.natural_neighbours_count = nn
# self.natural_neighbours_mask = nnm
# Find all the segments in the triangulation and sort the array
# (note this differs from the stripy routine because is treates m-n and n-m as distinct)
lst = self.tri.lst
lend = self.tri.lend
lptr = self.tri.lptr
segments_array = np.empty((len(lptr),2),dtype=np.int)
segments_array[:,0] = np.abs(lst[:]) - 1
segments_array[:,1] = np.abs(lst[lptr[:]-1]) - 1
valid = np.logical_and(segments_array[:,0] >= 0, segments_array[:,1] >= 0)
segments = segments_array[valid,:]
deshuffled = self.tri._deshuffle_simplices(segments)
smsi = np.argsort(deshuffled[:,0])
sms = np.zeros_like(deshuffled)
sms[np.indices((deshuffled.shape[0],)),:] = deshuffled[smsi,:]
natural_neighbours_count = np.bincount(sms[:,0])
iend = np.cumsum(natural_neighbours_count)
istart = np.zeros_like(iend)
istart[1:] = iend[:-1]
natural_neighbours = np.full((self.npoints, natural_neighbours_count.max()+1), -1, dtype=np.int)
for j in range(0,self.npoints):
natural_neighbours[j, 0] = j
natural_neighbours[j, 1:natural_neighbours_count[j]+1] = sms[istart[j]:istart[j]+natural_neighbours_count[j],1]
natural_neighbours_count += 1
self.natural_neighbours = natural_neighbours
self.natural_neighbours_count = natural_neighbours_count
self.natural_neighbours_mask = natural_neighbours != -1
return
def construct_neighbour_cloud(self, size=25):
"""
Find neighbours from distance cKDTree.
Parameters
----------
size : int
Number of neighbours to search for
Notes
-----
Use this method to search for neighbours that are not
necessarily immediate node neighbours (i.e. neighbours
connected by a line segment). Extended node neighbours
should be captured by the search depending on how large
`size` is set to.
"""
nndist, nncloud = self.cKDTree.query(self.data, k=size)
self.neighbour_cloud = nncloud
# find the arc distances
for n in range(1, size):
nndist[:,n] = distance_on_sphere(self.data, self.data[nncloud[:,n]], self.radius)
self.neighbour_cloud_distances = nndist
neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints)
self.near_neighbours = neighbours + 2
self.extended_neighbours = np.full_like(neighbours, size)
def local_area_smoothing(self, data, its=1, centre_weight=0.75):
"""
Local area smoothing using radial-basis function smoothing kernel
Parameters
----------
data : array of floats, shape (n,)
field variable to be smoothed
its : int
number of iterations (default: 1)
centre_weight : float
weight to apply to centre nodes (default: 0.75)
other nodes are weighted by (1 - `centre_weight`)
Returns
-------
sm : array of floats, shape (n,)
smoothed field variable
"""
smooth_data = data.copy()
smooth_data_old = data.copy()
for i in range(0, its):
smooth_data_old[:] = smooth_data
smooth_data = centre_weight*smooth_data_old + \
(1.0 - centre_weight)*self.rbf_smoother(smooth_data)
return smooth_data
def _construct_rbf_weights(self, delta=None):
if delta == None:
delta = self.neighbour_cloud_distances[:, 1].mean()
self.delta = delta
self.gaussian_dist_w = self._rbf_weights(delta)
return
def _rbf_weights(self, delta=None):
neighbour_cloud_distances = self.neighbour_cloud_distances
if delta == None:
delta = self.neighbour_cloud_distances[:, 1].mean()
# Initialise the interpolants
gaussian_dist_w = np.zeros_like(neighbour_cloud_distances)
gaussian_dist_w[:,:] = np.exp(-np.power(neighbour_cloud_distances[:,:]/delta, 2.0))
gaussian_dist_w[:,:] /= gaussian_dist_w.sum(axis=1).reshape(-1,1)
return gaussian_dist_w
def rbf_smoother(self, vector, iterations=1, delta=None):
"""
Smoothing using a radial-basis function smoothing kernel
Arguments
---------
vector : array of floats, shape (n,)
field variable to be smoothed
iterations : int
number of iterations to smooth vector
delta : float / array of floats shape (n,)
distance weights to apply the Gaussian interpolants
Returns
-------
smooth_vec : array of floats, shape (n,)
smoothed version of input vector
"""
if type(delta) != type(None):
self._construct_rbf_weights(delta)
vector = self.sync(vector)
for i in range(0, iterations):
# print self.dm.comm.rank, ": RBF ",vector.max(), vector.min()
vector_smoothed = (vector[self.neighbour_cloud[:,:]] * self.gaussian_dist_w[:,:]).sum(axis=1)
vector = self.sync(vector_smoothed)
return vector
def build_rbf_smoother(self, delta, iterations=1):
strimesh_self = self
class _rbf_smoother_object(object):
def __init__(inner_self, delta, iterations=1):
if delta == None:
strimesh_self.lvec.setArray(strimesh_self.neighbour_cloud_distances[:, 1])
strimesh_self.dm.localToGlobal(strimesh_self.lvec, strimesh_self.gvec)
delta = strimesh_self.gvec.sum() / strimesh_self.gvec.getSize()
inner_self._mesh = strimesh_self
inner_self.delta = delta
inner_self.iterations = iterations
inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta)
return
def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1):
import quagmire
smooth_node_values = lazyFn.evaluate(inner_self._mesh)
for i in range(0, iterations):
smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1)
smooth_node_values = inner_self._mesh.sync(smooth_node_values)
return smooth_node_values
def smooth_fn(inner_self, meshVar, iterations=None):
if iterations is None:
iterations = inner_self.iterations
def smoother_fn(*args, **kwargs):
import quagmire
smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations)
if len(args) == 1 and args[0] == meshVar._mesh:
return smooth_node_values
elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]):
mesh = args[0]
return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs)
else:
xi = np.atleast_1d(args[0])
yi = np.atleast_1d(args[1])
i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs)
return i
newLazyFn = _LazyEvaluation()
newLazyFn.evaluate = smoother_fn
newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations)
return newLazyFn
return _rbf_smoother_object(delta, iterations)
def build_rbf_smoother(self, delta, iterations=1):
strimesh_self = self
class _rbf_smoother_object(object):
def __init__(inner_self, delta, iterations=1):
if delta == None:
delta = strimesh_self.neighbour_cloud_distances[:, 1].mean() ## NOT SAFE IN PARALLEL !!!
inner_self._mesh = strimesh_self
inner_self.delta = delta
inner_self.iterations = iterations
inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta)
return
def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1):
import quagmire
smooth_node_values = lazyFn.evaluate(inner_self._mesh)
for i in range(0, iterations):
smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1)
smooth_node_values = inner_self._mesh.sync(smooth_node_values)
return smooth_node_values
def smooth_fn(inner_self, meshVar, iterations=None):
import quagmire
if iterations is None:
iterations = inner_self.iterations
def smoother_fn(*args, **kwargs):
smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations)
if len(args) == 1 and args[0] == meshVar._mesh:
return smooth_node_values
elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]):
mesh = args[0]
return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs)
else:
xi = np.atleast_1d(args[0])
yi = np.atleast_1d(args[1])
i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs)
return i
newLazyFn = _LazyEvaluation()
newLazyFn.evaluate = smoother_fn
newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations)
return newLazyFn
return _rbf_smoother_object(delta, iterations)
def geocentric_radius(lat, r1=6384.4e3, r2=6352.8e3):
"""
Calculate the radius of an oblate spheroid (like the earth)
Parameters
----------
lat : array of floats
latitudinal coordinates in degrees
r1 : float
radius at the equator (in metres)
r2 : float
radius at the poles (in metres)
Returns
-------
r : array of floats
radius at provided latitudes `lat` in metres
"""
rlat = np.radians(lat)
coslat = np.cos(rlat)
sinlat = np.sin(rlat)
num = (r1**2*coslat)**2 + (r2**2*sinlat)**2
den = (r1*coslat)**2 + (r2*sinlat)**2
return np.sqrt(num/den)
def distance_on_sphere(A, B, radius=None):
"""
Find the distance on the sphere between two sets of points (A and B)
If the radius of the sphere is not provided, then the radius is
estimated from A.
Parameters
----------
A : ndarray
array of points with axis 0 or 1 being the same as B
B : ndarray
array of points with axis 0 or 1 being the same as A
radius : float or ndarray
radius of the sphere
Returns
-------
distance : ndarray
array of spherical distances the same shape as A or B
depending on which is larger
"""
A = np.atleast_2d(A)
B = np.atleast_2d(B)
norm_cross = np.linalg.norm(np.cross(A, B), axis=1)
dot = (A*B).sum(axis=1)
if radius is None:
radius = np.linalg.norm(A, axis=1)
return np.arctan(norm_cross/dot) * radius
Functions
def distance_on_sphere(A, B, radius=None)
-
Find the distance on the sphere between two sets of points (A and B) If the radius of the sphere is not provided, then the radius is estimated from A.
Parameters
A
:ndarray
- array of points with axis 0 or 1 being the same as B
B
:ndarray
- array of points with axis 0 or 1 being the same as A
radius
:float
orndarray
- radius of the sphere
Returns
distance
:ndarray
- array of spherical distances the same shape as A or B depending on which is larger
Expand source code
def distance_on_sphere(A, B, radius=None): """ Find the distance on the sphere between two sets of points (A and B) If the radius of the sphere is not provided, then the radius is estimated from A. Parameters ---------- A : ndarray array of points with axis 0 or 1 being the same as B B : ndarray array of points with axis 0 or 1 being the same as A radius : float or ndarray radius of the sphere Returns ------- distance : ndarray array of spherical distances the same shape as A or B depending on which is larger """ A = np.atleast_2d(A) B = np.atleast_2d(B) norm_cross = np.linalg.norm(np.cross(A, B), axis=1) dot = (A*B).sum(axis=1) if radius is None: radius = np.linalg.norm(A, axis=1) return np.arctan(norm_cross/dot) * radius
def geocentric_radius(lat, r1=6384400.0, r2=6352800.0)
-
Calculate the radius of an oblate spheroid (like the earth)
Parameters
lat
:array
offloats
- latitudinal coordinates in degrees
r1
:float
- radius at the equator (in metres)
r2
:float
- radius at the poles (in metres)
Returns
r
:array
offloats
- radius at provided latitudes
lat
in metres
Expand source code
def geocentric_radius(lat, r1=6384.4e3, r2=6352.8e3): """ Calculate the radius of an oblate spheroid (like the earth) Parameters ---------- lat : array of floats latitudinal coordinates in degrees r1 : float radius at the equator (in metres) r2 : float radius at the poles (in metres) Returns ------- r : array of floats radius at provided latitudes `lat` in metres """ rlat = np.radians(lat) coslat = np.cos(rlat) sinlat = np.sin(rlat) num = (r1**2*coslat)**2 + (r2**2*sinlat)**2 den = (r1*coslat)**2 + (r2*sinlat)**2 return np.sqrt(num/den)
Classes
class sTriMesh (dm, r1=6384400.0, r2=6352800.0, verbose=True, *args, **kwargs)
-
Build spatial data structures on an unstructured spherical mesh.
Use
sTriMesh
for:- calculating spatial derivatives
- identifying node neighbour relationships
- interpolation / extrapolation
- smoothing operators
- importing and saving mesh information
Each of these data structures are built on top of a
PETSc DM
object (created fromquagmire.tools.meshtools
).Parameters
DM
:PETSc DMPlex object
-
Build this unstructured spherical mesh object using one of:
verbose
:bool
- Flag toggles verbose output
*args
:optional arguments
**kwargs
:optional keyword arguments
Attributes
tri
:stripy Triangulation object
- Cartesian mesh object generated by
stripy
npoints
:int
- Number of points (n) in the mesh
pointwise_area
:Quagmire MeshVariable
MeshVariable
of point-wise areamask
:Quagmire MeshVariable
MeshVariable
to denote points on the boundarydata
:array
offloats, shape (n,2)
- Cartesian mesh coordinates in x,y directions
coords
:array
offloats, shape (n,2)
- Same as
data
neighbour_cloud
:array
ofints, shape(n,25)
- array of nearest node neighbours by distance
near_neighbour_mask
:array
ofbools, shape(n,25)
- mask immediate node neighbours in
neighbour_cloud
timings
:dict
- Timing information for each Quagmire routine
Expand source code
class sTriMesh(_CommonMesh): """ Build spatial data structures on an __unstructured spherical mesh__. Use `sTriMesh` for: - calculating spatial derivatives - identifying node neighbour relationships - interpolation / extrapolation - smoothing operators - importing and saving mesh information Each of these data structures are built on top of a `PETSc DM` object (created from `quagmire.tools.meshtools`). Parameters ---------- DM : PETSc DMPlex object Build this unstructured spherical mesh object using one of: - `quagmire.tools.meshtools.create_spherical_DMPlex` - `quagmire.tools.meshtools.create_DMPlex_from_spherical_points` verbose : bool Flag toggles verbose output *args : optional arguments **kwargs : optional keyword arguments Attributes ---------- tri : stripy Triangulation object Cartesian mesh object generated by `stripy` npoints : int Number of points (n) in the mesh pointwise_area : Quagmire MeshVariable `quagmire.mesh.basemesh.MeshVariable` of point-wise area mask : Quagmire MeshVariable `quagmire.mesh.basemesh.MeshVariable` to denote points on the boundary data : array of floats, shape (n,2) Cartesian mesh coordinates in x,y directions coords : array of floats, shape (n,2) Same as `data` neighbour_cloud : array of ints, shape(n,25) array of nearest node neighbours by distance near_neighbour_mask : array of bools, shape(n,25) mask immediate node neighbours in `neighbour_cloud` timings : dict Timing information for each Quagmire routine """ __count = 0 @classmethod def _count(cls): sTriMesh.__count += 1 return sTriMesh.__count @property def id(self): return self.__id def __init__(self, dm, r1=6384.4e3, r2=6352.8e3, verbose=True, *args, **kwargs): import stripy # initialise base mesh class super(sTriMesh, self).__init__(dm, verbose) self.__id = "strimesh_{}".format(self._count()) # Delaunay triangulation t = perf_counter() coords = dm.getCoordinatesLocal().array.reshape(-1,3) minX, minY, minZ = coords.min(axis=0) maxX, maxY, maxZ = coords.max(axis=0) length_scale = np.sqrt((maxX - minX)*(maxY - minY)/coords.shape[0]) # coords += np.random.random(coords.shape) * 0.0001 * length_scale # This should be aware of the point spacing (small perturbation) # r = np.sqrt(coords[:,0]**2 + coords[:,1]**2 + coords[:,2]**2) # should just equal 1 # r = 1.0 # lons = np.arctan2(coords[:,1], coords[:,0]) # lats = np.arcsin(coords[:,2]/r) rlons, rlats = stripy.spherical.xyz2lonlat(coords[:,0], coords[:,1], coords[:,2]) lons = np.degrees(rlons) lats = np.degrees(rlats) self.tri = stripy.sTriangulation(rlons, rlats, permute=True) self.npoints = self.tri.npoints self.timings['triangulation'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - Delaunay triangulation {}s".format(self.dm.comm.rank, perf_counter()-t)) # Calculate geocentric radius self._radius = geocentric_radius(lats, r1, r2) self._coords = np.c_[lons, lats] self._data = self.tri.points*self._radius.reshape(-1,1) # Calculate weigths and pointwise area t = perf_counter() self.area, self.weight = self.calculate_area_weights() self.pointwise_area = self.add_variable(name="area") self.pointwise_area.data = self.area self.pointwise_area.lock() self.timings['area weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - Calculate node weights and area {}s".format(self.dm.comm.rank, perf_counter()-t)) # Find boundary points t = perf_counter() self.bmask = self.get_boundary() self.mask = self.add_variable(name="Mask") self.mask.data = self.bmask.astype(PETSc.ScalarType) self.mask.lock() self.timings['find boundaries'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - Find boundaries {}s".format(self.dm.comm.rank, perf_counter()-t)) # cKDTree t = perf_counter() self.cKDTree = _cKDTree(self.data, balanced_tree=False) self.timings['cKDTree'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - cKDTree {}s".format(self.dm.comm.rank, perf_counter()-t)) # Find neighbours t = perf_counter() self.construct_neighbour_cloud() self.timings['construct neighbour cloud'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] t1 = perf_counter() self.construct_natural_neighbour_cloud() self.timings['construct natural neighbour cloud'] = [perf_counter()-t1, self.log.getCPUTime(), self.log.getFlops()] if self.rank==0 and self.verbose: print("{} - Construct neighbour cloud arrays {}s, ({}s + {}s)".format(self.dm.comm.rank, perf_counter()-t, self.timings['construct neighbour cloud'][0], self.timings['construct natural neighbour cloud'][0] )) # RBF smoothing operator t = perf_counter() self._construct_rbf_weights() self.timings['construct rbf weights'] = [perf_counter()-t, self.log.getCPUTime(), self.log.getFlops()] if self.rank == 0 and self.verbose: print("{} - Construct rbf weights {}s".format(self.dm.comm.rank, perf_counter()-t)) self.root = False # We place these properties here to prevent users from mucking up the coordinates. # Radius is the only property to have a setter only because freedom in the vertical axis # doesn't mess up the graph, and is quite useful for visualisation in lavavu / paraview. ## FUTURE: deforming the mesh should be via a context manager. @property def data(self): """ Cartesian coordinates of local mesh points. `data` is of shape (npoints, 3) These points are projected to the radius of the sphere. If `self.radius` is set to 1.0 then these points are on the unit sphere and are identical to `self.tri.points`. """ return self._data @property def coords(self): """ Spherical coordinates of local mesh points in degrees lon / lat. `coords` is of shape (npoints, 2) """ return self._coords @property def radius(self): """ The radius of the sphere. Use `geocentric_radius` to compute the radius with distortion between the poles and the equator, otherwise Quagmire uses Earth values by default. i.e. ``` radius = geocentric_radius(r1=6384.4e3, r2=6352.8e3) ``` Setting a new value of radius updates the point-wise area calculation. """ return self._radius @radius.setter def radius(self, value): self._radius = value # update mesh coordinates and rebuild k-d tree self._data = self.tri.points*np.array(self._radius).reshape(-1,1) self._update_DM_coordinates() self.cKDTree = _cKDTree(self.data, balanced_tree=False) self.construct_neighbour_cloud() # re-evalutate mesh area self.area, self.weight = self.calculate_area_weights() self.pointwise_area.unlock() self.pointwise_area.data = self.area self.pointwise_area.lock() def _update_DM_coordinates(self): """ Update the coordinates on the DM """ # get global coordinate vector from DM global_coord_vec = self.dm.getCoordinates() global_coord_vec.setArray(self.data.ravel()) self.dm.setCoordinates(global_coord_vec) def interpolate(self, lons, lats, zdata, order=1, grad=None): """ Base class to handle nearest neighbour, linear, and cubic interpolation. Given a triangulation of a set of nodes on the unit sphere, along with data values at the nodes, this method interpolates (or extrapolates) the value at a given longitude and latitude. Parameters ---------- lons : float / array of floats, shape (l,) longitudinal coordinate(s) on the sphere lats : float / array of floats, shape (l,) latitudinal coordinate(s) on the sphere zdata : array of floats, shape (n,) value at each point in the triangulation must be the same size of the mesh order : int (default=1) order of the interpolatory function used - 0 = nearest-neighbour - 1 = linear - 3 = cubic Returns ------- zi : float / array of floats, shape (l,) interpolated value(s) at (lons, lats) err : int / array of ints, shape (l,) whether interpolation (0), extrapolation (1) or error (other) """ rlons = np.radians(lons) rlats = np.radians(lats) return self.tri.interpolate(rlons, rlats, zdata, order, grad) def calculate_area_weights(self): """ Calculate pointwise weights and area Returns ------- area : array of floats, shape (n,) point-wise area weights : array of ints, shape(n,) weighting for each point Notes ----- This calls a fortran 90 routine which computes the weight and area for each point in the mesh using the geocentric radius of the sphere at the equator `r1` and the poles `r2` (defaults to Earth values: 6384.4km and 6352.8km, respectively). """ from quagmire._fortran import ntriw_s R = self._radius tri_area = self.tri.areas() # find surface area and weights on the unit sphere area, weight = ntriw_s(self.npoints, self.tri.simplices.T+1, tri_area) # project to the radius of the sphere area *= R**2 return area, weight def node_neighbours(self, point): """ Returns a list of neighbour nodes for a given point in the Delaunay triangulation. Parameters ---------- point : int point on the mesh Returns ------- neighbours : list list of neighbours that are connected by line segments to the specified `point` """ return self.natural_neighbours[point,1:self.natural_neighbours_count[point]] def derivatives(self, PHI, nit=10, tol=1e-8): """ Compute derivatives of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ PHIx, PHIy = self.tri.derivatives_lonlat(PHI, nit, tol) grads = np.ndarray((PHIx.size, 2)) grads[:, 0] = np.radians(PHIx) grads[:, 1] = np.radians(PHIy) return grads def derivative_grad(self, PHI, nit=10, tol=1e-8): """ Compute gradients of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ PHIx, PHIy = self.tri.gradient_lonlat(PHI, nit, tol) grads = np.ndarray((PHIx.size, 2)) grads[:, 0] = np.radians(PHIx) grads[:, 1] = np.radians(PHIy) return grads # required for compatibility among FlatMesh types def _derivative_grad_cartesian(self, *args, **kwargs): u_x, u_y, u_z = self.tri.gradient_xyz(*args, **kwargs) grads = np.ndarray((u_x.size, 3)) grads[:, 0] = u_x grads[:, 1] = u_y grads[:, 2] = u_z return grads def derivative_div(self, PHIx, PHIy, PHIz, **kwargs): """ Compute second order derivative from flux fields PHIx, PHIy, PHIz We evaluate the gradient on these fields using the derivative-grad method. Arguments --------- PHIx : ndarray of floats, shape (n,) array of first partial derivatives in x direction PHIy : ndarray of floats, shape (n,) array of first partial derivatives in y direction PHIz : ndarray of floats, shape (n,) array of first partial derivatives in z direction kwargs : optional keyword-argument specifiers keyword arguments to be passed onto derivative_grad e.g. nit=5, tol=1e-3 Returns ------- del2PHI : ndarray of floats, shape (n,) second derivative of PHI """ u_xx, u_xy, u_zz = self.tri.gradient_xyz(PHIx, **kwargs) u_yx, u_yy, u_zz = self.tri.gradient_xyz(PHIy, **kwargs) u_zx, u_zy, u_zz = self.tri.gradient_xyz(PHIz, **kwargs) return (u_xx + u_yy + u_zz)/self._radius def construct_natural_neighbour_cloud(self): """ Find the natural neighbours for each node in the triangulation and store in a numpy array for efficient lookup. """ # # maximum size of nn array node-by-node (it's almost certainly +1 because # # it is only +2 if the neighbours do not form a closed loop around the node) # max_neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints).max() + 2 # s = self.tri.simplices # nns = -1 * np.ones((self.npoints, max_neighbours), dtype=int) # nnm = np.zeros((self.npoints, max_neighbours), dtype=bool) # nn = np.empty((self.npoints), dtype=int) # for node in range(0, self.npoints): # w = np.where(s==node) # l = np.unique(s[w[0]]) # # l = np.roll(l,-np.argwhere(l == node)[0]) # nn[node] = l.shape[0] # nns[node,0:nn[node]] = l # nnm[node,0:nn[node]] = True # self.natural_neighbours = nns # self.natural_neighbours_count = nn # self.natural_neighbours_mask = nnm # Find all the segments in the triangulation and sort the array # (note this differs from the stripy routine because is treates m-n and n-m as distinct) lst = self.tri.lst lend = self.tri.lend lptr = self.tri.lptr segments_array = np.empty((len(lptr),2),dtype=np.int) segments_array[:,0] = np.abs(lst[:]) - 1 segments_array[:,1] = np.abs(lst[lptr[:]-1]) - 1 valid = np.logical_and(segments_array[:,0] >= 0, segments_array[:,1] >= 0) segments = segments_array[valid,:] deshuffled = self.tri._deshuffle_simplices(segments) smsi = np.argsort(deshuffled[:,0]) sms = np.zeros_like(deshuffled) sms[np.indices((deshuffled.shape[0],)),:] = deshuffled[smsi,:] natural_neighbours_count = np.bincount(sms[:,0]) iend = np.cumsum(natural_neighbours_count) istart = np.zeros_like(iend) istart[1:] = iend[:-1] natural_neighbours = np.full((self.npoints, natural_neighbours_count.max()+1), -1, dtype=np.int) for j in range(0,self.npoints): natural_neighbours[j, 0] = j natural_neighbours[j, 1:natural_neighbours_count[j]+1] = sms[istart[j]:istart[j]+natural_neighbours_count[j],1] natural_neighbours_count += 1 self.natural_neighbours = natural_neighbours self.natural_neighbours_count = natural_neighbours_count self.natural_neighbours_mask = natural_neighbours != -1 return def construct_neighbour_cloud(self, size=25): """ Find neighbours from distance cKDTree. Parameters ---------- size : int Number of neighbours to search for Notes ----- Use this method to search for neighbours that are not necessarily immediate node neighbours (i.e. neighbours connected by a line segment). Extended node neighbours should be captured by the search depending on how large `size` is set to. """ nndist, nncloud = self.cKDTree.query(self.data, k=size) self.neighbour_cloud = nncloud # find the arc distances for n in range(1, size): nndist[:,n] = distance_on_sphere(self.data, self.data[nncloud[:,n]], self.radius) self.neighbour_cloud_distances = nndist neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints) self.near_neighbours = neighbours + 2 self.extended_neighbours = np.full_like(neighbours, size) def local_area_smoothing(self, data, its=1, centre_weight=0.75): """ Local area smoothing using radial-basis function smoothing kernel Parameters ---------- data : array of floats, shape (n,) field variable to be smoothed its : int number of iterations (default: 1) centre_weight : float weight to apply to centre nodes (default: 0.75) other nodes are weighted by (1 - `centre_weight`) Returns ------- sm : array of floats, shape (n,) smoothed field variable """ smooth_data = data.copy() smooth_data_old = data.copy() for i in range(0, its): smooth_data_old[:] = smooth_data smooth_data = centre_weight*smooth_data_old + \ (1.0 - centre_weight)*self.rbf_smoother(smooth_data) return smooth_data def _construct_rbf_weights(self, delta=None): if delta == None: delta = self.neighbour_cloud_distances[:, 1].mean() self.delta = delta self.gaussian_dist_w = self._rbf_weights(delta) return def _rbf_weights(self, delta=None): neighbour_cloud_distances = self.neighbour_cloud_distances if delta == None: delta = self.neighbour_cloud_distances[:, 1].mean() # Initialise the interpolants gaussian_dist_w = np.zeros_like(neighbour_cloud_distances) gaussian_dist_w[:,:] = np.exp(-np.power(neighbour_cloud_distances[:,:]/delta, 2.0)) gaussian_dist_w[:,:] /= gaussian_dist_w.sum(axis=1).reshape(-1,1) return gaussian_dist_w def rbf_smoother(self, vector, iterations=1, delta=None): """ Smoothing using a radial-basis function smoothing kernel Arguments --------- vector : array of floats, shape (n,) field variable to be smoothed iterations : int number of iterations to smooth vector delta : float / array of floats shape (n,) distance weights to apply the Gaussian interpolants Returns ------- smooth_vec : array of floats, shape (n,) smoothed version of input vector """ if type(delta) != type(None): self._construct_rbf_weights(delta) vector = self.sync(vector) for i in range(0, iterations): # print self.dm.comm.rank, ": RBF ",vector.max(), vector.min() vector_smoothed = (vector[self.neighbour_cloud[:,:]] * self.gaussian_dist_w[:,:]).sum(axis=1) vector = self.sync(vector_smoothed) return vector def build_rbf_smoother(self, delta, iterations=1): strimesh_self = self class _rbf_smoother_object(object): def __init__(inner_self, delta, iterations=1): if delta == None: strimesh_self.lvec.setArray(strimesh_self.neighbour_cloud_distances[:, 1]) strimesh_self.dm.localToGlobal(strimesh_self.lvec, strimesh_self.gvec) delta = strimesh_self.gvec.sum() / strimesh_self.gvec.getSize() inner_self._mesh = strimesh_self inner_self.delta = delta inner_self.iterations = iterations inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta) return def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1): import quagmire smooth_node_values = lazyFn.evaluate(inner_self._mesh) for i in range(0, iterations): smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1) smooth_node_values = inner_self._mesh.sync(smooth_node_values) return smooth_node_values def smooth_fn(inner_self, meshVar, iterations=None): if iterations is None: iterations = inner_self.iterations def smoother_fn(*args, **kwargs): import quagmire smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations) if len(args) == 1 and args[0] == meshVar._mesh: return smooth_node_values elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]): mesh = args[0] return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs) else: xi = np.atleast_1d(args[0]) yi = np.atleast_1d(args[1]) i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs) return i newLazyFn = _LazyEvaluation() newLazyFn.evaluate = smoother_fn newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations) return newLazyFn return _rbf_smoother_object(delta, iterations) def build_rbf_smoother(self, delta, iterations=1): strimesh_self = self class _rbf_smoother_object(object): def __init__(inner_self, delta, iterations=1): if delta == None: delta = strimesh_self.neighbour_cloud_distances[:, 1].mean() ## NOT SAFE IN PARALLEL !!! inner_self._mesh = strimesh_self inner_self.delta = delta inner_self.iterations = iterations inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta) return def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1): import quagmire smooth_node_values = lazyFn.evaluate(inner_self._mesh) for i in range(0, iterations): smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1) smooth_node_values = inner_self._mesh.sync(smooth_node_values) return smooth_node_values def smooth_fn(inner_self, meshVar, iterations=None): import quagmire if iterations is None: iterations = inner_self.iterations def smoother_fn(*args, **kwargs): smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations) if len(args) == 1 and args[0] == meshVar._mesh: return smooth_node_values elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]): mesh = args[0] return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs) else: xi = np.atleast_1d(args[0]) yi = np.atleast_1d(args[1]) i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs) return i newLazyFn = _LazyEvaluation() newLazyFn.evaluate = smoother_fn newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations) return newLazyFn return _rbf_smoother_object(delta, iterations)
Ancestors
Instance variables
var coords
-
Spherical coordinates of local mesh points in degrees lon / lat.
coords
is of shape (npoints, 2)Expand source code
@property def coords(self): """ Spherical coordinates of local mesh points in degrees lon / lat. `coords` is of shape (npoints, 2) """ return self._coords
var data
-
Cartesian coordinates of local mesh points.
data
is of shape (npoints, 3)These points are projected to the radius of the sphere. If
self.radius
is set to 1.0 then these points are on the unit sphere and are identical toself.tri.points
.Expand source code
@property def data(self): """ Cartesian coordinates of local mesh points. `data` is of shape (npoints, 3) These points are projected to the radius of the sphere. If `self.radius` is set to 1.0 then these points are on the unit sphere and are identical to `self.tri.points`. """ return self._data
var id
-
Expand source code
@property def id(self): return self.__id
var radius
-
The radius of the sphere.
Use
geocentric_radius()
to compute the radius with distortion between the poles and the equator, otherwise Quagmire uses Earth values by default. i.e.radius = geocentric_radius(r1=6384.4e3, r2=6352.8e3)
Setting a new value of radius updates the point-wise area calculation.
Expand source code
@property def radius(self): """ The radius of the sphere. Use `geocentric_radius` to compute the radius with distortion between the poles and the equator, otherwise Quagmire uses Earth values by default. i.e. ``` radius = geocentric_radius(r1=6384.4e3, r2=6352.8e3) ``` Setting a new value of radius updates the point-wise area calculation. """ return self._radius
Methods
def build_rbf_smoother(self, delta, iterations=1)
-
Expand source code
def build_rbf_smoother(self, delta, iterations=1): strimesh_self = self class _rbf_smoother_object(object): def __init__(inner_self, delta, iterations=1): if delta == None: delta = strimesh_self.neighbour_cloud_distances[:, 1].mean() ## NOT SAFE IN PARALLEL !!! inner_self._mesh = strimesh_self inner_self.delta = delta inner_self.iterations = iterations inner_self.gaussian_dist_w = inner_self._mesh._rbf_weights(delta) return def _apply_rbf_on_my_mesh(inner_self, lazyFn, iterations=1): import quagmire smooth_node_values = lazyFn.evaluate(inner_self._mesh) for i in range(0, iterations): smooth_node_values = (smooth_node_values[inner_self._mesh.neighbour_cloud[:,:]] * inner_self.gaussian_dist_w[:,:]).sum(axis=1) smooth_node_values = inner_self._mesh.sync(smooth_node_values) return smooth_node_values def smooth_fn(inner_self, meshVar, iterations=None): import quagmire if iterations is None: iterations = inner_self.iterations def smoother_fn(*args, **kwargs): smooth_node_values = inner_self._apply_rbf_on_my_mesh(meshVar, iterations=iterations) if len(args) == 1 and args[0] == meshVar._mesh: return smooth_node_values elif len(args) == 1 and quagmire.mesh.check_object_is_a_q_mesh(args[0]): mesh = args[0] return inner_self._mesh.interpolate(meshVar._mesh.coords[:,0], meshVar._mesh.coords[:,1], zdata=smooth_node_values, **kwargs) else: xi = np.atleast_1d(args[0]) yi = np.atleast_1d(args[1]) i, e = inner_self._mesh.interpolate(xi, yi, zdata=smooth_node_values, **kwargs) return i newLazyFn = _LazyEvaluation() newLazyFn.evaluate = smoother_fn newLazyFn.description = "RBFsmooth({}, d={}, i={})".format(meshVar.description, inner_self.delta, iterations) return newLazyFn return _rbf_smoother_object(delta, iterations)
def calculate_area_weights(self)
-
Calculate pointwise weights and area
Returns
area
:array
offloats, shape (n,)
- point-wise area
weights
:array
ofints, shape(n,)
- weighting for each point
Notes
This calls a fortran 90 routine which computes the weight and area for each point in the mesh using the geocentric radius of the sphere at the equator
r1
and the polesr2
(defaults to Earth values: 6384.4km and 6352.8km, respectively).Expand source code
def calculate_area_weights(self): """ Calculate pointwise weights and area Returns ------- area : array of floats, shape (n,) point-wise area weights : array of ints, shape(n,) weighting for each point Notes ----- This calls a fortran 90 routine which computes the weight and area for each point in the mesh using the geocentric radius of the sphere at the equator `r1` and the poles `r2` (defaults to Earth values: 6384.4km and 6352.8km, respectively). """ from quagmire._fortran import ntriw_s R = self._radius tri_area = self.tri.areas() # find surface area and weights on the unit sphere area, weight = ntriw_s(self.npoints, self.tri.simplices.T+1, tri_area) # project to the radius of the sphere area *= R**2 return area, weight
def construct_natural_neighbour_cloud(self)
-
Find the natural neighbours for each node in the triangulation and store in a numpy array for efficient lookup.
Expand source code
def construct_natural_neighbour_cloud(self): """ Find the natural neighbours for each node in the triangulation and store in a numpy array for efficient lookup. """ # # maximum size of nn array node-by-node (it's almost certainly +1 because # # it is only +2 if the neighbours do not form a closed loop around the node) # max_neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints).max() + 2 # s = self.tri.simplices # nns = -1 * np.ones((self.npoints, max_neighbours), dtype=int) # nnm = np.zeros((self.npoints, max_neighbours), dtype=bool) # nn = np.empty((self.npoints), dtype=int) # for node in range(0, self.npoints): # w = np.where(s==node) # l = np.unique(s[w[0]]) # # l = np.roll(l,-np.argwhere(l == node)[0]) # nn[node] = l.shape[0] # nns[node,0:nn[node]] = l # nnm[node,0:nn[node]] = True # self.natural_neighbours = nns # self.natural_neighbours_count = nn # self.natural_neighbours_mask = nnm # Find all the segments in the triangulation and sort the array # (note this differs from the stripy routine because is treates m-n and n-m as distinct) lst = self.tri.lst lend = self.tri.lend lptr = self.tri.lptr segments_array = np.empty((len(lptr),2),dtype=np.int) segments_array[:,0] = np.abs(lst[:]) - 1 segments_array[:,1] = np.abs(lst[lptr[:]-1]) - 1 valid = np.logical_and(segments_array[:,0] >= 0, segments_array[:,1] >= 0) segments = segments_array[valid,:] deshuffled = self.tri._deshuffle_simplices(segments) smsi = np.argsort(deshuffled[:,0]) sms = np.zeros_like(deshuffled) sms[np.indices((deshuffled.shape[0],)),:] = deshuffled[smsi,:] natural_neighbours_count = np.bincount(sms[:,0]) iend = np.cumsum(natural_neighbours_count) istart = np.zeros_like(iend) istart[1:] = iend[:-1] natural_neighbours = np.full((self.npoints, natural_neighbours_count.max()+1), -1, dtype=np.int) for j in range(0,self.npoints): natural_neighbours[j, 0] = j natural_neighbours[j, 1:natural_neighbours_count[j]+1] = sms[istart[j]:istart[j]+natural_neighbours_count[j],1] natural_neighbours_count += 1 self.natural_neighbours = natural_neighbours self.natural_neighbours_count = natural_neighbours_count self.natural_neighbours_mask = natural_neighbours != -1 return
def construct_neighbour_cloud(self, size=25)
-
Find neighbours from distance cKDTree.
Parameters
size
:int
- Number of neighbours to search for
Notes
Use this method to search for neighbours that are not necessarily immediate node neighbours (i.e. neighbours connected by a line segment). Extended node neighbours should be captured by the search depending on how large
size
is set to.Expand source code
def construct_neighbour_cloud(self, size=25): """ Find neighbours from distance cKDTree. Parameters ---------- size : int Number of neighbours to search for Notes ----- Use this method to search for neighbours that are not necessarily immediate node neighbours (i.e. neighbours connected by a line segment). Extended node neighbours should be captured by the search depending on how large `size` is set to. """ nndist, nncloud = self.cKDTree.query(self.data, k=size) self.neighbour_cloud = nncloud # find the arc distances for n in range(1, size): nndist[:,n] = distance_on_sphere(self.data, self.data[nncloud[:,n]], self.radius) self.neighbour_cloud_distances = nndist neighbours = np.bincount(self.tri.simplices.ravel(), minlength=self.npoints) self.near_neighbours = neighbours + 2 self.extended_neighbours = np.full_like(neighbours, size)
def derivative_div(self, PHIx, PHIy, PHIz, **kwargs)
-
Compute second order derivative from flux fields PHIx, PHIy, PHIz We evaluate the gradient on these fields using the derivative-grad method.
Arguments
PHIx
:ndarray
offloats, shape (n,)
- array of first partial derivatives in x direction
PHIy
:ndarray
offloats, shape (n,)
- array of first partial derivatives in y direction
PHIz
:ndarray
offloats, shape (n,)
- array of first partial derivatives in z direction
kwargs
:optional keyword-argument specifiers
- keyword arguments to be passed onto derivative_grad e.g. nit=5, tol=1e-3
Returns
del2PHI
:ndarray
offloats, shape (n,)
- second derivative of PHI
Expand source code
def derivative_div(self, PHIx, PHIy, PHIz, **kwargs): """ Compute second order derivative from flux fields PHIx, PHIy, PHIz We evaluate the gradient on these fields using the derivative-grad method. Arguments --------- PHIx : ndarray of floats, shape (n,) array of first partial derivatives in x direction PHIy : ndarray of floats, shape (n,) array of first partial derivatives in y direction PHIz : ndarray of floats, shape (n,) array of first partial derivatives in z direction kwargs : optional keyword-argument specifiers keyword arguments to be passed onto derivative_grad e.g. nit=5, tol=1e-3 Returns ------- del2PHI : ndarray of floats, shape (n,) second derivative of PHI """ u_xx, u_xy, u_zz = self.tri.gradient_xyz(PHIx, **kwargs) u_yx, u_yy, u_zz = self.tri.gradient_xyz(PHIy, **kwargs) u_zx, u_zy, u_zz = self.tri.gradient_xyz(PHIz, **kwargs) return (u_xx + u_yy + u_zz)/self._radius
def derivative_grad(self, PHI, nit=10, tol=1e-08)
-
Compute gradients of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
PHI
:ndarray
offloats, shape (n,)
- compute the derivative of this array
nit
:int optional (default: 10)
- number of iterations to reach convergence
tol
:float optional (default: 1e-8)
- convergence is reached when this tolerance is met
Returns
PHIx
:ndarray
offloats, shape(n,)
- first partial derivative of PHI in x direction
PHIy
:ndarray
offloats, shape(n,)
- first partial derivative of PHI in y direction
Expand source code
def derivative_grad(self, PHI, nit=10, tol=1e-8): """ Compute gradients of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ PHIx, PHIy = self.tri.gradient_lonlat(PHI, nit, tol) grads = np.ndarray((PHIx.size, 2)) grads[:, 0] = np.radians(PHIx) grads[:, 1] = np.radians(PHIy) return grads
def derivatives(self, PHI, nit=10, tol=1e-08)
-
Compute derivatives of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function.
Arguments
PHI
:ndarray
offloats, shape (n,)
- compute the derivative of this array
nit
:int optional (default: 10)
- number of iterations to reach convergence
tol
:float optional (default: 1e-8)
- convergence is reached when this tolerance is met
Returns
PHIx
:ndarray
offloats, shape(n,)
- first partial derivative of PHI in x direction
PHIy
:ndarray
offloats, shape(n,)
- first partial derivative of PHI in y direction
Expand source code
def derivatives(self, PHI, nit=10, tol=1e-8): """ Compute derivatives of PHI in the x, y directions. This routine uses SRFPACK to compute derivatives on a C-1 bivariate function. Arguments --------- PHI : ndarray of floats, shape (n,) compute the derivative of this array nit : int optional (default: 10) number of iterations to reach convergence tol : float optional (default: 1e-8) convergence is reached when this tolerance is met Returns ------- PHIx : ndarray of floats, shape(n,) first partial derivative of PHI in x direction PHIy : ndarray of floats, shape(n,) first partial derivative of PHI in y direction """ PHIx, PHIy = self.tri.derivatives_lonlat(PHI, nit, tol) grads = np.ndarray((PHIx.size, 2)) grads[:, 0] = np.radians(PHIx) grads[:, 1] = np.radians(PHIy) return grads
def interpolate(self, lons, lats, zdata, order=1, grad=None)
-
Base class to handle nearest neighbour, linear, and cubic interpolation. Given a triangulation of a set of nodes on the unit sphere, along with data values at the nodes, this method interpolates (or extrapolates) the value at a given longitude and latitude.
Parameters
lons
:float / array
offloats, shape (l,)
- longitudinal coordinate(s) on the sphere
lats
:float / array
offloats, shape (l,)
- latitudinal coordinate(s) on the sphere
zdata
:array
offloats, shape (n,)
- value at each point in the triangulation must be the same size of the mesh
order
:int (default=1)
- order of the interpolatory function used - 0 = nearest-neighbour - 1 = linear - 3 = cubic
Returns
zi
:float / array
offloats, shape (l,)
- interpolated value(s) at (lons, lats)
err
:int / array
ofints, shape (l,)
- whether interpolation (0), extrapolation (1) or error (other)
Expand source code
def interpolate(self, lons, lats, zdata, order=1, grad=None): """ Base class to handle nearest neighbour, linear, and cubic interpolation. Given a triangulation of a set of nodes on the unit sphere, along with data values at the nodes, this method interpolates (or extrapolates) the value at a given longitude and latitude. Parameters ---------- lons : float / array of floats, shape (l,) longitudinal coordinate(s) on the sphere lats : float / array of floats, shape (l,) latitudinal coordinate(s) on the sphere zdata : array of floats, shape (n,) value at each point in the triangulation must be the same size of the mesh order : int (default=1) order of the interpolatory function used - 0 = nearest-neighbour - 1 = linear - 3 = cubic Returns ------- zi : float / array of floats, shape (l,) interpolated value(s) at (lons, lats) err : int / array of ints, shape (l,) whether interpolation (0), extrapolation (1) or error (other) """ rlons = np.radians(lons) rlats = np.radians(lats) return self.tri.interpolate(rlons, rlats, zdata, order, grad)
def local_area_smoothing(self, data, its=1, centre_weight=0.75)
-
Local area smoothing using radial-basis function smoothing kernel
Parameters
data
:array
offloats, shape (n,)
- field variable to be smoothed
its
:int
- number of iterations (default: 1)
centre_weight
:float
- weight to apply to centre nodes (default: 0.75)
other nodes are weighted by (1 -
centre_weight
)
Returns
sm
:array
offloats, shape (n,)
- smoothed field variable
Expand source code
def local_area_smoothing(self, data, its=1, centre_weight=0.75): """ Local area smoothing using radial-basis function smoothing kernel Parameters ---------- data : array of floats, shape (n,) field variable to be smoothed its : int number of iterations (default: 1) centre_weight : float weight to apply to centre nodes (default: 0.75) other nodes are weighted by (1 - `centre_weight`) Returns ------- sm : array of floats, shape (n,) smoothed field variable """ smooth_data = data.copy() smooth_data_old = data.copy() for i in range(0, its): smooth_data_old[:] = smooth_data smooth_data = centre_weight*smooth_data_old + \ (1.0 - centre_weight)*self.rbf_smoother(smooth_data) return smooth_data
def node_neighbours(self, point)
-
Returns a list of neighbour nodes for a given point in the Delaunay triangulation.
Parameters
point
:int
- point on the mesh
Returns
neighbours
:list
- list of neighbours that are connected by line segments
to the specified
point
Expand source code
def node_neighbours(self, point): """ Returns a list of neighbour nodes for a given point in the Delaunay triangulation. Parameters ---------- point : int point on the mesh Returns ------- neighbours : list list of neighbours that are connected by line segments to the specified `point` """ return self.natural_neighbours[point,1:self.natural_neighbours_count[point]]
def rbf_smoother(self, vector, iterations=1, delta=None)
-
Smoothing using a radial-basis function smoothing kernel
Arguments
vector
:array
offloats, shape (n,)
- field variable to be smoothed
iterations
:int
- number of iterations to smooth vector
delta
:float / array
offloats shape (n,)
- distance weights to apply the Gaussian interpolants
Returns
smooth_vec
:array
offloats, shape (n,)
- smoothed version of input vector
Expand source code
def rbf_smoother(self, vector, iterations=1, delta=None): """ Smoothing using a radial-basis function smoothing kernel Arguments --------- vector : array of floats, shape (n,) field variable to be smoothed iterations : int number of iterations to smooth vector delta : float / array of floats shape (n,) distance weights to apply the Gaussian interpolants Returns ------- smooth_vec : array of floats, shape (n,) smoothed version of input vector """ if type(delta) != type(None): self._construct_rbf_weights(delta) vector = self.sync(vector) for i in range(0, iterations): # print self.dm.comm.rank, ": RBF ",vector.max(), vector.min() vector_smoothed = (vector[self.neighbour_cloud[:,:]] * self.gaussian_dist_w[:,:]).sum(axis=1) vector = self.sync(vector_smoothed) return vector
Inherited members