Source code for pygimli.solver.solverFiniteVolume

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Simple Finite Volume Solver."""

import numpy as np
import pygimli as pg

from pygimli.solver import identity  # , parseArgToArray, parseArgToBoundaries

def cellDataToBoundaryData(mesh, v):
    """TODO Documentme."""
    if len(v) != mesh.cellCount():
        raise Exception("len(v) != mesh.cellCount():", len(v),
    return np.array([cellToFace(b, v) for b in mesh.boundaries()])

def boundaryNormals(mesh):
    """Collect all boundary outer normal vectors."""
    # implement with    return mesh.boundaryNorms()
    gB = np.zeros((mesh.boundaryCount(), 3))

    for b in mesh.boundaries():
        gB[] = b.norm()

    return gB

def _boundaryToCellDistances(mesh):
    """TODO Documentme."""
    d = [_boundaryToCellDistancesBound(b) for b in mesh.boundaries()]
    return np.array(d)

def _boundaryToCellDistancesBound(b):
    """TODO Documentme."""
    leftCell = b.leftCell()
    rightCell = b.rightCell()
    df1 = 0.
    df2 = 0.
    if leftCell:
        df1 =
    if rightCell:
        df2 =
    return df1 + df2

def cellDataToBoundaryGrad(mesh, v, vGrad):
    """TODO Documentme."""
    if len(v) != mesh.cellCount() or len(vGrad) != mesh.cellCount():
        raise BaseException("len(v) dismatch mesh.cellCount()")

    gB = mesh.cellDataToBoundaryGradient(v, vGrad)
    return np.vstack([pg.x(gB), pg.y(gB), pg.z(gB)]).T

    # gB = np.zeros((mesh.boundaryCount(), 3))
    # for b in mesh.boundaries():
    #     leftCell = b.leftCell()
    #     rightCell = b.rightCell()
    #     gr = pg.RVector3(0.0, 0.0, 0.0)
    #     t = (b.node(1).pos() - b.node(0).pos()).norm()
    #     if leftCell and rightCell:
    #         df1 =
    #         df2 =
    #         gr = b.norm() * \
    #             (v[] - v[]) / (df1 + df2)
    #         grL = t *[])
    #         grR = t *[])
    #         gr += (grL + grR) * 0.5
    #     elif leftCell:
    #         gr = t *[])
    #     gB[, 0] = gr[0]
    #     gB[, 1] = gr[1]
    #     gB[, 2] = gr[2]
    # return gB

def cellToFace(boundary, vec, harmonic=False):

    Interpolate cell to face values by weighted arithmetic/harmonic mean.
    leftCell = boundary.leftCell()
    rightCell = boundary.rightCell()
    df1 = 0.
    df2 = 0.
    u1 = 0.
    u2 = 0.
    if leftCell:
        df1 =
        u1 = vec[]
    if rightCell:
        df2 =
        u2 = vec[]
    uFace = 0.0
    d12 = (df1 + df2)

    if leftCell and rightCell:
        if harmonic:
            # harmonic mean
            uFace = (u1 * u2) / ((u2 - u1) * df2 / d12 + u1)
            # arithmetic mean
            # check left vs. right
            uFace = (u1 - u2) * df2 / d12 + u2

    elif leftCell:
        uFace = u1  # / df1
    elif rightCell:
        uFace = u2  # / df2
    return uFace

def cellToFaceArithmetic(boundary, AMM):
    """TODO Documentme."""
    leftCell = boundary.leftCell()
    rightCell = boundary.rightCell()
    df1 = 0.
    df2 = 0.
    harmonic = False
    if leftCell:
        df1 =
    if rightCell:
        df2 =
    d12 = (df1 + df2)

    if leftCell and rightCell:
        if harmonic:
            # harmonic mean
            # uFace = (u1 * u2) / ((u2-u1)*df2/d12 + u1)
            # arithmetic mean
            # check left vs. right
            AMM.addVal(,, df2 / d12)
            AMM.addVal(,, -df2 / d12 + 1.0)
            # uFace = (u1 - u2) * df2/d12 + u2

    elif leftCell:
        AMM.addVal(,, 1.0)
    elif rightCell:
        AMM.addVal(,, 1.0)

def cellDataToBoundaryDataMatrix(mesh):
    """TODO Documentme."""
    AMM = pg.matrix.SparseMapMatrix(mesh.boundaryCount(), mesh.cellCount())

    for b in mesh.boundaries():
        cellToFaceArithmetic(b, AMM)

    return AMM

def cellDataToCellGrad2(mesh, v):
    """TODO Documentme."""
    if len(v) != mesh.cellCount():
        print(len(v), mesh.cellCount())
        raise BaseException("len(v) dismatch mesh.cellCount()")

    vN = pg.meshtools.cellDataToNodeData(mesh, v)
    gC = np.zeros((mesh.cellCount(), 3))

    for c in mesh.cells():
        gr = c.grad(, vN)
        gC[, 0] = gr[0]
        gC[, 1] = gr[1]
        gC[, 2] = gr[2]
    return gC

def cellDataToCellGrad(mesh, v, CtB):
    """TODO Documentme."""
    if len(v) != mesh.cellCount():
        print(len(v), mesh.cellCount())
        raise BaseException("len of v missmatch mesh.cellCount()")
    div = mesh.boundaryDataToCellGradient(CtB * v)
    return np.vstack([pg.x(div), pg.y(div), pg.z(div)]).T

    # vF = cellDataToBoundaryData(mesh, v)
    # gC = np.zeros((mesh.cellCount(), 3))
    # for b in mesh.boundaries():
    #     leftCell = b.leftCell()
    #     rightCell = b.rightCell()
    #     vec = b.norm() * vF[] * b.size()
    #     if leftCell:
    #         gC[, 0] += vec[0]
    #         gC[, 1] += vec[1]
    #         gC[, 2] += vec[2]
    #     if rightCell:
    #         gC[, 0] -= vec[0]
    #         gC[, 1] -= vec[1]
    #         gC[, 2] -= vec[2]
    # gC[:, 0] /= mesh.cellSizes()
    # gC[:, 1] /= mesh.cellSizes()
    # gC[:, 2] /= mesh.cellSizes()
    # return gC

def findVelocity(mesh, v, b, c, nc=None):
    """Find velocity for boundary b based on vector field v.

    mesh : :gimliapi:`GIMLI::Mesh`

    v : array [(N,dim)]
        velocity field [[v_i,]_j,] with i=[1..3] for the mesh dimension
        and j = [0 .. N-1] per Cell or per Node so N is either
        mesh.cellCount() or mesh.nodeCount()

    b : :gimliapi:`GIMLI::Boundary`

    c : :gimliapi:`GIMLI::Cell`

        Associated Cell in flow direction

    nc : :gimliapi:`GIMLI::Cell`
        Associated neighbor cell .. if one given
        from flow direction
    vel = [0.0, 0.0, 0.0]

    if hasattr(v, '__len__'):
        if len(v) == mesh.cellCount():
            if nc:
                # mean cell based vector-field v[x,y,z] for cell c and cell nc
                vel = (v[] + v[]) / 2.0

                # vel1 = 1./ (1./v[] + 1./v[])
                # print("findVel, check:", vel1, vel)

                # cell based vector-field v[x,y,z] for cell c
                vel = v[]
        elif len(v) == mesh.boundaryCount():
            vel = v[]
            # interpolate node based vector-field v[x,y,z] at point
            # print(len(vel), vel)
            vel = c.vec(, v)

    return vel

