Source code for obstools.atacr.utils

# Copyright 2019 Pascal Audet & Helen Janiszewski
#
# This file is part of OBStools.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
:mod:`~obstools.atacr.utils` contains several functions that are used in the
class methods of `~obstools.atacr.classes`.

"""


import os
import math
import numpy as np
import fnmatch
from scipy.stats import circmean
from matplotlib import pyplot as plt
from obspy.core import read, Stream, Trace, AttribDict, UTCDateTime


[docs] def traceshift(trace, tt): """ Function to shift traces in time given travel time Parameters ---------- trace : :class:`~obspy.core.Trace` object Trace object to update tt : float Time shift in seconds Returns ------- rtrace : :class:`~obspy.core.Trace` object Updated trace object """ # Define frequencies nt = trace.stats.npts dt = trace.stats.delta freq = np.fft.fftfreq(nt, d=dt) # Fourier transform ftrace = np.fft.fft(trace.data) # Shift for i in range(len(freq)): ftrace[i] = ftrace[i]*np.exp(-2.*np.pi*1j*freq[i]*tt) # Back Fourier transform and return as trace rtrace = trace.copy() rtrace.data = np.real(np.fft.ifft(ftrace)) # Update start time rtrace.stats.starttime -= tt return rtrace
[docs] def QC_streams(start, end, st): """ Function for quality control of traces, which compares the start and end times that were requested, as well as the total n length of the traces. Parameters ---------- start : :class:`~obspy.core.UTCDateTime` object Start time of requested stream end : :class:`~obspy.core.UTCDateTime` object End time of requested stream st : :class:`~obspy.core.Stream` object Stream object with all trace data Returns ------- (pass): bool Whether the QC test has passed st : :class:`~obspy.core.Stream` object Updated stream object """ # Check start times if not np.all([tr.stats.starttime == start for tr in st]): print("* Start times are not all close to true start: ") [print("* "+tr.stats.channel+" " + str(tr.stats.starttime)+" " + str(tr.stats.endtime)) for tr in st] print("* True start: "+str(start)) print("* -> Shifting traces to true start") delay = [tr.stats.starttime - start for tr in st] st_shifted = Stream( traces=[traceshift(tr, dt) for tr, dt in zip(st, delay)]) st = st_shifted.copy() # Try trimming dt = st[0].stats.delta try: st.trim(start, end-dt, fill_value=0., pad=True) except Exception: print("* Unable to trim") print("* -> Skipping") print("**************************************************") return False, None # Check final lengths - they should all be equal if start times # and sampling rates are all equal and traces have been trimmed sr = st[0].stats.sampling_rate if not np.allclose([tr.stats.npts for tr in st[1:]], st[0].stats.npts): print("* Lengths are incompatible: ") [print("* "+str(tr.stats.npts)) for tr in st] print("* -> Skipping") print("**************************************************") return False, None elif not np.allclose([st[0].stats.npts], int((end - start)*sr), atol=1): print("* Length is too short: ") print("* "+str(st[0].stats.npts) + " ~= "+str(int((end - start)*sr))) print("* -> Skipping") print("**************************************************") return False, None else: return True, st
[docs] def update_stats(tr, stla, stlo, stel, cha, evla=None, evlo=None): """ Function to include SAC metadata to :class:`~obspy.core.Trace` objects Parameters ---------- tr : :class:`~obspy.core.Trace` object Trace object to update stla : float Latitude of station stlo : float Longitude of station stel : float Station elevation (m) cha : str Channel for component evla : float, optional Latitude of event evlo : float, optional Longitute of event Returns ------- tr : :class:`~obspy.core.Trace` object Updated trace object """ tr.stats.sac = AttribDict() tr.stats.sac.stla = stla tr.stats.sac.stlo = stlo tr.stats.sac.stel = stel tr.stats.sac.kcmpnm = cha tr.stats.channel = cha if evla is not None and evlo is not None: tr.stats.sac.evla = evla tr.stats.sac.evlo = evlo return tr
[docs] def get_data(datapath, tstart, tend): """ Function to grab all available noise data given a path and data time range Parameters ---------- datapath : str Path to noise data folder tstart : :class:`~obspy.class.UTCDateTime` Start time for query tend : :class:`~obspy.class.UTCDateTime` End time for query Returns ------- tr1, tr2, trZ, trP : :class:`~obspy.core.Trace` object Corresponding trace objects for components H1, H2, HZ and HP. Returns empty traces for missing components. """ # Define empty streams trN1 = Stream() trN2 = Stream() trNZ = Stream() trNP = Stream() # Time iterator t1 = tstart # Cycle through each day within time range while t1 < tend: # Time stamp used in file name tstamp = str(t1.year).zfill(4)+'.'+str(t1.julday).zfill(3)+'.' # Cycle through directory and load files p = datapath.glob('*.*') files = [x for x in p if x.is_file()] for file in files: if fnmatch.fnmatch(str(file), '*' + tstamp + '*1.SAC'): tr = read(str(file)) trN1.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*2.SAC'): tr = read(str(file)) trN2.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*Z.SAC'): tr = read(str(file)) trNZ.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*H.SAC'): tr = read(str(file)) trNP.append(tr[0]) # Increase increment t1 += 3600.*24. # Fill with empty traces if components are not found ntr = len(trNZ) if not trN1 and not trN2: for i in range(ntr): trN1.append(Trace()) trN2.append(Trace()) if not trNP: for i in range(ntr): trNP.append(Trace()) if ntr > 0: # Check that all sampling rates are equal - otherwise resample if trNZ[0].stats.sampling_rate != trNP[0].stats.sampling_rate: # These checks assume that all seismic data have the same sampling if trNZ[0].stats.sampling_rate < trNP[0].stats.sampling_rate: trNP.resample(trNZ[0].stats.sampling_rate, no_filter=False) else: trNZ.resample(trNP[0].stats.sampling_rate, no_filter=False) if trN1: trN1.resample(trNP[0].stats.sampling_rate, no_filter=False) if trN2: trN2.resample(trNP[0].stats.sampling_rate, no_filter=False) return trN1, trN2, trNZ, trNP
[docs] def get_event(eventpath, tstart, tend): """ Function to grab all available earthquake data given a path and data time range Parameters ---------- eventpath : str Path to earthquake data folder tstart : :class:`~obspy.class.UTCDateTime` Start time for query tend : :class:`~obspy.class.UTCDateTime` End time for query Returns ------- tr1, tr2, trZ, trP : :class:`~obspy.core.Trace` object Corresponding trace objects for components H1, H2, HZ and HP. Returns empty traces for missing components. """ # Find out how many events from Z.SAC files eventfiles = list(eventpath.glob('*Z.SAC')) if not eventfiles: raise Exception("No event found in folder "+str(eventpath)) # Extract events from time stamps prefix = [file.name.split('.') for file in eventfiles] evstamp = [p[0]+'.'+p[1]+'.'+p[2]+'.'+p[3]+'.' for p in prefix] evDateTime = [UTCDateTime(p[0]+'-'+p[1]+'T'+p[2]+":"+p[3]) for p in prefix] # Define empty streams tr1 = Stream() tr2 = Stream() trZ = Stream() trP = Stream() # Cycle over all available files in time range for event, tstamp in zip(evDateTime, evstamp): if event >= tstart and event <= tend: # Cycle through directory and load files p = list(eventpath.glob('*.SAC')) files = [x for x in p if x.is_file()] for file in files: if fnmatch.fnmatch(str(file), '*' + tstamp + '*1.SAC'): tr = read(str(file)) tr1.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*2.SAC'): tr = read(str(file)) tr2.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*Z.SAC'): tr = read(str(file)) trZ.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*H.SAC'): tr = read(str(file)) trP.append(tr[0]) # Fill with empty traces if components are not found ntr = len(trZ) if not tr1 and not tr2: for i in range(ntr): tr1.append(Trace()) tr2.append(Trace()) if not trP: for i in range(ntr): trP.append(Trace()) if ntr > 0: # Check that all sampling rates are equal - otherwise resample if trZ[0].stats.sampling_rate != trP[0].stats.sampling_rate: # These checks assume that all seismic data have the same sampling if trZ[0].stats.sampling_rate < trP[0].stats.sampling_rate: trP.resample(trZ[0].stats.sampling_rate, no_filter=False) else: trZ.resample(trP[0].stats.sampling_rate, no_filter=False) if tr1: tr1.resample(trP[0].stats.sampling_rate, no_filter=False) if tr2: tr2.resample(trP[0].stats.sampling_rate, no_filter=False) return tr1, tr2, trZ, trP
[docs] def calculate_tilt(ft1, ft2, ftZ, ftP, f, goodwins, tiltfreqs, fig_trf=False, savefig=None): """ Determines tilt orientation from the maximum coherence between rotated H1 and Z. Parameters ---------- ft1, ft2, ftZ, ftP : :class:`~numpy.ndarray` Fourier transform of corresponding H1, H2, HZ and HP components f : :class:`~numpy.ndarray` Frequency axis in Hz goodwins : list List of booleans representing whether a window is good (True) or not (False). This attribute is returned from the method :func:`~obstools.atacr.classes.DayNoise.QC_daily_spectra` tiltfreqs : list Two floats representing the frequency band at which the mean coherence, phase and admittance are calculated to determine the tile orientation Returns ------- cHH, cHZ, cHP : :class:`~numpy.ndarray` Arrays of power and cross-spectral density functions of components HH (rotated H1 in direction of maximum tilt), HZ, and HP coh : :class:`~numpy.ndarray` Mean coherence value between rotated H and Z components, as a function of azimuth CW from H1 ph : :class:`~numpy.ndarray` Mean phase value between rotated H and Z components, as a function of azimuth CW from H1 ad : :class:`~numpy.ndarray` Mean admittance value between rotated H and Z components, as a function of azimuth CW from H1 phi : :class:`~numpy.ndarray` Array of azimuths CW from H1 considered tilt_dir : float Tilt direction (azimuth CW from H1) at maximum coherence between rotated H1 and Z tilt_ang : float Tilt angle (down from HZ/3) at maximum coherence between rotated H1 and Z coh_value : float Mean coherence value at tilt direction phase_value : float Mean phase value at tilt direction admit_value : float Mean admittance value at tilt direction """ # frequencies considered in averaging freqs = (f > tiltfreqs[0]) & (f < tiltfreqs[1]) # Mean of Z PSD - this one doesn't change with angle from H1 cZZ = np.abs(np.mean(ftZ[goodwins, :] * np.conj(ftZ[goodwins, :]), axis=0))[0:len(f)] # Initialize arrays phi = np.arange(0., 360., 10.) coh = np.zeros(len(phi)) ph = np.zeros(len(phi)) ad = np.zeros(len(phi)) for i, d in enumerate(phi): # Rotate horizontals - clockwise from 1 # components 1, 2 correspond to y, x in the cartesian plane ftH = rotate_dir(ft2, ft1, d) # Get transfer functions cHH = np.abs(np.mean(ftH[goodwins, :] * np.conj(ftH[goodwins, :]), axis=0))[0:len(f)] cHZ = np.mean(ftH[goodwins, :] * np.conj(ftZ[goodwins, :]), axis=0)[0:len(f)] Co = coherence(cHZ, cHH, cZZ) Ad = admittance(cHZ, cHH) Ph = phase(cHZ/cHH) # Calculate coherence over frequency band try: coh[i] = np.mean(Co[freqs]) ad[i] = np.mean(Ad[freqs]) ph[i] = circmean( Ph[freqs], high=np.pi, low=-np.pi) except Exception: print('Exception') coh[i] = 0. ad[i] = 0. ph[i] = 0. # Index where coherence is max ind = np.argwhere(coh == coh.max()) # Phase, angle and direction at maximum coherence phase_value = ph[ind[0]][0] admit_value = ad[ind[0]][0] coh_value = coh[ind[0]][0] tilt_dir = phi[ind[0]][0] tilt_ang = np.arctan(admit_value)*180./np.pi # Phase has to be close to zero: ***Not true?*** # Check phase near max coherence and 180 deg apart start = max(0, ind[0][0] - 1) end = min(len(ph), ind[0][0] + 2) phase_std = np.std(ph[start:end]) if np.abs(phase_value) > np.pi/2: tilt_dir += 180. if tilt_dir > 360.: tilt_dir -= 360. # Refine search rphi = np.arange(tilt_dir-10., tilt_dir+10., 1.) rcoh = np.zeros(len(rphi)) rad = np.zeros(len(rphi)) rph = np.zeros(len(rphi)) for i, d in enumerate(rphi): # Rotate horizontals - clockwise from 1 # components 1, 2 correspond to y, x in the cartesian plane ftH = rotate_dir(ft2, ft1, d) # Get transfer functions cHH = np.abs(np.mean(ftH[goodwins, :] * np.conj(ftH[goodwins, :]), axis=0))[0:len(f)] cHZ = np.mean(ftH[goodwins, :] * np.conj(ftZ[goodwins, :]), axis=0)[0:len(f)] Co = coherence(cHZ, cHH, cZZ) Ad = admittance(cHZ, cHH) Ph = phase(cHZ/cHH) # Calculate coherence over frequency band try: rcoh[i] = np.mean(Co[freqs]) rad[i] = np.mean(Ad[freqs]) rph[i] = circmean( Ph[freqs], high=np.pi, low=-np.pi) except Exception: print("* Warning: problems in the Tilt calculations. " + "Setting coherence and phase between Z and rotated H " + "to 0.") rcoh[i] = 0. rad[i] = 0. rph[i] = 0. # Index where coherence is max ind = np.argwhere(rcoh == rcoh.max()) # Phase and direction at maximum coherence phase_value = rph[ind[0]][0] admit_value = rad[ind[0]][0] coh_value = rcoh[ind[0]][0] tilt_dir = rphi[ind[0]][0] tilt_ang = np.arctan(admit_value)*180./np.pi # Now calculate spectra at tilt direction ftH = rotate_dir(ft2, ft1, tilt_dir) # Get transfer functions cHH = np.abs(np.mean(ftH[goodwins, :] * np.conj(ftH[goodwins, :]), axis=0))[0:len(f)] cHZ = np.mean(ftH[goodwins, :] * np.conj(ftZ[goodwins, :]), axis=0)[0:len(f)] if np.any(ftP): cHP = np.mean(ftH[goodwins, :] * np.conj(ftP[goodwins, :]), axis=0)[0:len(f)] else: cHP = None return cHH, cHZ, cHP, coh, ph, ad, phi, tilt_dir, tilt_ang, coh_value, phase_value, admit_value
[docs] def smooth(data, nd, axis=0): """ Function to smooth power spectral density functions from the convolution of a boxcar function with the PSD Parameters ---------- data : :class:`~numpy.ndarray` Real-valued array to smooth (PSD) nd : int Number of samples over which to smooth axis : int, optional axis over which to perform the smoothing Returns ------- filt : :class:`~numpy.ndarray`, optional Filtered data """ if np.any(data): if data.ndim > 1: filt = np.zeros(data.shape) for i in range(data.shape[::-1][axis]): if axis == 0: filt[:, i] = np.convolve( data[:, i], np.ones((nd,))/nd, mode='same') elif axis == 1: filt[i, :] = np.convolve( data[i, :], np.ones((nd,))/nd, mode='same') else: filt = np.convolve(data, np.ones((nd,))/nd, mode='same') return filt else: return None
[docs] def admittance(Gxy, Gxx): """ Calculates admittance between two components Parameters ---------- Gxy : :class:`~numpy.ndarray` Cross spectral density function of `x` and `y` Gxx : :class:`~numpy.ndarray` Power spectral density function of `x` Returns ------- : :class:`~numpy.ndarray`, optional Admittance between `x` and `y` """ if np.any(Gxy) and np.any(Gxx): return np.abs(Gxy)/Gxx else: return None
[docs] def coherence(Gxy, Gxx, Gyy): """ Calculates coherence between two components Parameters ---------- Gxy : :class:`~numpy.ndarray` Cross spectral density function of `x` and `y` Gxx : :class:`~numpy.ndarray` Power spectral density function of `x` Gyy : :class:`~numpy.ndarray` Power spectral density function of `y` Returns ------- : :class:`~numpy.ndarray`, optional Coherence between `x` and `y` """ if np.any(Gxy) and np.any(Gxx) and np.any(Gxx): return np.abs(Gxy)**2/(Gxx*Gyy) else: return None
[docs] def phase(Gxy): """ Calculates phase angle between real and imaginary components Parameters ---------- Gxy : :class:`~numpy.ndarray` Cross spectral density function of `x` and `y` Returns ------- : :class:`~numpy.ndarray`, optional Phase angle between `x` and `y` """ if np.any(Gxy): return np.angle(Gxy) else: return None
def rotate_dir(x, y, theta): d = theta*np.pi/180. rot_mat = [[np.cos(d), np.sin(d)], [-np.sin(d), np.cos(d)]] vxy_rotated = np.tensordot(rot_mat, [x, y], axes=1) return vxy_rotated[1] def ftest(res1, pars1, res2, pars2): from scipy.stats import f as f_dist N1 = len(res1) N2 = len(res2) dof1 = N1 - pars1 dof2 = N2 - pars2 Ea_1 = np.sum(res1**2) Ea_2 = np.sum(res2**2) Fobs = (Ea_1/dof1)/(Ea_2/dof2) P = 1. - (f_dist.cdf(Fobs, dof1, dof2) - f_dist.cdf(1./Fobs, dof1, dof2)) return P