Source code for obstools.comply.classes

# Copyright 2019 Pascal Audet & Helen Janiszewski
#
# This file is part of OBStools.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


import numpy as np
import pickle
from obstools.atacr import utils, DayNoise, StaNoise
np.seterr(all='ignore')


[docs] class Comply(object): """ A Comply object contains attributes that calculate and store the compliance and coherence functions for the available channels. Note ---- The object is initialized with either a processed :class:`~obstools.atacr.classes.DayNoise` or :class:`~obstools.atacr.classes.StaNoise` object. Each individual spectral quantity is unpacked as an object attribute, but all of them are discarded as the object is saved to disk. Attributes ---------- elev : float Station elevation in meters (OBS stations have negative elevations) f : :class:`~numpy.ndarray` Frequency axis for corresponding time sampling parameters c11 : `numpy.ndarray` Power spectra for component `H1`. Other identical attributes are available for the power, cross and rotated spectra: [11, 12, 1Z, 1P, 22, 2Z, 2P, ZZ, ZP, PP, HH, HZ, HP] tf_list : Dict Dictionary of possible transfer functions from the available components (obtained from the :class:`~obstools.atacr.classes.DayNoise` or the :class:`~obstools.atacr.classes.StaNoise` noise objects) """ def __init__(self, objnoise=None, elev=None): if any(value is None for value in [elev, objnoise]): raise(Exception( "Error: Initializing EventStream object with None values - " + "aborting")) if (objnoise and not isinstance(objnoise, DayNoise) and not isinstance(objnoise, StaNoise)): msg = "Error: A TFNoise object must be initialized with only " +\ "one of type DayNoise or StaNoise object" raise TypeError(msg) if not objnoise.av: raise(Exception( "Error: Noise object has not been processed (QC and " + "averaging) - aborting")) self.elevation = elev self.f = objnoise.f self.c11 = objnoise.power.c11 self.c22 = objnoise.power.c22 self.cZZ = objnoise.power.cZZ self.cPP = objnoise.power.cPP self.cHH = objnoise.rotation.cHH self.cHZ = objnoise.rotation.cHZ self.cHP = objnoise.rotation.cHP self.c12 = objnoise.cross.c12 self.c1Z = objnoise.cross.c1Z self.c1P = objnoise.cross.c1P self.c2Z = objnoise.cross.c2Z self.c2P = objnoise.cross.c2P self.cZP = objnoise.cross.cZP self.tf_list = objnoise.tf_list
[docs] class ComplyDict(dict): def __init__(self): self = dict() def add(self, key, value): self[key] = value
[docs] def calculate_compliance(self): """ Method to calculate compliance and coherence functions from the averaged (daily or station-averaged) noise spectra. Attributes ---------- complyfunc : Dict Container Dictionary for all possible compliance and coherence functions Examples -------- Calculate compliance and coherence functions for a DayNoise object. In these examples, station elevation is extracted from the IRIS metadata aggregator site: http://ds.iris.edu/mda/7D/M08A/ >>> from obstools.atacr import DayNoise >>> from obstools.comply import Comply >>> daynoise = DayNoise('demo') Uploading demo data - March 04, 2012, station 7D.M08A >>> daynoise.QC_daily_spectra() >>> daynoise.average_daily_spectra() >>> daycomply = Comply(objnoise=daynoise, elev=-126.4) >>> daycomply.calculate_compliance() >>> tfnoise.complyfunc.keys() dict_keys(['ZP', 'ZP-21', 'ZP-H']) Calculate compliance and coherence functions for a StaNoise object >>> from obstools.atacr import StaNoise >>> from obstools.comply import Comply >>> stanoise = StaNoise('demo') Uploading demo data - March 01 to 04, 2012, station 7D.M08A >>> stanoise.QC_sta_spectra() >>> stanoise.average_sta_spectra() >>> stacomply = Comply(objnoise=stanoise, elev=-126.4) >>> stacomply.calculate_compliance() >>> stacomply.complyfunc.keys() dict_keys(['ZP', 'ZP-21']) """ def wavenumber(omega, H): """ Function to approximate wavenumber from dispersion relation H is depth below the seafloor, in meters omega is a vector of positive angular frequencies Stephen G. Mosher, 2020 """ import numpy.polynomial as poly g = 9.79329 N = len(omega) # Approximations for k when k*H is very large (deep case) or # very small (shallow case) k_deep = omega**2 / g k_shal = omega / np.sqrt(g * H) """ Alternatively, we can use a rational approximation to tanh(x) to solve k for any omega. This approximation gives a quartic equation, we take the positive real roots as the value of k we're interested in. The rational approximation being used is always better than the shallow approximation. However, it's only better than the deep approximation if k*H < 2.96. Therefore, we keep the solutions to k we find, using the rational approximation for k*H < 2.96 and use the deep water approximation to solve for k otherwise. The average error is just under 1% and the maximum error is 2.5%. """ k = np.zeros(len(omega)) for i, om in enumerate(omega): if i == 0: k[i] = 0. else: a0 = -27 * om**2 / g # constant terms a1 = 0. # no linear terms a2 = 27 * H - (9 * om**2 * H**2)/g # quadratic terms a3 = 0. # no cubic terms a4 = H**3 # quartic terms p = poly.Polynomial([a0, a1, a2, a3, a4]) solu = poly.Polynomial.roots(p) positive_roots = solu[solu > 0] real_positive_root = \ positive_roots[positive_roots.imag == 0].real[0] k[i] = real_positive_root # For k*H >= 2.96, prefer the deep approximation above for i, wavenumber in enumerate(k_deep): if wavenumber * H > 2.96: k[i] = k_deep[i] return k # Calculate wavenumber - careful here, elevation is negative k = wavenumber(2.*np.pi*self.f, -1.*self.elevation) # Initialize empty dictionary complyfunc = self.ComplyDict() # Cycle through all available transfer functions in the objnoise # object for key, value in self.tf_list.items(): if key == 'ZP': if value: admit_ZP = utils.admittance(self.cZP, self.cPP) compl_ZP = k*admit_ZP coh_ZP = utils.coherence(self.cZP, self.cPP, self.cZZ) complyfunc.add('ZP', [compl_ZP, coh_ZP]) elif key == 'ZP-21': if value: lc1cZ = np.conj(self.c1Z)/self.c11 lc1c2 = np.conj(self.c12)/self.c11 lc1cP = np.conj(self.c1P)/self.c11 coh_12 = utils.coherence(self.c12, self.c11, self.c22) coh_1P = utils.coherence(self.c1P, self.c11, self.cPP) coh_1Z = utils.coherence(self.c1Z, self.c11, self.cZZ) gc2c2_c1 = self.c22*(1. - coh_12) gcPcP_c1 = self.cPP*(1. - coh_1P) gcZcZ_c1 = self.cZZ*(1. - coh_1Z) gc2cZ_c1 = np.conj(self.c2Z) - np.conj(lc1c2*self.c1Z) gcPcZ_c1 = self.cZP - np.conj(lc1cP*self.c1Z) gc2cP_c1 = np.conj(self.c2P) - np.conj(lc1c2*self.c1P) lc2cP_c1 = gc2cP_c1/gc2c2_c1 lc2cZ_c1 = gc2cZ_c1/gc2c2_c1 coh_c2cP_c1 = utils.coherence(gc2cP_c1, gc2c2_c1, gcPcP_c1) coh_c2cZ_c1 = utils.coherence(gc2cZ_c1, gc2c2_c1, gcZcZ_c1) gcPcP_c1c2 = gcPcP_c1*(1. - coh_c2cP_c1) gcPcZ_c1c2 = gcPcZ_c1 - np.conj(lc2cP_c1)*gc2cZ_c1 gcZcZ_c1c2 = gcZcZ_c1*(1. - coh_c2cZ_c1) admit_ZP_21 = utils.admittance( gcPcZ_c1c2, gcPcP_c1c2) compl_ZP_21 = k*admit_ZP_21 coh_ZP_21 = utils.coherence( gcPcZ_c1c2, gcPcP_c1c2, gcZcZ_c1c2) complyfunc.add('ZP-21', [compl_ZP_21, coh_ZP_21]) elif key == 'ZP-H': if value: lcHcP = np.conj(self.cHP)/self.cHH coh_HP = utils.coherence(self.cHP, self.cHH, self.cPP) coh_HZ = utils.coherence(self.cHZ, self.cHH, self.cZZ) gcPcP_cH = self.cPP*(1. - coh_HP) gcZcZ_cH = self.cZZ*(1. - coh_HZ) gcPcZ_cH = self.cZP - np.conj(lcHcP*self.cHZ) admit_ZP_H = utils.admittance(gcPcZ_cH, gcPcP_cH) compl_ZP_H = k*admit_ZP_H coh_ZP_H = utils.coherence(gcPcZ_cH, gcPcP_cH, gcZcZ_cH) complyfunc.add('ZP-H', [compl_ZP_H, coh_ZP_H]) self.complyfunc = complyfunc
[docs] def save(self, filename, form='pkl'): """ Method to save the object to file using `~Pickle`. Parameters ---------- filename : str File name Examples -------- Following from the example outlined in method :func:`~obstools.comply.classes.Comply.calculate_compliance`, we simply save the final object >>> daycomply.save('daycomply_demo.pkl') Check that object has been saved >>> import glob >>> glob.glob("./daycomply_demo.pkl") ['./daycomply_demo.pkl'] """ if not self.complyfunc: print("Warning: saving before having calculated the compliance " + "and coherence functions") if form == 'pkl': # Remove traces to save disk space del self.c11 del self.c22 del self.cZZ del self.cPP del self.cHH del self.cHZ del self.cHP del self.c12 del self.c1Z del self.c1P del self.c2Z del self.c2P del self.cZP file = open(filename.parent / (filename.name + '.' + form), 'wb') pickle.dump(self, file) file.close() elif form == 'csv': import pandas as pd if 'ZP-H' in self.complyfunc: df = pd.DataFrame( {'Frequency': self.f, 'Compliance ZP': self.complyfunc['ZP'][0], 'Coherence ZP': self.complyfunc['ZP'][1], 'Compliance ZP-21': self.complyfunc['ZP-21'][0], 'Coherence ZP-21': self.complyfunc['ZP-21'][1], 'Compliance ZP-H': self.complyfunc['ZP-H'][0], 'Coherence ZP-H': self.complyfunc['ZP-H'][1]}) elif 'ZP-21' in self.complyfunc: df = pd.DataFrame( {'Frequency': self.f, 'Compliance ZP': self.complyfunc['ZP'][0], 'Coherence ZP': self.complyfunc['ZP'][1], 'Compliance ZP-21': self.complyfunc['ZP-21'][0], 'Coherence ZP-21': self.complyfunc['ZP-21'][1]}) else: df = pd.DataFrame( {'Frequency': self.f, 'Compliance ZP': self.complyfunc['ZP'][0], 'Coherence ZP': self.complyfunc['ZP'][1]}) df.to_csv(filename.parent / (filename.name + '.' + form), index=False) return