Module quagmire.equation_systems.diffusion
Expand source code
# Copyright 2016-2020 Louis Moresi, Ben Mather, Romain Beucher
#
# This file is part of Quagmire.
#
# Quagmire is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or any later version.
#
# Quagmire is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Quagmire. If not, see <http://www.gnu.org/licenses/>.
from quagmire import function as _fn
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
# To do ... an interface for (iteratively) dealing with
# boundaries with normals that are not aligned to the coordinates
class DiffusionEquation(object):
## it is helpful to have a unique ID for each instance so we
## can autogenerate unique names if we wish
__count = 0
@classmethod
def _count(cls):
DiffusionEquation.__count += 1
return DiffusionEquation.__count
@property
def id(self):
return self.__id
def __init__(self,
mesh=None,
diffusivity_fn=None,
dirichlet_mask=None,
neumann_x_mask=None,
neumann_y_mask=None,
non_linear=False ):
self.__id = "diffusion_solver_{}".format(self._count())
self.mesh = mesh
# These we initialise to None first of all
self._diffusivity = None
self._neumann_x_mask = None
self._neumann_y_mask = None
self._dirichlet_mask = None
self._non_linear = False
if diffusivity_fn is not None:
self.diffusivity = diffusivity_fn
if neumann_x_mask is not None:
self.neumann_x_mask = neumann_x_mask
if neumann_y_mask is not None:
self.neumann_y_mask = neumann_y_mask
if dirichlet_mask is not None:
self.dirichlet_mask = dirichlet_mask
@property
def mesh(self):
return self._mesh
@mesh.setter
def mesh(self, meshobject):
self._mesh = meshobject
self._phi = meshobject.add_variable(name="phi_{}".format(self.id))
return
@property
def phi(self):
return self._phi
# No setter for this ... it is defined via the mesh.setter
@property
def diffusivity(self):
return self._diffusivity
@diffusivity.setter
def diffusivity(self, fn_object):
self._diffusivity = fn_object
if _fn.check_dependency(self._diffusivity, self._phi):
# Set the flag directly, bypassing .setter checks
self._non_linear = True
return
@property
def neumann_x_mask(self):
return self._neumann_x_mask
@neumann_x_mask.setter
def neumann_x_mask(self, fn_object):
_fn.check_object_is_a_q_function_and_raise(fn_object)
self._neumann_x_mask = fn_object
return
@property
def neumann_y_mask(self):
return self._neumann_y_mask
@neumann_y_mask.setter
def neumann_y_mask(self, fn_object):
_fn.check_object_is_a_q_function_and_raise(fn_object)
self._neumann_y_mask = fn_object
return
@property
def dirichlet_mask(self):
return self._dirichlet_mask
@dirichlet_mask.setter
def dirichlet_mask(self, fn_object):
_fn.check_object_is_a_q_function_and_raise(fn_object)
self._dirichlet_mask = fn_object
return
@property
def non_linear(self):
return self._non_linear
@non_linear.setter
def non_linear(self, bool_object):
# Warn if non-linearity exists
# This flag is set whenever the diffusivity function is changed
# and will need to be over-ridden each time.
self._non_linear = bool_object
if _fn.check_dependency(self._diffusivity, self._phi):
print("Over-riding non-linear solver path despite diffusivity being non-linear")
return
def verify(self):
"""Verify solver is ready for launch"""
# Check to see we have provided everything in the correct form
return
def diffusion_rate_fn(self, lazyFn):
## !! create stand alone function
from quagmire import function as fn
gradl = self._mesh.geometry.grad(lazyFn)
kappa_dx_fn = fn.misc.where(self.neumann_x_mask,
self.diffusivity * gradl[0],
fn.parameter(0.0))
kappa_dy_fn = fn.misc.where(self.neumann_y_mask,
self.diffusivity * gradl[1],
fn.parameter(0.0))
dPhi_dt_fn = fn.misc.where(self.dirichlet_mask, \
self._mesh.geometry.div(fn.vector_field(kappa_dx_fn, kappa_dy_fn)), \
fn.parameter(0.0))
return dPhi_dt_fn
def diffusion_timestep(self):
local_diff_timestep = (self._mesh.area / self._diffusivity.evaluate(self._mesh)).min()
# synchronise ...
local_diff_timestep = np.array(local_diff_timestep)
global_diff_timestep = np.array(0.0)
comm.Allreduce([local_diff_timestep, MPI.DOUBLE], [global_diff_timestep, MPI.DOUBLE], op=MPI.MIN)
return global_diff_timestep
def time_integration(self, timestep, steps=1, Delta_t=None, feedback=None):
from quagmire import function as fn
if Delta_t is not None:
steps = Delta_t // timestep
timestep = Delta_t / steps
elapsed_time = 0.0
for step in range(0, int(steps)):
## Non-linear loop will need to go here
## and update timestep somehow.
dPhi_dt_fn = self.diffusion_rate_fn(self.phi)
phi0 = self.phi.copy()
self.phi.data = self.phi.data + 0.5 * timestep * dPhi_dt_fn.evaluate(self._mesh)
self.phi.data = phi0.data + timestep * dPhi_dt_fn.evaluate(self._mesh)
elapsed_time += timestep
if feedback is not None and step%feedback == 0 or step == steps:
print("{:05d} - t = {:.3g}".format(step, elapsed_time))
return steps, elapsed_time
Classes
class DiffusionEquation (mesh=None, diffusivity_fn=None, dirichlet_mask=None, neumann_x_mask=None, neumann_y_mask=None, non_linear=False)
-
Expand source code
class DiffusionEquation(object): ## it is helpful to have a unique ID for each instance so we ## can autogenerate unique names if we wish __count = 0 @classmethod def _count(cls): DiffusionEquation.__count += 1 return DiffusionEquation.__count @property def id(self): return self.__id def __init__(self, mesh=None, diffusivity_fn=None, dirichlet_mask=None, neumann_x_mask=None, neumann_y_mask=None, non_linear=False ): self.__id = "diffusion_solver_{}".format(self._count()) self.mesh = mesh # These we initialise to None first of all self._diffusivity = None self._neumann_x_mask = None self._neumann_y_mask = None self._dirichlet_mask = None self._non_linear = False if diffusivity_fn is not None: self.diffusivity = diffusivity_fn if neumann_x_mask is not None: self.neumann_x_mask = neumann_x_mask if neumann_y_mask is not None: self.neumann_y_mask = neumann_y_mask if dirichlet_mask is not None: self.dirichlet_mask = dirichlet_mask @property def mesh(self): return self._mesh @mesh.setter def mesh(self, meshobject): self._mesh = meshobject self._phi = meshobject.add_variable(name="phi_{}".format(self.id)) return @property def phi(self): return self._phi # No setter for this ... it is defined via the mesh.setter @property def diffusivity(self): return self._diffusivity @diffusivity.setter def diffusivity(self, fn_object): self._diffusivity = fn_object if _fn.check_dependency(self._diffusivity, self._phi): # Set the flag directly, bypassing .setter checks self._non_linear = True return @property def neumann_x_mask(self): return self._neumann_x_mask @neumann_x_mask.setter def neumann_x_mask(self, fn_object): _fn.check_object_is_a_q_function_and_raise(fn_object) self._neumann_x_mask = fn_object return @property def neumann_y_mask(self): return self._neumann_y_mask @neumann_y_mask.setter def neumann_y_mask(self, fn_object): _fn.check_object_is_a_q_function_and_raise(fn_object) self._neumann_y_mask = fn_object return @property def dirichlet_mask(self): return self._dirichlet_mask @dirichlet_mask.setter def dirichlet_mask(self, fn_object): _fn.check_object_is_a_q_function_and_raise(fn_object) self._dirichlet_mask = fn_object return @property def non_linear(self): return self._non_linear @non_linear.setter def non_linear(self, bool_object): # Warn if non-linearity exists # This flag is set whenever the diffusivity function is changed # and will need to be over-ridden each time. self._non_linear = bool_object if _fn.check_dependency(self._diffusivity, self._phi): print("Over-riding non-linear solver path despite diffusivity being non-linear") return def verify(self): """Verify solver is ready for launch""" # Check to see we have provided everything in the correct form return def diffusion_rate_fn(self, lazyFn): ## !! create stand alone function from quagmire import function as fn gradl = self._mesh.geometry.grad(lazyFn) kappa_dx_fn = fn.misc.where(self.neumann_x_mask, self.diffusivity * gradl[0], fn.parameter(0.0)) kappa_dy_fn = fn.misc.where(self.neumann_y_mask, self.diffusivity * gradl[1], fn.parameter(0.0)) dPhi_dt_fn = fn.misc.where(self.dirichlet_mask, \ self._mesh.geometry.div(fn.vector_field(kappa_dx_fn, kappa_dy_fn)), \ fn.parameter(0.0)) return dPhi_dt_fn def diffusion_timestep(self): local_diff_timestep = (self._mesh.area / self._diffusivity.evaluate(self._mesh)).min() # synchronise ... local_diff_timestep = np.array(local_diff_timestep) global_diff_timestep = np.array(0.0) comm.Allreduce([local_diff_timestep, MPI.DOUBLE], [global_diff_timestep, MPI.DOUBLE], op=MPI.MIN) return global_diff_timestep def time_integration(self, timestep, steps=1, Delta_t=None, feedback=None): from quagmire import function as fn if Delta_t is not None: steps = Delta_t // timestep timestep = Delta_t / steps elapsed_time = 0.0 for step in range(0, int(steps)): ## Non-linear loop will need to go here ## and update timestep somehow. dPhi_dt_fn = self.diffusion_rate_fn(self.phi) phi0 = self.phi.copy() self.phi.data = self.phi.data + 0.5 * timestep * dPhi_dt_fn.evaluate(self._mesh) self.phi.data = phi0.data + timestep * dPhi_dt_fn.evaluate(self._mesh) elapsed_time += timestep if feedback is not None and step%feedback == 0 or step == steps: print("{:05d} - t = {:.3g}".format(step, elapsed_time)) return steps, elapsed_time
Instance variables
var diffusivity
-
Expand source code
@property def diffusivity(self): return self._diffusivity
var dirichlet_mask
-
Expand source code
@property def dirichlet_mask(self): return self._dirichlet_mask
var id
-
Expand source code
@property def id(self): return self.__id
var mesh
-
Expand source code
@property def mesh(self): return self._mesh
var neumann_x_mask
-
Expand source code
@property def neumann_x_mask(self): return self._neumann_x_mask
var neumann_y_mask
-
Expand source code
@property def neumann_y_mask(self): return self._neumann_y_mask
var non_linear
-
Expand source code
@property def non_linear(self): return self._non_linear
var phi
-
Expand source code
@property def phi(self): return self._phi
Methods
def diffusion_rate_fn(self, lazyFn)
-
Expand source code
def diffusion_rate_fn(self, lazyFn): ## !! create stand alone function from quagmire import function as fn gradl = self._mesh.geometry.grad(lazyFn) kappa_dx_fn = fn.misc.where(self.neumann_x_mask, self.diffusivity * gradl[0], fn.parameter(0.0)) kappa_dy_fn = fn.misc.where(self.neumann_y_mask, self.diffusivity * gradl[1], fn.parameter(0.0)) dPhi_dt_fn = fn.misc.where(self.dirichlet_mask, \ self._mesh.geometry.div(fn.vector_field(kappa_dx_fn, kappa_dy_fn)), \ fn.parameter(0.0)) return dPhi_dt_fn
def diffusion_timestep(self)
-
Expand source code
def diffusion_timestep(self): local_diff_timestep = (self._mesh.area / self._diffusivity.evaluate(self._mesh)).min() # synchronise ... local_diff_timestep = np.array(local_diff_timestep) global_diff_timestep = np.array(0.0) comm.Allreduce([local_diff_timestep, MPI.DOUBLE], [global_diff_timestep, MPI.DOUBLE], op=MPI.MIN) return global_diff_timestep
def time_integration(self, timestep, steps=1, Delta_t=None, feedback=None)
-
Expand source code
def time_integration(self, timestep, steps=1, Delta_t=None, feedback=None): from quagmire import function as fn if Delta_t is not None: steps = Delta_t // timestep timestep = Delta_t / steps elapsed_time = 0.0 for step in range(0, int(steps)): ## Non-linear loop will need to go here ## and update timestep somehow. dPhi_dt_fn = self.diffusion_rate_fn(self.phi) phi0 = self.phi.copy() self.phi.data = self.phi.data + 0.5 * timestep * dPhi_dt_fn.evaluate(self._mesh) self.phi.data = phi0.data + timestep * dPhi_dt_fn.evaluate(self._mesh) elapsed_time += timestep if feedback is not None and step%feedback == 0 or step == steps: print("{:05d} - t = {:.3g}".format(step, elapsed_time)) return steps, elapsed_time
def verify(self)
-
Verify solver is ready for launch
Expand source code
def verify(self): """Verify solver is ready for launch""" # Check to see we have provided everything in the correct form return