def findDiffusion(mesh, a, b, c, nc=None):
    """TODO Documentme.

    a :
        Attribute for diffusion coefficient. Cell based or Boundary based

    b : :gimliapi:`GIMLI::Boundary`

    c : :gimliapi:`GIMLI::Cell`

        Associated cell in flow direction

    nc : :gimliapi:`GIMLI::Cell`
        associated neighbor cell .. if one given
        from flow direction
    D = 0.

    if nc:
        if len(a) == mesh.boundaryCount():
            D = a[] / * b.size()
            # Diffusion part
            # Interface harmonic median
            # D = (a[] / +
            # a[] / * 0.5 *
            # b.size()
            if a[] > 0 and a[] > 0:
                D = 1. / ( / a[] +
               / a[]) * b.size()
            # print(D)
        if len(a) == mesh.boundaryCount():
            D = a[] / * b.size()
            D = a[] / * b.size()
    return D

def diffusionConvectionKernel(mesh, a=None, b=0.0,
                              uB=None, duB=None,
                              # u0=0,
                              scheme='CDS', sparse=False, time=0.0,
    Generate system matrix for diffusion and convection in a velocity field.

    Particle concentration u inside a velocity field.

    Peclet Number - ratio between convection/diffusion = F/D
        F = velocity flow trough volume boundary,
        D = diffusion coefficient

    mesh: :gimliapi:`GIMLI::Mesh`
        Mesh represents spatial discretization of the calculation domain
    a: value | array | callable(cell, userData)
        Diffusion coefficient per cell
    b: value | array | callable(cell, userData)
        TODO What is b
    fn: iterable(cell)
        TODO What is fn
    vel: ndarray (N,dim) | RMatrix(N,dim)
        velocity field [[v_i,]_j,] with i=[1..3] for the mesh dimension
        and j = [0 .. N-1] per Cell or per Node so N is either
        mesh.cellCount() or mesh.nodeCount()
    scheme: str [CDS]
        Finite volume scheme
        * CDS -- Central Difference Scheme.
            maybe irregular for Peclet no. |F/D| > 2
            Diffusion dominant. Error of order 2
        * UDS -- Upwind Scheme.
            Convection dominant. Error of order 1
        * HS -- Hybrid Scheme.
            Diffusion dominant for Peclet-number |(F/D)| < 2
            Convection dominant else.
        * PS -- Power Law Scheme.
            Identical to HS for Peclet-number |(F/D)| > 10 and near to ES else
            Convection dominant.
        * ES -- Exponential scheme
            Only stationary one-dimensional but exact solution
    S: :gimliapi:`GIMLI::SparseMatrix` | numpy.ndarray(nCells, nCells)
        Kernel matrix, depends on vel, a, b, scheme, uB, duB .. if some of this
        has been changed you cannot cache these matrix
    rhsBoundaryScales: ndarray(nCells)
        RHS offset vector
    if a is None:
        a = pg.Vector(mesh.boundaryCount(), 1.0)

    AScheme = None
    if scheme == 'CDS':
        AScheme = lambda peclet_: 1.0 - 0.5 * abs(peclet_)
    elif scheme == 'UDS':
        AScheme = lambda peclet_: 1.0
    elif scheme == 'HS':
        AScheme = lambda peclet_: max(0.0, 1.0 - 0.5 * abs(peclet_))
    elif scheme == 'PS':
        AScheme = lambda peclet_: max(0.0, (1.0 - 0.1 * abs(peclet_))**5.0)
    elif scheme == 'ES':
        AScheme = lambda peclet_: (peclet_) / (np.exp(abs(peclet_)) - 1.0) \
            if peclet_ != 0.0 else 1
        raise BaseException("Scheme unknwon:" + scheme)

    useHalfBoundaries = False

    dof = mesh.cellCount()

    if not uB:
        uB = []
    if not duB:
        duB = []

    if useHalfBoundaries:
        dof = mesh.cellCount() + len(uB)

    S = None
    if sparse:
        S = pg.matrix.SparseMapMatrix(dof, dof, stype=0) + identity(dof, scale=b)
        S = np.zeros((dof, dof))

    rhsBoundaryScales = np.zeros(dof)

    # we need this to fast identify uBoundary and value by boundary
    uBoundaryID = []
    uBoundaryVals = [None] * mesh.boundaryCount()

    for [b, val] in uB:

        if isinstance(b, pg.core.Boundary):
            uBoundaryVals[] = val
        elif isinstance(b, pg.core.Node):
            for _b in b.boundSet():
                if _b.rightCell() is None:
                    pg.warn('Dirichlet for one node considered for the nearest boundary.',
                    uBoundaryVals[] = val
            raise BaseException("Please give boundary, value list")

    duBoundaryID = []
    duBoundaryVals = [None] * mesh.boundaryCount()

    for [boundary, val] in duB:
        if not isinstance(boundary, pg.core.Boundary):
            raise BaseException("Please give boundary, value list")

        duBoundaryVals[] = val

    # iterate over all cells
    for cell in mesh.cells():
        cID =
        for bi in range(cell.boundaryCount()):
            boundary = pg.core.findBoundary(cell.boundaryNodes(bi))

            ncell = boundary.leftCell()
            if ncell == cell:
                ncell = boundary.rightCell()

            v = findVelocity(mesh, vel, boundary, cell, ncell)

            # Convection part
            F = boundary.norm(cell).dot(v) * boundary.size()
            # print(F, boundary.size(), v, vel)
            # Diffusion part
            D = findDiffusion(mesh, a, boundary, cell, ncell)

            # print(F, D, F/D)
            # print((1.0 - 0.1 * abs(F/D))**5.0)
            if D > 0:
                aB = D * AScheme(F / D) + max(-F, 0.0)
                aB = max(-F, 0.0)

            aB /= cell.size()

            # print(,, boundary.norm(cell), aB)
            if ncell:
                # no boundary
                if sparse:
                    S.addVal(cID,, -aB)
                    S.addVal(cID, cID, +aB)
                    S[cID,] -= aB
                    S[cID, cID] += aB

            elif not useHalfBoundaries:

                if in uBoundaryID:
                    val = pg.solver.generateBoundaryValue(boundary,

                    if sparse:
                        S.addVal(cID, cID, aB)
                        S[cID, cID] += aB

                    #print(aB, uBoundaryVals[])
                    rhsBoundaryScales[cID] += aB * np.mean(val)

                if in duBoundaryID:
                    # Neumann boundary condition
                    val = pg.solver.generateBoundaryValue(

                    val = np.mean(val)

                    # amount of flow through the boundary .. maybe buggy
                    # fill be replaced by suitable FE solver
                    outflow = -val * boundary.size() / cell.size()

                    if sparse:
                        S.addVal(cID, cID, outflow)
                        S[cID, cID] += outflow

        if fn is not None:
            if sparse:
                # * cell.shape().domainSize())
                S.addVal(,, -fn[])
                # * cell.shape().domainSize()
                S[,] -= fn[]

    return S, rhsBoundaryScales

[docs] def solveFiniteVolume(mesh, a=1.0, b=0.0, f=0.0, fn=0.0, vel=None, u0=0.0, times=None, uL=None, relax=1.0, ws=None, scheme='CDS', **kwargs): r"""Solve partial differential equation with Finite Volumes. This function is a syntactic sugar proxy for using the Finite Volume functionality of the library core to solve elliptic and parabolic partial differential of the following type: .. math:: \frac{\partial u}{\partial t} + \mathbf{v}\cdot\nabla u & = \nabla\cdot(a \nabla u) + b u + f(\mathbf{r},t)\\ u(\mathbf{r}, t) & = u_B \quad\mathbf{r}\in\Gamma_{\text{Dirichlet}}\\ \frac{\partial u(\mathbf{r}, t)}{\partial \mathbf{n}} & = u_{\partial \text{B}} \quad\mathbf{r}\in\Gamma_{\text{Neumann}}\\ u(\mathbf{r}, t=0) & = u_0 \quad\text{with} \quad\mathbf{r}\in\Omega The Domain :math:`\Omega` and the Boundary :math:`\Gamma` are defined through the given mesh with appropriate boundary marker. The solution :math:`u(\mathbf{r}, t)` is given for each cell in the mesh. TODO: * Refactor with solver class and Runga-Kutte solver * non steady boundary conditions Parameters ---------- mesh: :gimliapi:`GIMLI::Mesh` Mesh represents spatial discretization of the calculation domain a: value | array | callable(cell, userData) Stiffness weighting per cell values. b: value | array | callable(cell, userData) Scale for mass values b f: iterable(cell) Load vector fn: iterable(cell) TODO What is fn vel: ndarray (N,dim) | RMatrix(N,dim) Velocity field :math:`\mathbf{v}(\mathbf{r}, t=\text{const}) = \{[v_i]_j,\}` with :math:`i=[1\ldots 3]` for the mesh dimension and :math:`j = [0\ldots N-1]` with N either the amount of cells, nodes, or boundaries. Velocities per boundary are preferred and will be interpolated on demand. u0: value | array | callable(cell, userData) Starting field times: iterable Time steps to calculate for. ws Workspace This can be an empty class that will used as an Workspace to store and cache data. If ws is given: The system matrix is taken from ws or calculated once and stored in ws for further usage. The system matrix is cached in this Workspace as ws.S The LinearSolver with the factorized matrix is cached in this Workspace as ws.solver The rhs vector is only stored in this Workspace as ws.rhs scheme: str [CDS] Finite volume scheme: :py:mod:`pygimli.solver.diffusionConvectionKernel` **kwargs: * bc : Boundary Conditions dictionary, see pg.solver * uB : Dirichlet boundary conditions DEPRECATED * duB : Neumann boundary conditions DEPRECATED Returns ------- u: ndarray(nTimes, nCells) Solution field for all time steps. """ verbose = kwargs.pop('verbose', False) # The Workspace is to hold temporary data or preserve matrix rebuild # swatch = pg.Stopwatch(True) sparse = True workspace = pg.solver.WorkSpace() if ws: workspace = ws a = pg.solver.parseArgToArray(a, [mesh.cellCount(), mesh.boundaryCount()]) b = pg.solver.parseArgToArray(b, mesh.cellCount()) f = pg.solver.parseArgToArray(f, mesh.cellCount()) fn = pg.solver.parseArgToArray(fn, mesh.cellCount()) boundsDirichlet = None boundsNeumann = None # BEGIN check for Courant-Friedrichs-Lewy if vel is not None: if isinstance(vel, float): print("Warning! .. velocity is float and no vector field") # we need velocities for boundaries if len(vel) is not mesh.boundaryCount(): if len(vel) == mesh.cellCount(): vel = pg.meshtools.cellDataToNodeData(mesh, vel) elif len(vel) == mesh.nodeCount(): vel = pg.meshtools.nodeDataToBoundaryData(mesh, vel) else: print("mesh:", mesh) print("vel:", vel.shape) raise Exception("Cannot determine data format for velocities") if times is not None: pg.solver.checkCFL(times, mesh, np.max(pg.abs(vel))) if not hasattr(workspace, 'S'): boundsDirichlet = [] if 'bc' in kwargs: bct = dict(kwargs['bc']) if 'Dirichlet' in bct: boundsDirichlet += pg.solver.parseArgToBoundaries(bct.pop('Dirichlet'), mesh) if 'Node' in bct: n = bct.pop('Node') boundsDirichlet.append([mesh.node(n[0]), n[1]]) if 'Neumann' in bct: boundsNeumann = pg.solver.parseArgToBoundaries(bct.pop('Neumann'), mesh) if 'uB' in kwargs: pg.deprecated('use new bc dictionary') boundsDirichlet = pg.solver.parseArgToBoundaries(kwargs['uB'], mesh) if 'duB' in kwargs: pg.deprecated('use new bc dictionary') boundsNeumann = pg.solver.parseArgToBoundaries(kwargs['duB'], mesh) workspace.S, workspace.rhsBCScales = diffusionConvectionKernel( mesh=mesh, a=a, b=b, uB=boundsDirichlet, duB=boundsNeumann, # u0=u0, fn=fn, vel=vel, scheme=scheme, sparse=sparse, userData=kwargs.pop('userData', None)) dof = len(workspace.rhsBCScales) workspace.ap = np.zeros(dof) # for nonlinears if uL is not None: for i in range(dof): val = 0.0 if sparse: val = workspace.S.getVal(i, i) / relax workspace.S.setVal(i, i, val) else: val = workspace.S[i, i] / relax workspace.S[i, i] = val workspace.ap[i] = val # print('FVM kernel 2:', swatch.duration(True)) # endif: not hasattr(workspace, 'S'): workspace.rhs = np.zeros(len(workspace.rhsBCScales)) workspace.rhs[0:mesh.cellCount()] = f # * mesh.cellSizes() workspace.rhs += workspace.rhsBCScales # for nonlinear: relax progress with scaled last result if uL is not None: workspace.rhs += (1. - relax) * workspace.ap * uL # print('FVM: Prep:', swatch.duration(True)) if not hasattr(times, '__len__'): if sparse and not hasattr(workspace, 'solver'): Sm = pg.matrix.SparseMatrix(workspace.S) # hold Sm until we have reference counting, # loosing Sm here will kill LinSolver later workspace.Sm = Sm workspace.solver = pg.core.LinSolver(Sm, verbose=verbose) u = None if sparse: u = workspace.solver.solve(workspace.rhs) else: u = np.linalg.solve(workspace.S, workspace.rhs) # print('FVM solve:', swatch.duration(True)) return u[0:mesh.cellCount():1] else: theta = kwargs.pop('theta', 0.5 + 1e-6) if sparse: I = pg.solver.identity(len(workspace.rhs)) else: I = np.diag(np.ones(len(workspace.rhs))) progress = None if verbose: from pygimli.utils import ProgressBar progress = ProgressBar(its=len(times)) print("Solve timesteps with Crank-Nicolson.") return pg.solver.crankNicolson(times, workspace.S, I, f=workspace.rhs, u0=pg.solver.cellValues(mesh, u0), theta=theta, progress=progress)
def createFVPostProzessMesh(mesh, u, uDirichlet): """Create node based post processing of cell centered FV solutions. Create a mesh suitable for node based post processing of cell centered Finite Volume solutions. This is something like cellDataToNodeData with extra Dirichlet points but without smoothing due to interpolation. IMPROVE DOC!! Parameters ---------- """ allBounds = pg.solver.parseArgToBoundaries(uDirichlet, mesh) bounds, vals = zip(*allBounds) uDirVals = pg.solver.generateBoundaryValue(bounds, vals) def isBoundary(b): """TODO Documentme.""" return b.rightCell() is None and b.leftCell() is not None if bounds is None: bounds = [] for b in mesh.boundaries(): if isBoundary(b): bounds.append(b) boundsIdx = [] for b in bounds: boundsIdx.append( poly2 = pg.Mesh(2) for p in mesh.cellCenters(): poly2.createNode(p) boundSort = [] boundSort.append(bounds[0]) boundSort[-1].tag() boundSortIdx = [0] lastB = boundSort[-1] lastN = lastB.node(1) while len(boundSort) != len(bounds): newB = None newN = None for b in lastN.boundSet(): if not b.tagged() and isBoundary(b): boundSort.append(b) b.tag() newB = b if newB.node(0) == lastN: newN = newB.node(1) else: newN = newB.node(0) lastN = newN lastB = newB if not newB: raise BaseException("implement me") boundSortIdx.append(boundsIdx.index( bNodes = [poly2.createNode( for b in boundSort] for i, _ in enumerate(bNodes): poly2.createEdge(bNodes[i], bNodes[(i + 1) % len(bNodes)]) mesh2 = createMesh(poly2, switches='-pezY') return mesh2,, uDirVals[boundSortIdx]) # def createFVPostProzessMesh(...) def applyBoundaryValues(uB, mesh, uBBC): """TODO Documentme.""" if isinstance(uBBC, dict): boundsDirichlet = pg.solver.parseArgToBoundaries(uBBC['Dirichlet'], mesh) for b, v in boundsDirichlet: uB[] = v else: for marker, val in uBBC: for b in mesh.findBoundaryByMarker(marker): uB[] = val def __d(name, v, showAll=False): """TODO Documentme.""" print(name, np.mean(v), min(v), max(v)) if showAll: print(v) def __solveStokes(mesh, viscosity, velBoundary=None, preBoundary=None, pre0=None, vel0=None, tol=1e-4, maxIter=1000, verbose=1, **kwargs): """Steady Navier-Stokes solver.""" workspace = pg.solver.WorkSpace() wsux = pg.solver.WorkSpace() wsuy = pg.solver.WorkSpace() wsp = pg.solver.WorkSpace() # get cache values if given ws = kwargs.pop('ws', None) if ws: workspace = ws if hasattr(workspace, 'wsux'): wsux = workspace.wsux else: workspace.wsux = wsux if hasattr(workspace, 'wsuy'): wsuy = workspace.wsuy else: workspace.wsuy = wsuy if hasattr(workspace, 'wsp'): wsp = workspace.wsp else: workspace.wsp = wsp velocityRelaxation = kwargs.pop('vRelax', 0.5) pressureRelaxation = kwargs.pop('pRelax', 0.8) pressureCoeff = None preCNorm = [] divVNorm = [] # velBoundaryX = [] # velBoundaryY = [] # if velBoundary is not None: # for marker, vel in velBoundary: # if vel is not None: # if not isinstance(vel[0], str): # velBoundaryX.append([marker, vel[0]]) # if not isinstance(vel[1], str): # velBoundaryY.append([marker, vel[1]]) pressure = None if pre0 is None: pressure = np.zeros(mesh.cellCount()) else: if len(pre0) == mesh.nodeCount(): pressure = pg.meshtools.nodeDataToCellData(mesh, pre0) else: pressure = np.array(pre0) velocity = None if vel0 is None: velocity = np.zeros((mesh.cellCount(), mesh.dimension())) else: velocity = np.array(vel0) mesh.createNeighborInfos() CtB = mesh.cellToBoundaryInterpolation() controlVolumes = CtB * mesh.cellSizes() density = kwargs.pop('density', 1.0) force = kwargs.pop('f', [0.0, 0.0]) density = pg.solver.parseArgToArray(density, nDof=mesh.cellCount()) forceX = pg.solver.parseArgToArray(force[0], nDof=mesh.cellCount()) forceY = pg.solver.parseArgToArray(force[1], nDof=mesh.cellCount()) for i in range(maxIter): pressureGrad = cellDataToCellGrad(mesh, pressure, CtB) # __d('vx', pressureGrad[:,0]) velocity[:, 0] = solveFiniteVolume( mesh, a=viscosity, f=-pressureGrad[:, 0] / density + forceX, bc=velBoundary[0], uL=velocity[:, 0], relax=velocityRelaxation, ws=wsux) # for s in wsux.S: # print(s) # __d('rhs', wsux.rhs, 1) # __d('ux', velocity[:,0]) velocity[:, 1] = solveFiniteVolume( mesh, a=viscosity, f=-pressureGrad[:, 1] / density + forceY, bc=velBoundary[1], uL=velocity[:, 1], relax=velocityRelaxation, ws=wsuy) ap = np.array(wsux.ap * mesh.cellSizes()) apF = CtB * ap uxF = CtB * velocity[:, 0] uyF = CtB * velocity[:, 1] applyBoundaryValues(uxF, mesh, velBoundary[0]) applyBoundaryValues(uyF, mesh, velBoundary[1]) pxF = CtB * pressureGrad[:, 0] pyF = CtB * pressureGrad[:, 1] pF2 = cellDataToBoundaryGrad(mesh, pressure, pressureGrad) velXF = uxF + controlVolumes / apF * (pxF - pF2[:, 0]) velYF = uyF + controlVolumes / apF * (pyF - pF2[:, 1]) applyBoundaryValues(velXF, mesh, velBoundary[0]) applyBoundaryValues(velYF, mesh, velBoundary[1]) if pressureCoeff is None: pressureCoeff = 1. / apF * mesh.boundarySizes() * \ _boundaryToCellDistances(mesh) div = -mesh.divergence(np.vstack([velXF, velYF]).T) # boundsDirichlet = pg.solver.parseArgToBoundaries(preBoundary, mesh) # pB = CtB * pressure # for ix, [boundary, val] in enumerate(boundsDirichlet): # boundsDirichlet[ix][1] = -(-pg.solver.generateBoundaryValue(boundary, # val) + pB[]) pressureCorrection = solveFiniteVolume(mesh, a=pressureCoeff, f=div, bc=preBoundary, # uB=boundsDirichlet, ws=wsp) #, pressureCorrection, label='pC') # print(i, pg.solver.generateBoundaryValue(mesh.boundary(58), val), # pB[58], boundsDirichlet[-1][1], # (CtB*pressureCorrection)[58]) pressure += pressureCorrection * pressureRelaxation pressureCorrectionGrad = cellDataToCellGrad(mesh, pressureCorrection, CtB) velocity[:, 0] -= pressureCorrectionGrad[:, 0] / ap * mesh.cellSizes() velocity[:, 1] -= pressureCorrectionGrad[:, 1] / ap * mesh.cellSizes() preCNorm.append(pg.core.norm(pressureCorrection)) divVNorm.append(pg.core.norm(div)) if workspace: workspace.div = div # __d('div', div) # if ( i == 1): # sd convergenceTest = 100 if i > 6: convergenceTest = (divVNorm[-1] - divVNorm[-2]) + \ (divVNorm[-2] - divVNorm[-3]) + \ (divVNorm[-3] - divVNorm[-4]) + \ (divVNorm[-4] - divVNorm[-5]) + \ (divVNorm[-5] - divVNorm[-6]) convergenceTest /= 5 if verbose: print("\r" + str(i) + " div V=" + str(divVNorm[-1]) + " ddiv V=" + str(convergenceTest)) if divVNorm[-1] > 1e6: print("\r" + str(i) + " div V=" + str(divVNorm[-1]) + " ddiv V=" + str(convergenceTest)) raise BaseException("Stokes solver seems diverging") if i > 2: if i == maxIter or divVNorm[-1] < tol or \ abs(convergenceTest * divVNorm[-1]) < tol: break if verbose: print(str(i) + ": |pC|" + str(preCNorm[-1])) return velocity, pressure, preCNorm, divVNorm def _test_ConvectionAdvection(): """Test against a reference solution. taken from """ N = 21 # 21 reference maxIter = 11 # 11 reference Nx = N Ny = N x = np.linspace(-1.0, 1.0, Nx + 1) y = np.linspace(-1.0, 1.0, Ny + 1) grid = pg.createGrid(x=x, y=y) a = pg.Vector(grid.cellCount(), 1.0) b7 = grid.findBoundaryByMarker(1)[0] for b in grid.findBoundaryByMarker(1): if[1] <[1]: b7 = b b7.setMarker(7) swatch = pg.Stopwatch(True) # velBoundary = [[1, [0.0, 0.0]], # [2, [0.0, 0.0]], # [3, [1.0, 0.0]], # [4, [0.0, 0.0]], # [7, [0.0, 0.0]]] velBoundaryX = {'Dirichlet':{'1,2,4,7': 0.0, 3: 1.0,}} velBoundaryY = {'Dirichlet':{'1,2,4,7': 0.0, 3: 0.0,}} # velBoundaryX = {'Dirichlet':{'1,2,7': 0.0, # 3: 1.0,}} # velBoundaryY = {'Dirichlet':{'3,4,7': 0.0}} preBoundary = {'Dirichlet': {7: 0.0}} vel, pres, pCNorm, divVNorm = __solveStokes(grid, a, [velBoundaryX, velBoundaryY], preBoundary, maxIter=maxIter, verbose=1) print("time", len(pCNorm), swatch.duration(True)) referenceSolutionDivV = 0.029187181920161752 #fipy print("divNorm: ", divVNorm[-1]) print("to reference: ", divVNorm[-1] - referenceSolutionDivV) np.testing.assert_approx_equal(divVNorm[-1], referenceSolutionDivV, significant=8) fig = pg.plt.figure() ax1 = fig.add_subplot(1, 3, 1) ax2 = fig.add_subplot(1, 3, 2) ax3 = fig.add_subplot(1, 3, 3) show(grid, data=pg.meshtools.cellDataToNodeData(grid, pres), label='Pressure', cMap='RdBu', ax=ax1) show(grid, data=pg.core.logTransDropTol( pg.meshtools.cellDataToNodeData(grid, vel[:, 0]), 1e-2), label='$v_x$', ax=ax2) show(grid, data=pg.core.logTransDropTol( pg.meshtools.cellDataToNodeData(grid, vel[:, 1]), 1e-2), label='$v_y$', ax=ax3) show(grid, data=vel, ax=ax1, showMesh=True) pg.plt.figure() pg.plt.semilogy(pCNorm, label='norm') pg.plt.legend() pg.wait() if __name__ == '__main__': _test_ConvectionAdvection()