#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Curve-fitting with harmonic functions plus offset and drift."""
import numpy as np
import pygimli as pg
from .modelling import Modelling
from .inversion import MarquardtInversion
[docs]
class HarmFunctor(object):
"""Functor for harmonic functions plus offset and drift."""
[docs]
def __init__(self, A, coeff, xmin, xSpan):
"""Initialize."""
self.A_ = A # what the fuck? init an A matrix here...
self.coeff_ = coeff
self.xmin_ = xmin
self.xSpan_ = xSpan
def __call__(self, x):
"""Yield function call."""
nc = len(self.coeff_) / 2
A = np.ones(nc * 2) * 0.5 # ... and generating one on the fly here?
A[1] = x
A[2::2] = np.sin(2.0 * np.pi * np.arange(1, nc) *
(x - self.xmin_) / self.xSpan_)
A[3::2] = np.cos(2.0 * np.pi * np.arange(1, nc) *
(x - self.xmin_) / self.xSpan_)
return sum(A * self.coeff_)
class HarmonicModelling(Modelling):
"""Harmonic modelling."""
def __init__(self, x=None, nc=10):
"""Init."""
super().__init__()
self.xmin = min(x)
self.xspread = max(x) - min(x)
self.xs = (x - self.xmin) / self.xspread
self.nc = nc
self.A = np.ones((len(x), nc * 2 + 2))
self.A[:, 1] = self.xs
for i in range(nc):
self.A[:, i*2+2] = np.sin(2.0 * np.pi * (i+1) * self.xs)
self.A[:, i*2+3] = np.cos(2.0 * np.pi * (i+1) * self.xs)
def response(self, par):
"""Forward response."""
return self.A.dot(par)
def resample(self, par, xr):
"""Resampled data."""
xrs = (xr - self.xmin) / self.xspread
out = par[0] + par[1] * xrs
for i in range(self.nc):
out += np.sin(2.0 * np.pi * (i+1) * xrs)
out += np.cos(2.0 * np.pi * (i+1) * xrs)
return out
def createStartModel(self, dataVals):
"""Starting model."""
mv = np.mean(dataVals)
model = pg.Vector(self.nc*2+2, mv/10)
model[0] = mv
return model
[docs]
def harmfitNative(y, x=None, nc=None, xc=None, error=None, err=None):
"""Python-based curve-fit by harmonic functions.
Parameters
----------
y : iterable
values of a curve to be fitted
x : iterable
abscissa, if none [0..len(y))
nc : int
number of coefficients
err : iterable
absolute data error
xc : iterable
abscissa to predict y on (otherwise equal to x)
"""
y = np.asarray(y)
if x is None:
x = list(range(len(y)))
x = np.asarray(x)
if xc is not None:
xc = np.asarray(xc)
else:
xc = x
if error is None:
if err is not None: # old keyword
error = err
else:
error = np.ones((1, len(x)))
if nc is None or nc == 0:
# nc=round(length(x)/30) % number of coefficients
nc = len(x) // 30
xspan = max(x) - min(x)
xmi = min(x)
# A=ones(length(x),nc*2+2)/2 %nc*(sin+cos)+offset+drift
A = np.ones((len(x), nc * 2 + 2)) / 2.0 # %nc*(sin+cos)+offset+drift
# B=ones(length(xc),nc*2+2)/2
B = np.ones((len(xc), nc * 2 + 2)) / 2.0
A[:, 1] = x[:] * 3.0
B[:, 1] = xc[:] * 3.0
for i in range(1, nc + 1):
A[:, i * 2 + 0] = np.sin(2.0 * i * np.pi * (x - xmi) / xspan)
A[:, i * 2 + 1] = np.cos(2.0 * i * np.pi * (x - xmi) / xspan)
B[:, i * 2 + 0] = np.sin(2.0 * i * np.pi * (xc - xmi) / xspan)
B[:, i * 2 + 1] = np.cos(2.0 * i * np.pi * (xc - xmi) / xspan)
# w = 1.0 / err w(~isfinite(w))=0
w = 1.0 / error
# w[(~isfinite(w)]=0
# coeff=(spdiags(w,0,length(w),length(w))*A)\(y.*w)
# WA = np.diag(w, 0).dot(A)
# coeff, res, rank, s = np.linalg.lstsq(WA, y * w, rcond=None)
coeff, res, rank, s = np.linalg.lstsq(np.diag(w, 0) * A, (y * w)[0, :],
rcond=None)
return sum((B * coeff).T), HarmFunctor(A, coeff, xmi, xspan)
def harmfitCore(y, x=None, error=None, nc=42, resample=None, lam=0.1,
window=None, verbose=False, dosave=False,
lineSearch=True, robust=False, maxiter=20):
"""GIMLi-based curve-fit by harmonic functions.
Parameters
----------
y : 1d-array - values to be fitted
x : 1d-array(len(y)) - data abscissa data. default: [0 .. len(y))
error : 1d-array(len(y)) error of y. default (absolute error = 0.01)
nc : int - Number of harmonic coefficients
resample : 1d-array - resample y to x using fitting coeffients
window : int - just fit data inside window bounds
Returns
-------
response : 1d-array(len(resample) or len(x)) - smoothed values
coefficients : 1d-array - fitting coefficients
"""
if x is None:
x = np.arange(len(y))
xToFit = None
yToFit = None
if window is not None:
idx = pg.find((x >= window[0]) & (x < window[1]))
# idx = getIndex(x , lambda v: v > window[0] and v < window[1])
xToFit = x(idx)
yToFit = y(idx)
if error is not None:
error = error(idx)
else:
xToFit = x
yToFit = y
fop = pg.core.HarmonicModelling(nc, xToFit, verbose)
inv = pg.core.RInversion(yToFit, fop, verbose, dosave)
if error is not None:
inv.setAbsoluteError(error)
else:
inv.setRelativeError(0.01)
if error is not None:
inv.stopAtChi1(True)
inv.setMarquardtScheme(0.9)
inv.setLambda(lam)
inv.setMaxIter(maxiter)
inv.setLineSearch(lineSearch)
inv.setRobustData(robust)
coeff = inv.run()
if resample is not None:
ret = fop.response(coeff, resample)
if window is not None:
ret.setVal(0.0, pg.find((resample < window[0]) |
(resample >= window[1])))
return ret, coeff
else:
return inv.response(), coeff
[docs]
def harmfit(y, x=None, error=None, nc=42, resample=None, # lam=0.1,
window=None, verbose=False, **kwargs):
"""GIMLi-based curve-fit by harmonic functions.
Parameters
----------
y : 1d-array - values to be fitted
x : 1d-array(len(y)) - data abscissa data. default: [0 .. len(y))
error : 1d-array(len(y)) error of y. default (absolute error = 0.01)
nc : int - Number of harmonic coefficients
resample : 1d-array - resample y to x using fitting coeffients
window : int - just fit data inside window bounds
Returns
-------
response : 1d-array(len(resample) or len(x))
smoothed values
inv : pg.Inversion
coefficients : 1d-array - fitting coefficients
"""
if x is None:
x = np.arange(len(y))
xToFit = x
yToFit = y
if window is not None:
idx = pg.find((x >= window[0]) & (x < window[1]))
xToFit = x(idx)
yToFit = y(idx)
if error is not None:
error = error(idx)
fop = HarmonicModelling(xToFit, nc=nc)
inv = MarquardtInversion(fop=fop, verbose=verbose)
inv.modelTrans = pg.trans.Trans()
if error is None:
errorRel = np.ones_like(x) * 0.01
else:
errorRel = np.maximum(np.abs(error / yToFit), 0.001)
kwargs.setdefault("lam", 0.001)
coeff = inv.run(yToFit, errorRel, **kwargs, verbose=verbose)
if resample is not None:
ret = fop.resample(coeff, resample)
if window is not None:
ret.setVal(0.0, pg.find((resample < window[0]) |
(resample >= window[1])))
return ret, inv
else:
return inv.response, inv