Source code for obstools.atacr.utils

# Copyright 2019 Pascal Audet & Helen Janiszewski
#
# This file is part of OBStools.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
:mod:`~obstools.atacr.utils` contains several functions that are used in the
class methods of `~obstools.atacr.classes`.

"""


import os
import re
import math
import numpy as np
import fnmatch
from stdb import StDbElement
from scipy.stats import circmean
from matplotlib import pyplot as plt
from obspy import read, Stream, Trace, UTCDateTime
from obspy.core import AttribDict


def get_stkeys(inventory, keys=None):

    allkeys = []
    station_list = inventory.get_contents()['stations']
    allkeys = [s.split(' ')[0] for s in station_list]

    if keys is not None:
        # Extract key subset

        stkeys = []
        for key in keys:
            # Convert the pattern to a regex pattern
            # Replace '.' with '\.' to match literal dots
            # Replace '*' with '.*' to match any sequence of characters
            # Replace '?' with '.' to match any single character
            pattern = key.replace('.', r'\.').replace('*', '.*').replace('?', '.')

            # Compile the regex pattern
            regex = re.compile(f'^.*{pattern}.*$')

            # Filter allkeys based on the compiled regex
            stkeys.extend([key for key in allkeys if regex.match(key)])

    else:
        stkeys = allkeys

    return stkeys


def inv2stdb(inventory, keys=None):

    stkeys = get_stkeys(inventory, keys)

    stations = {}
    for key in stkeys:
        net = key.split('.')[0]
        sta = key.split('.')[1]
        cha = '[CH]*'
        inv = inventory.select(network=net, station=sta, channel=cha)
        seed_id = inv.get_contents()['channels'][0]
        coords = inv.get_coordinates(seed_id)

        stdb_element = StDbElement(
            station=sta,
            network=net,
            channel=seed_id.split('.')[3][0:2],
            location=seed_id.split('.')[2],
            latitude=coords['latitude'],
            longitude=coords['longitude'],
            elevation=coords['elevation'],
            startdate=inv[0].stations[0].start_date,
            enddate=inv[0].stations[0].end_date
            )
        stations[key] = stdb_element

    return stations, stkeys


[docs] def traceshift(trace, tt): """ Function to shift traces in time given travel time Parameters ---------- trace : :class:`~obspy.core.Trace` object Trace object to update tt : float Time shift in seconds Returns ------- rtrace : :class:`~obspy.core.Trace` object Updated trace object """ # Define frequencies nt = trace.stats.npts dt = trace.stats.delta freq = np.fft.fftfreq(nt, d=dt) # Fourier transform ftrace = np.fft.fft(trace.data) # Shift for i in range(len(freq)): ftrace[i] = ftrace[i]*np.exp(-2.*np.pi*1j*freq[i]*tt) # Back Fourier transform and return as trace rtrace = trace.copy() rtrace.data = np.real(np.fft.ifft(ftrace)) # Update start time rtrace.stats.starttime -= tt return rtrace
[docs] def QC_streams(start, end, st): """ Function for quality control of traces, which compares the start and end times that were requested, as well as the total n length of the traces. Parameters ---------- start : :class:`~obspy.core.UTCDateTime` object Start time of requested stream end : :class:`~obspy.core.UTCDateTime` object End time of requested stream st : :class:`~obspy.core.Stream` object Stream object with all trace data Returns ------- (pass): bool Whether the QC test has passed st : :class:`~obspy.core.Stream` object Updated stream object """ # Check start times if not np.all([tr.stats.starttime == start for tr in st]): print("* Start times are not all close to true start: ") [print("* "+tr.stats.channel+" " + str(tr.stats.starttime)+" " + str(tr.stats.endtime)) for tr in st] print("* True start: "+str(start)) print("* -> Shifting traces to true start") delay = [tr.stats.starttime - start for tr in st] st_shifted = Stream( traces=[traceshift(tr, dt) for tr, dt in zip(st, delay)]) st = st_shifted.copy() # Try trimming dt = st[0].stats.delta try: st.trim(start, end-dt, fill_value=0., pad=True) except Exception: print("* Unable to trim") print("* -> Skipping") print("**************************************************") return False, None # Check final lengths - they should all be equal if start times # and sampling rates are all equal and traces have been trimmed sr = st[0].stats.sampling_rate if not np.allclose([tr.stats.npts for tr in st[1:]], st[0].stats.npts): print("* Lengths are incompatible: ") [print("* "+str(tr.stats.npts)) for tr in st] print("* -> Skipping") print("**************************************************") return False, None elif not np.allclose([st[0].stats.npts], int((end - start)*sr), atol=1): print("* Length is too short: ") print("* "+str(st[0].stats.npts) + " ~= "+str(int((end - start)*sr))) print("* -> Skipping") print("**************************************************") return False, None else: return True, st
[docs] def update_stats(tr, stla, stlo, stel, cha, evla=None, evlo=None): """ Function to include SAC metadata to :class:`~obspy.core.Trace` objects Parameters ---------- tr : :class:`~obspy.core.Trace` object Trace object to update stla : float Latitude of station stlo : float Longitude of station stel : float Station elevation (m) cha : str Channel for component evla : float, optional Latitude of event evlo : float, optional Longitute of event Returns ------- tr : :class:`~obspy.core.Trace` object Updated trace object """ tr.stats.sac = AttribDict() tr.stats.sac.stla = stla tr.stats.sac.stlo = stlo tr.stats.sac.stel = stel tr.stats.sac.kcmpnm = cha tr.stats.channel = cha if evla is not None and evlo is not None: tr.stats.sac.evla = evla tr.stats.sac.evlo = evlo return tr
[docs] def get_data(datapath, tstart, tend): """ Function to grab all available noise data given a path and data time range Parameters ---------- datapath : str Path to noise data folder tstart : :class:`~obspy.class.UTCDateTime` Start time for query tend : :class:`~obspy.class.UTCDateTime` End time for query Returns ------- tr1, tr2, trZ, trP : :class:`~obspy.core.Trace` object Corresponding trace objects for components H1, H2, HZ and HP. Returns empty traces for missing components. """ # Define empty streams trN1 = Stream() trN2 = Stream() trNZ = Stream() trNP = Stream() # Time iterator t1 = tstart # Cycle through each day within time range while t1 < tend: # Time stamp used in file name tstamp = str(t1.year).zfill(4)+'.'+str(t1.julday).zfill(3)+'.' # Cycle through directory and load files p = datapath.glob('*.*') files = [x for x in p if x.is_file()] for file in files: if fnmatch.fnmatch(str(file), '*' + tstamp + '*1.SAC'): tr = read(str(file)) trN1.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*2.SAC'): tr = read(str(file)) trN2.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*Z.SAC'): tr = read(str(file)) trNZ.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*H.SAC'): tr = read(str(file)) trNP.append(tr[0]) # Increase increment t1 += 3600.*24. # Fill with empty traces if components are not found ntr = len(trNZ) if not trN1 and not trN2: for i in range(ntr): trN1.append(Trace()) trN2.append(Trace()) if not trNP: for i in range(ntr): trNP.append(Trace()) if ntr > 0: # Check that all sampling rates are equal - otherwise resample if trNZ[0].stats.sampling_rate != trNP[0].stats.sampling_rate: # These checks assume that all seismic data have the same sampling if trNZ[0].stats.sampling_rate < trNP[0].stats.sampling_rate: trNP.resample(trNZ[0].stats.sampling_rate, no_filter=False) else: trNZ.resample(trNP[0].stats.sampling_rate, no_filter=False) if trN1: trN1.resample(trNP[0].stats.sampling_rate, no_filter=False) if trN2: trN2.resample(trNP[0].stats.sampling_rate, no_filter=False) return trN1, trN2, trNZ, trNP
[docs] def get_event(eventpath, tstart, tend): """ Function to grab all available earthquake data given a path and data time range Parameters ---------- eventpath : str Path to earthquake data folder tstart : :class:`~obspy.class.UTCDateTime` Start time for query tend : :class:`~obspy.class.UTCDateTime` End time for query Returns ------- tr1, tr2, trZ, trP : :class:`~obspy.core.Trace` object Corresponding trace objects for components H1, H2, HZ and HP. Returns empty traces for missing components. """ # Find out how many events from Z.SAC files eventfiles = list(eventpath.glob('*Z.SAC')) if not eventfiles: raise Exception("No event found in folder "+str(eventpath)) # Extract events from time stamps prefix = [file.name.split('.') for file in eventfiles] evstamp = [p[0]+'.'+p[1]+'.'+p[2]+'.'+p[3]+'.' for p in prefix] evDateTime = [UTCDateTime(p[0]+'-'+p[1]+'T'+p[2]+":"+p[3]) for p in prefix] # Define empty streams tr1 = Stream() tr2 = Stream() trZ = Stream() trP = Stream() # Cycle over all available files in time range for event, tstamp in zip(evDateTime, evstamp): if event >= tstart and event <= tend: # Cycle through directory and load files p = list(eventpath.glob('*.SAC')) files = [x for x in p if x.is_file()] for file in files: if fnmatch.fnmatch(str(file), '*' + tstamp + '*1.SAC'): tr = read(str(file)) tr1.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*2.SAC'): tr = read(str(file)) tr2.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*Z.SAC'): tr = read(str(file)) trZ.append(tr[0]) elif fnmatch.fnmatch(str(file), '*' + tstamp + '*H.SAC'): tr = read(str(file)) trP.append(tr[0]) # Fill with empty traces if components are not found ntr = len(trZ) if not tr1 and not tr2: for i in range(ntr): tr1.append(Trace()) tr2.append(Trace()) if not trP: for i in range(ntr): trP.append(Trace()) if ntr > 0: # Check that all sampling rates are equal - otherwise resample if trZ[0].stats.sampling_rate != trP[0].stats.sampling_rate: # These checks assume that all seismic data have the same sampling if trZ[0].stats.sampling_rate < trP[0].stats.sampling_rate: trP.resample(trZ[0].stats.sampling_rate, no_filter=False) else: trZ.resample(trP[0].stats.sampling_rate, no_filter=False) if tr1: tr1.resample(trP[0].stats.sampling_rate, no_filter=False) if tr2: tr2.resample(trP[0].stats.sampling_rate, no_filter=False) return tr1, tr2, trZ, trP
[docs] def calculate_tilt(ft1, ft2, ftZ, ftP, f, goodwins, tiltfreqs, fig_trf=False, savefig=None): """ Determines tilt orientation from the maximum coherence between rotated H1 and Z. Parameters ---------- ft1, ft2, ftZ, ftP : :class:`~numpy.ndarray` Fourier transform of corresponding H1, H2, HZ and HP components f : :class:`~numpy.ndarray` Frequency axis in Hz goodwins : list List of booleans representing whether a window is good (True) or not (False). This attribute is returned from the method :func:`~obstools.atacr.classes.DayNoise.QC_daily_spectra` tiltfreqs : list Two floats representing the frequency band at which the mean coherence, phase and admittance are calculated to determine the tile orientation Returns ------- cHH, cHZ, cHP : :class:`~numpy.ndarray` Arrays of power and cross-spectral density functions of components HH (rotated H1 in direction of maximum tilt), HZ, and HP coh : :class:`~numpy.ndarray` Mean coherence value between rotated H and Z components, as a function of azimuth CW from H1 ph : :class:`~numpy.ndarray` Mean phase value between rotated H and Z components, as a function of azimuth CW from H1 ad : :class:`~numpy.ndarray` Mean admittance value between rotated H and Z components, as a function of azimuth CW from H1 phi : :class:`~numpy.ndarray` Array of azimuths CW from H1 considered tilt_dir : float Tilt direction (azimuth CW from H1) at maximum coherence between rotated H1 and Z tilt_ang : float Tilt angle (down from HZ/3) at maximum coherence between rotated H1 and Z coh_value : float Mean coherence value at tilt direction phase_value : float Mean phase value at tilt direction admit_value : float Mean admittance value at tilt direction """ # frequencies considered in averaging freqs = (f > tiltfreqs[0]) & (f < tiltfreqs[1]) # Mean of Z PSD - this one doesn't change with angle from H1 cZZ = np.abs(np.mean(ftZ[goodwins, :] * np.conj(ftZ[goodwins, :]), axis=0))[0:len(f)] # Initialize arrays phi = np.arange(0., 360., 10.) coh = np.zeros(len(phi)) ph = np.zeros(len(phi)) ad = np.zeros(len(phi)) for i, d in enumerate(phi): # Rotate horizontals - clockwise from 1 # components 1, 2 correspond to y, x in the cartesian plane ftH = rotate_dir(ft2, ft1, d) # Get transfer functions cHH = np.abs(np.mean(ftH[goodwins, :] * np.conj(ftH[goodwins, :]), axis=0))[0:len(f)] cHZ = np.mean(ftH[goodwins, :] * np.conj(ftZ[goodwins, :]), axis=0)[0:len(f)] Co = coherence(cHZ, cHH, cZZ) Ad = admittance(cHZ, cHH) Ph = phase(cHZ/cHH) # Calculate coherence over frequency band try: coh[i] = np.mean(Co[freqs]) ad[i] = np.mean(Ad[freqs]) ph[i] = circmean( Ph[freqs], high=np.pi, low=-np.pi) except Exception: print('Exception') coh[i] = 0. ad[i] = 0. ph[i] = 0. # Index where coherence is max ind = np.argwhere(coh == coh.max()) # Phase, angle and direction at maximum coherence phase_value = ph[ind[0]][0] admit_value = ad[ind[0]][0] coh_value = coh[ind[0]][0] tilt_dir = phi[ind[0]][0] tilt_ang = np.arctan(admit_value)*180./np.pi # Phase has to be close to zero: ***Not true?*** # Check phase near max coherence and 180 deg apart start = max(0, ind[0][0] - 1) end = min(len(ph), ind[0][0] + 2) phase_std = np.std(ph[start:end]) if np.abs(phase_value) > np.pi/2: tilt_dir += 180. if tilt_dir > 360.: tilt_dir -= 360. # Refine search rphi = np.arange(tilt_dir-10., tilt_dir+10., 1.) rcoh = np.zeros(len(rphi)) rad = np.zeros(len(rphi)) rph = np.zeros(len(rphi)) for i, d in enumerate(rphi): # Rotate horizontals - clockwise from 1 # components 1, 2 correspond to y, x in the cartesian plane ftH = rotate_dir(ft2, ft1, d) # Get transfer functions cHH = np.abs(np.mean(ftH[goodwins, :] * np.conj(ftH[goodwins, :]), axis=0))[0:len(f)] cHZ = np.mean(ftH[goodwins, :] * np.conj(ftZ[goodwins, :]), axis=0)[0:len(f)] Co = coherence(cHZ, cHH, cZZ) Ad = admittance(cHZ, cHH) Ph = phase(cHZ/cHH) # Calculate coherence over frequency band try: rcoh[i] = np.mean(Co[freqs]) rad[i] = np.mean(Ad[freqs]) rph[i] = circmean( Ph[freqs], high=np.pi, low=-np.pi) except Exception: print("* Warning: problems in the Tilt calculations. " + "Setting coherence and phase between Z and rotated H " + "to 0.") rcoh[i] = 0. rad[i] = 0. rph[i] = 0. # Index where coherence is max ind = np.argwhere(rcoh == rcoh.max()) # Phase and direction at maximum coherence phase_value = rph[ind[0]][0] admit_value = rad[ind[0]][0] coh_value = rcoh[ind[0]][0] tilt_dir = rphi[ind[0]][0] tilt_ang = np.arctan(admit_value)*180./np.pi # Now calculate spectra at tilt direction ftH = rotate_dir(ft2, ft1, tilt_dir) # Get transfer functions cHH = np.abs(np.mean(ftH[goodwins, :] * np.conj(ftH[goodwins, :]), axis=0))[0:len(f)] cHZ = np.mean(ftH[goodwins, :] * np.conj(ftZ[goodwins, :]), axis=0)[0:len(f)] if np.any(ftP): cHP = np.mean(ftH[goodwins, :] * np.conj(ftP[goodwins, :]), axis=0)[0:len(f)] else: cHP = None return cHH, cHZ, cHP, coh, ph, ad, phi, tilt_dir, tilt_ang, coh_value, phase_value, admit_value
[docs] def smooth(data, nd, axis=0): """ Function to smooth power spectral density functions from the convolution of a boxcar function with the PSD Parameters ---------- data : :class:`~numpy.ndarray` Real-valued array to smooth (PSD) nd : int Number of samples over which to smooth axis : int, optional axis over which to perform the smoothing Returns ------- filt : :class:`~numpy.ndarray`, optional Filtered data """ if np.any(data): if data.ndim > 1: filt = np.zeros(data.shape) for i in range(data.shape[::-1][axis]): if axis == 0: filt[:, i] = np.convolve( data[:, i], np.ones((nd,))/nd, mode='same') elif axis == 1: filt[i, :] = np.convolve( data[i, :], np.ones((nd,))/nd, mode='same') else: filt = np.convolve(data, np.ones((nd,))/nd, mode='same') return filt else: return None
[docs] def admittance(Gxy, Gxx): """ Calculates admittance between two components Parameters ---------- Gxy : :class:`~numpy.ndarray` Cross spectral density function of `x` and `y` Gxx : :class:`~numpy.ndarray` Power spectral density function of `x` Returns ------- : :class:`~numpy.ndarray`, optional Admittance between `x` and `y` """ if np.any(Gxy) and np.any(Gxx): return np.abs(Gxy)/Gxx else: return None
[docs] def coherence(Gxy, Gxx, Gyy): """ Calculates coherence between two components Parameters ---------- Gxy : :class:`~numpy.ndarray` Cross spectral density function of `x` and `y` Gxx : :class:`~numpy.ndarray` Power spectral density function of `x` Gyy : :class:`~numpy.ndarray` Power spectral density function of `y` Returns ------- : :class:`~numpy.ndarray`, optional Coherence between `x` and `y` """ if np.any(Gxy) and np.any(Gxx) and np.any(Gxx): return np.abs(Gxy)**2/(Gxx*Gyy) else: return None
[docs] def phase(Gxy): """ Calculates phase angle between real and imaginary components Parameters ---------- Gxy : :class:`~numpy.ndarray` Cross spectral density function of `x` and `y` Returns ------- : :class:`~numpy.ndarray`, optional Phase angle between `x` and `y` """ if np.any(Gxy): return np.angle(Gxy) else: return None
[docs] def rotate_dir(x, y, theta): """ Rotates (x, y) data clockwise by an angle theta. Returns the rotated component y. Parameters ---------- x : :class:`~numpy.ndarray` Array of values along the `x` coordinate y : :class:`~numpy.ndarray` Array of values along the `y` coordinate theta : :class:`~numpy.ndarray` Angle in degrees Returns ------- y_rotated: :class:`~numpy.ndarray` Rotated array of component `y` """ d = theta*np.pi/180. rot_mat = [[np.cos(d), np.sin(d)], [-np.sin(d), np.cos(d)]] vxy_rotated = np.tensordot(rot_mat, [x, y], axes=1) return vxy_rotated[1]
def ftest(res1, pars1, res2, pars2): from scipy.stats import f as f_dist N1 = len(res1) N2 = len(res2) dof1 = N1 - pars1 dof2 = N2 - pars2 Ea_1 = np.sum(res1**2) Ea_2 = np.sum(res2**2) Fobs = (Ea_1/dof1)/(Ea_2/dof2) P = 1. - (f_dist.cdf(Fobs, dof1, dof2) - f_dist.cdf(1./Fobs, dof1, dof2)) return P
[docs] def robust(array): """ Calculates robust quantities using the robust standard units. Parameters ---------- array : :class:`~numpy.ndarray` Array of values Returns ------- robust_array: :class:`~numpy.ndarray` Array with outliers removed outliers : :class:`~numpy.ndarray` Array of outlier values """ median_array = np.median(array) mad_array = 1.4826*np.median(np.abs(array - median_array)) if mad_array > 0.: rsu_array = (array - median_array)/mad_array robust_array = array[np.abs(rsu_array) < 2.] outliers = array[np.abs(rsu_array) >= 2.] else: robust_array = array outliers = None return robust_array, outliers