Skip to content

Commit

Permalink
Finalizing implementation of subset-stacking
Browse files Browse the repository at this point in the history
  • Loading branch information
geojunky committed Sep 30, 2024
1 parent 15c5e04 commit cbece11
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 35 deletions.
13 changes: 9 additions & 4 deletions seismic/xcorqc/correlator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from seismic.xcorqc.xcorqc import IntervalStackXCorr
from seismic.xcorqc.utils import getStationInventory, read_location_preferences, Dataset
from seismic.xcorqc.subset_stacker import SubsetStacker
from seismic.misc import get_git_revision_hash, rtp2xyz, split_list
from seismic.misc_p import ProgressTracker
from itertools import product
Expand All @@ -45,7 +46,7 @@ def process(data_source1, data_source2, output_path,
clip_to_2std=False, whitening=False, whitening_window_frequency=0,
one_bit_normalize=False, location_preferences=None, ds1_zchan=None, ds1_nchan=None,
ds1_echan=None, ds2_zchan=None, ds2_nchan=None, ds2_echan=None, corr_chan=None,
envelope_normalize=False, ensemble_stack=False, subset_stack=False, apply_simple_stacking=True,
envelope_normalize=False, ensemble_stack=False, subset_stacker=None, apply_simple_stacking=True,
restart=False, dry_run=False, no_tracking_tag=False, scratch_folder=None):
"""
:param data_source1: Text file containing paths to ASDF files
Expand Down Expand Up @@ -119,7 +120,7 @@ def outputConfigParameters():
if(whitening):
f.write('%35s\t\t\t: %s\n' % ('--whitening-window-frequency', whitening_window_frequency))
f.write('%35s\t\t\t: %s\n' % ('--ensemble-stack', ensemble_stack))
f.write('%35s\t\t\t: %s\n' % ('--subset-stack', subset_stack))
f.write('%35s\t\t\t: %s\n' % ('--subset-stack', subset_stacker is not None))
f.write('%35s\t\t\t: %s\n' % ('--restart', 'TRUE' if restart else 'FALSE'))
f.write('%35s\t\t\t: %s\n' % ('--no-tracking-tag', 'TRUE' if no_tracking_tag else 'FALSE'))
f.write('%35s\t\t\t: %s\n' % ('--scratch-folder', scratch_folder))
Expand Down Expand Up @@ -309,7 +310,7 @@ def get_loccha(cha1, cha2):
interval_seconds, window_seconds, window_overlap,
window_buffer_length, fmin, fmax, clip_to_2std, whitening,
whitening_window_frequency, one_bit_normalize, envelope_normalize,
ensemble_stack, subset_stack, apply_simple_stacking, output_path, 2,
ensemble_stack, subset_stacker, apply_simple_stacking, output_path, 2,
time_tag, scratch_folder, git_hash)
# end for
# end for
Expand Down Expand Up @@ -528,13 +529,17 @@ def main(data_source1, data_source2, output_path, window_seconds, window_overlap
apply_simple_stacking = True
# end if

# instantiate subset-stacker if requested
subset_stacker = None
if(subset_stack): subset_stacker = SubsetStacker()

process(data_source1, data_source2, output_path, interval_seconds, window_seconds, window_overlap,
window_buffer_length, read_ahead_window_seconds, resample_rate, taper_length, nearest_neighbours,
pair_min_dist, pair_max_dist, fmin, fmax, station_names1, station_names2, pairs_to_compute,
start_time, end_time, instrument_response_inventory, instrument_response_output, water_level,
clip_to_2std, whitening, whitening_window_frequency, one_bit_normalize, location_preferences,
ds1_zchan, ds1_nchan, ds1_echan, ds2_zchan, ds2_nchan, ds2_echan, corr_chan, envelope_normalize,
ensemble_stack, subset_stack, apply_simple_stacking, restart, dry_run, no_tracking_tag,
ensemble_stack, subset_stacker, apply_simple_stacking, restart, dry_run, no_tracking_tag,
scratch_folder)
# end func

Expand Down
17 changes: 16 additions & 1 deletion seismic/xcorqc/subset_stacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def get_affected_indices(source_eids, pat, swat, swet):
p2 = [slon2, slat2]
az, baz, dist = self.gc.geod.inv(p1[0], p1[1], p2[0], p2[1])

#print(az, baz)
eaz1, ebaz1, edist1 = self.gc.geod.inv(np.ones(len(cat)) * p1[0],
np.ones(len(cat)) * p1[1],
cat['lon'], cat['lat'])
Expand All @@ -154,7 +155,6 @@ def get_affected_indices(source_eids, pat, swat, swet):
# compute P and SW arrival times for relevant events at the two stations
edepth_km = np.array(cat['dep'])
otime = np.array(cat['EventOrigintim'])
mag = np.array(cat['MwG'])

ptt1 = self.tti.get_tt('P', edistdeg1, edepth_km)
ptt2 = self.tti.get_tt('P', edistdeg2, edepth_km)
Expand Down Expand Up @@ -196,17 +196,24 @@ def get_affected_indices(source_eids, pat, swat, swet):
ccids2_inside_az = get_affected_indices(eids2_inside_az, pat2, swat2, swet2)
ccids_inside_az = ccids1_inside_az | ccids2_inside_az

#print('ccs inside azimuth for station 1: {}'.format(np.sum(ccids1_inside_az)))
#print('ccs inside azimuth for station 2: {}'.format(np.sum(ccids2_inside_az)))

ccids1_outside_az = get_affected_indices(eids_outside_az, pat1, swat1, swet1)
ccids2_outside_az = get_affected_indices(eids_outside_az, pat2, swat2, swet2)
ccids_outside_az = ccids1_outside_az | ccids2_outside_az

#print('ccs outside azimuth for station 1: {}'.format(np.sum(ccids1_outside_az)))
#print('ccs outside azimuth for station 2: {}'.format(np.sum(ccids2_outside_az)))

# aliases to indices as named in the manuscript
idsXei = ccids_inside_az # inside az
idsXec = ~(ccids_inside_az | ccids_outside_az) # no events
idsXeiUXec = idsXei | idsXec # no events outside az range
idsXeo = ccids_outside_az

# compute means
mean = mean_Xei = mean_Xec = mean_XeiUXec = mean_Xeo = None
mean = np.zeros(spooled_matrix.ncols)
mean_Xei = np.zeros(spooled_matrix.ncols)
mean_Xec = np.zeros(spooled_matrix.ncols)
Expand All @@ -229,6 +236,14 @@ def get_affected_indices(source_eids, pat, swat, swet):
mean_XeiUXec /= float(np.sum(idsXeiUXec))
mean_Xeo /= float(np.sum(idsXeo))

"""
np.savez('stack3outputs.npz', xcf=mean,
xcf1=mean_Xei, xcf2=mean_Xec, xcf3=mean_XeiUXec,
xcf4=mean_Xeo, idsXei=idsXei, idsXec=idsXec,
idsXeiUXec=idsXeiUXec, idsXeo=idsXeo)
"""
# end if

return mean, mean_Xei, mean_Xec, mean_XeiUXec, mean_Xeo
# end func
# end class
Expand Down
38 changes: 37 additions & 1 deletion seismic/xcorqc/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from seismic.misc import rtp2xyz, read_key_value_pairs
from seismic.misc import get_git_revision_hash, rtp2xyz, split_list
import os, psutil
from netCDF4 import Dataset as ncDataset

class Dataset:
def __init__(self, asdf_file_name, netsta_list='*'):
Expand Down Expand Up @@ -180,8 +181,11 @@ def read_subset_stacker_config()->dict:
fn = os.path.join(os.getcwd(), 'subset_stack.conf')
try:
d = read_key_value_pairs(fn, keys, strict=True)
for k in keys[1:]:
d[k] = float(d[k])
# end for
except Exception as e:
print(e)
print('Reading {} failed with error: {}'.format(fn, e))
# end try

return d
Expand Down Expand Up @@ -387,6 +391,38 @@ def __init__(self, ncols, dtype=np.float32, max_size_mb=2048, prefix='', dir=Non
self._file = SpooledTemporaryFile(prefix = self._prefix, mode = 'w+b', max_size = max_size_mb * 1024**2, dir=dir)
# end func

@classmethod
def from_nc(cls, nc_file):
"""
Alternative instantiation for testing purposes
@param nc_file:
@return:
"""
try:
ds = ncDataset(nc_file)
xcorr = np.array(ds.variables['xcorr'])
shp = xcorr.shape
ncols = 0

if(len(shp) == 1): ncols = shp[0]
elif(len(shp) == 2): ncols = shp[1]

sm = cls(ncols, dtype=xcorr.dtype)

if(len(shp) == 1):
sm.write_row(xcorr)
elif(len(shp) == 2):
for i in np.arange(shp[0]):
sm.write_row(xcorr[i, :])
# end for
# end if

return sm
except Exception as e:
print('Failed to load {} with error: {}'.format(nc_file, str(e)))
# end try
# end func

@property
def ncols(self):
return self._ncols
Expand Down
61 changes: 37 additions & 24 deletions seismic/xcorqc/xcorqc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from seismic.xcorqc.fft import *
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
from seismic.xcorqc.utils import get_stream, fill_gaps, MemoryTracker
from seismic.xcorqc.subset_stacker import SubsetStacker
from netCDF4 import Dataset
from functools import reduce
from seismic.xcorqc.utils import SpooledMatrix
Expand Down Expand Up @@ -522,7 +523,7 @@ def IntervalStackXCorr(refds, tempds,
clip_to_2std=False, whitening=False, whitening_window_frequency=0,
one_bit_normalize=False, envelope_normalize=False,
ensemble_stack=False,
subset_stack=False,
subset_stacker: SubsetStacker=None,
apply_simple_stacking=True,
outputPath='/tmp', verbose=1, tracking_tag='',
scratch_folder=None, git_hash=''):
Expand Down Expand Up @@ -596,9 +597,9 @@ def IntervalStackXCorr(refds, tempds,
:param envelope_normalize: Envelope via Hilbert transforms and normalize
:type ensemble_stack: bool
:param ensemble_stack: Outputs a single CC function stacked over all data for a given station-pair
:type subset_stack: bool
:param subset_stack: Outputs stacks over subsets of CC windows, identified based on the presence/absence
of azimuthal earthquake energy
:type subset_stacker: SubsetStacker
:param subset_stacker: Custom stacker to stack over subsets of CC windows, identified based on the
presence/absence of azimuthal earthquake energy
:type apply_simple_stacking: bool
:param apply_simple_stacking: stacks cross-correlation windows over intervals
:type outputPath: str
Expand Down Expand Up @@ -887,46 +888,58 @@ def IntervalStackXCorr(refds, tempds,
else:
xc[:] = es
# end if
elif subset_stack:
pass
else:
root_grp.createDimension('interval', flattenedIntervalStartTimes.shape[0])
root_grp.createDimension('window', flattenedWindowStartTimes.shape[0])

# Variables
# create and polulate variables
interval = root_grp.createVariable('interval', 'f4', ('interval',))
ist = root_grp.createVariable('IntervalStartTimes', 'i8', ('interval',))
iet = root_grp.createVariable('IntervalEndTimes', 'i8', ('interval',))
wst = root_grp.createVariable('WindowStartTimes', 'i8', ('window',))
wet = root_grp.createVariable('WindowEndTimes', 'i8', ('window',))

xc = None
nsw = None
interval[:] = np.arange(flattenedIntervalStartTimes.shape[0])
ist[:] = flattenedIntervalStartTimes
iet[:] = flattenedIntervalEndTimes
wst[:] = flattenedWindowStartTimes
wet[:] = flattenedWindowEndTimes

if(apply_simple_stacking):
nsw = root_grp.createVariable('NumStackedWindows', 'f4', ('interval',))
xc = root_grp.createVariable('xcorr', 'f4', ('interval', 'lag',),
chunksizes=(1, spooledXcorr.ncols),
zlib=True)
nsw[:] = flattenedWindowCounts
for irow in np.arange(spooledXcorr.nrows):
xc[irow, :] = spooledXcorr.read_row(irow)
# end for
elif subset_stacker is not None:
xc = root_grp.createVariable('xcorr', 'f4', ('lag',))
xc_Xei = root_grp.createVariable('xcorr_Xei', 'f4', ('lag',))
xc_Xec = root_grp.createVariable('xcorr_Xec', 'f4', ('lag',))
xc_XeiUXec = root_grp.createVariable('xcorr_XeiUXec', 'f4', ('lag',))
xc_Xeo = root_grp.createVariable('xcorr_Xeo', 'f4', ('lag',))

slon1, slat1 = refds.unique_coordinates[ref_net_sta]
slon2, slat2 = tempds.unique_coordinates[temp_net_sta]
mean, mean_Xei, mean_Xec, mean_XeiUXec, mean_Xeo = \
subset_stacker.stack(spooledXcorr, flattenedWindowStartTimes,
flattenedWindowEndTimes,
slon1, slat1, slon2, slat2)
xc[:] = mean
xc_Xei[:] = mean_Xei
xc_Xec[:] = mean_Xec
xc_XeiUXec[:] = mean_XeiUXec
xc_Xeo[:] = mean_Xeo
else:
xc = root_grp.createVariable('xcorr', 'f4', ('window', 'lag',),
chunksizes=(1, spooledXcorr.ncols),
zlib=True)
for irow in np.arange(spooledXcorr.nrows):
xc[irow, :] = spooledXcorr.read_row(irow)
# end for
# end if

# Populate variables
if(apply_simple_stacking):
nsw[:] = flattenedWindowCounts
# end if

interval[:] = np.arange(flattenedIntervalStartTimes.shape[0])
ist[:] = flattenedIntervalStartTimes
iet[:] = flattenedIntervalEndTimes
wst[:] = flattenedWindowStartTimes
wet[:] = flattenedWindowEndTimes

for irow in np.arange(spooledXcorr.nrows):
xc[irow, :] = spooledXcorr.read_row(irow)
# end for
# end if

lag[:] = x
Expand Down
6 changes: 3 additions & 3 deletions tests/test_seismic/xcorqc/test_correlator.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@

expected_folder = '%s/data/expected/'%(path)
output_folder = str(tempfile.mkdtemp())
#output_folder = '/tmp'
os.mkdir(os.path.join(output_folder, 'stacked'))
os.mkdir(os.path.join(output_folder, 'unstacked'))
#output_folder = '/tmp'

def test_correlator():
start_time = '2006-11-03T00:00:00'
Expand All @@ -76,7 +76,7 @@ def test_correlator():
netsta2, None, start_time, end_time, None, 'vel',
50, False, True, 0.02, True, loc_pref,
'*Z', '*N', '*E', '*Z', '*N', '*E', 'z', False, False,
True, False, False, True, None)
None, True, False, False, True, None)


# Read result
Expand Down Expand Up @@ -109,7 +109,7 @@ def test_correlator():
netsta2, None, start_time, end_time, None, 'vel',
50, False, True, 0.02, True, loc_pref,
'*Z', '*N', '*E', '*Z', '*N', '*E', 'z', False, False,
False, False, False, True, None)
None, False, False, False, True, None)


# Read result
Expand Down
4 changes: 2 additions & 2 deletions tests/test_seismic/xcorqc/test_xcorr.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def expected_lines_iter():
interval_seconds=isec,
window_overlap=olap,
window_buffer_length=wbl,
apply_stacking=True)
apply_simple_stacking=True)

olines.append("============[test: {}]============\n".format(i))
olines.append('Params: raw: {}, wsec: {}, olap: {}, wbl: {}\n'.format(raw, wsec, olap, wbl))
Expand Down Expand Up @@ -260,7 +260,7 @@ def expected_lines_iter():
interval_seconds=isec,
window_overlap=olap,
window_buffer_length=wbl,
apply_stacking=True)
apply_simple_stacking=True)

olines.append("============[test: {}]============\n".format(i))
olines.append('Params: wsec: {}, isec: {}, olap: {}, wbl: {}\n'.format(wsec, isec, olap, wbl))
Expand Down

0 comments on commit cbece11

Please sign in to comment.