Skip to content

Commit

Permalink
Schema changes in FederatedASDFDataSet
Browse files Browse the repository at this point in the history
* Simplified and optimized queries to improve performance
* Added an ancillary table to improve performance of the GUI viewer
  • Loading branch information
geojunky committed Oct 15, 2024
1 parent 39b7588 commit cfdcd7d
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 34 deletions.
10 changes: 10 additions & 0 deletions seismic/ASDFdatabase/FederatedASDFDataSet.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ def get_global_time_range(self, network, station=None, location=None, channel=No

# end func

def get_nslc_list(self):
"""
Get a list of all net, sta, loc, cha combinations featured in the database
@return:
"""

results = self.fds.get_nslc_list()
return results
# end if

def get_stations(self, starttime, endtime, network=None, station=None, location=None, channel=None):
"""
:param starttime: start time string in UTCDateTime format; can also be an instance of obspy.UTCDateTime
Expand Down
65 changes: 41 additions & 24 deletions seismic/ASDFdatabase/FederatedASDFViewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
from shapely import geometry
from seismic.misc import print_exception

NULL_CHANNEL_CODE = 'XXX'
NULL_AVAILABILITY = 'Availability: 2100-01-01 - 1900-01-01'

class CustomPPSD(PPSD):
def __init__(self, stats, skip_on_gaps=False,
db_bins=(-150, 50, 1.), ppsd_length=3600.0, overlap=0.5,
Expand Down Expand Up @@ -194,35 +197,26 @@ def __int__(self, *args, **kwargs):
# end func

def getNetworks(self):
result = set([item.split('.')[0] for item in self.fds.unique_coordinates.keys()])
result = self.nslc_dict.keys()
return sorted(list(result))
# end func

def getStations(self, net):
result = set()
for item in self.fds.unique_coordinates.keys():
nc, sc = item.split('.')
if(nc == net): result.add(sc)
# end for
result = self.nslc_dict[net].keys()
return sorted(list(result))
# end func

def getLocations(self, net, sta):
rows = self.fds.get_stations(UTCDateTime('1900-01-01'), UTCDateTime('2100-01-01'),
network=net, station=sta)
locs = set()
for row in rows: locs.add(row[2])

return sorted(list(locs))
result = self.nslc_dict[net][sta].keys()
return sorted(list(result))
# end func

def getChannels(self, net, sta, loc):
rows = self.fds.get_stations(UTCDateTime('1900-01-01'), UTCDateTime('2100-01-01'),
network=net, station=sta, location=loc)
comps = set()
for row in rows: comps.add(row[3])

return sorted(list(comps))
result = self.nslc_dict[net][sta][loc]
# add a null entry to users get a chance to select the channel
# which channel they want to see data for
result.append(NULL_CHANNEL_CODE)
return sorted(list(result))
# end func

def mapWidget(self):
Expand Down Expand Up @@ -497,15 +491,23 @@ def locChanged(emitter, value):

def getMeta(nc, sc, lc, cc):
lon, lat = self.fds.unique_coordinates['{}.{}'.format(nc, sc)]
st, et = self.fds.get_global_time_range(nc, sc, lc, cc)
#print(nc, sc, lc, cc, st, et)
locStr = "Lon: {:.2f}, Lat: {:.2f}".format(lon, lat)
availStr = "Availability: {} - {}".format(st.strftime('%Y-%m-%d'), et.strftime('%Y-%m-%d'))

availStr = None
if(cc == NULL_CHANNEL_CODE):
availStr = NULL_AVAILABILITY
else:
st, et = self.fds.get_global_time_range(nc, sc, lc, cc)
availStr = "Availability: {} - {}".format(st.strftime('%Y-%m-%d'), et.strftime('%Y-%m-%d'))
# end if

return locStr, availStr
# end func

def setTraceImage(nc, sc, lc, cc, st=None, et=None):
if(cc == NULL_CHANNEL_CODE): return # nothing to do for null channel-code

try:
if(st is None and et is None):
st, et = self.fds.get_global_time_range(nc, sc, lc, cc)
Expand All @@ -521,7 +523,7 @@ def setTraceImage(nc, sc, lc, cc, st=None, et=None):

fig = Figure(figsize=(TRACE_FIG_WIDTH, TRACE_FIG_HEIGHT))
stream = self.fds.get_waveforms(nc, sc, lc, cc, st, st + step)

if(len(stream)):
if(not isPPSD):
fig = stream.plot(fig=fig, handle=True, type='relative')
Expand Down Expand Up @@ -555,19 +557,23 @@ def chaChanged(emitter, value):
lc = self.rowContainer.children[key].children['leftContainer'].children['nslcBox'].children['loc'].get_value()
cc = self.rowContainer.children[key].children['leftContainer'].children['nslcBox'].children['cha'].get_value()

#print('chaChanged')

# update metadata
locStr, availStr = getMeta(nc, sc, lc, cc)
self.rowContainer.children[key].children['leftContainer'].\
children['locLabel'].set_text(locStr)
self.rowContainer.children[key].children['leftContainer'].\
children['availLabel'].set_text(availStr)

if(cc == NULL_CHANNEL_CODE): return

# update plot
self.rowContainer.children[key].children['rightContainer'].set_enabled(True)
self.rowContainer.children[key].children['rightContainer'].children['plot'] = gui.Label('Loading..')
t = threading.Thread(target=setTraceImage,
args=(nc, sc, lc, cc))
t.start()
#print('chaChanged')
# end func

def startStepChanged(emitter, value=None):
Expand Down Expand Up @@ -600,6 +606,7 @@ def startStepChanged(emitter, value=None):
# end if

# update plot
self.rowContainer.children[key].children['rightContainer'].set_enabled(True)
self.rowContainer.children[key].children['rightContainer'].children['plot'] = gui.Label('Loading..')
t = threading.Thread(target=setTraceImage,
args=(nc, sc, lc, cc, st, et))
Expand Down Expand Up @@ -691,7 +698,8 @@ def removeWidget(emitter):
ppsd.onchange.do(startStepChanged)

rightContainer.append({'startStepLabelBox': startStepLabelBox, 'startStepBox': startStepBox,
'plot': gui.Label('Loading..')})
'plot': gui.Label('')})
rightContainer.set_enabled(False)

t = threading.Thread(target=setTraceImage,
args=(net.get_value(), sta.get_value(), loc.get_value(), cha.get_value()))
Expand All @@ -715,6 +723,15 @@ def addWidget(emitter):
# end func

self.fds = fds
# populate net, sta, loc, cha dict
self.nslc_dict = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))

nslc_list = self.fds.get_nslc_list()
for row in nslc_list:
net, sta, loc, cha = row
self.nslc_dict[net][sta][loc].append(cha)
# end for

# create master container
self.rowContainer = gui.VBox(width=ROW_WIDGET_WIDTH * PADDING_FACTOR,
height=ROW_WIDGET_HEIGHT * PADDING_FACTOR,
Expand Down Expand Up @@ -786,7 +803,7 @@ def process(asdf_source):

# starts the webserver
start(DataViewer, address='0.0.0.0', port=1122, start_browser=False,
update_interval=0.1, userdata=(fds,))
update_interval=0, userdata=(fds,))
# end func

if (__name__ == '__main__'):
Expand Down
31 changes: 21 additions & 10 deletions seismic/ASDFdatabase/_FederatedASDFDataSetImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,15 @@ def decode_tag(tag, type='raw_recording'):
# end for

if(self.rank==0):
print('Creating table indices..')
self.conn = sqlite3.connect(self.db_fn,
check_same_thread=self.single_threaded_access)
# create a convenience table with all combinations of net, sta, loc, cha
self.conn.execute('create table nslc as select net, sta, loc, cha, min(st) as st, max(et) as et from wtag group by net, sta, loc, cha')

print('Creating table indices..')
self.conn.execute('create index allindex on wtag(ds_id, net, sta, loc, cha, st, et)')
self.conn.execute('create index metaindex on meta(ds_id, net, sta)')
self.conn.execute('create index nslcindex on nslc(net, sta, loc, cha, st, et)')
self.conn.commit()
self.conn.close()
print('Done..')
Expand All @@ -464,33 +468,40 @@ def decode_tag(tag, type='raw_recording'):
# end func

def get_global_time_range(self, network, station=None, location=None, channel=None):
query = "select min(st), max(et) from wtag where net='%s' "%(network)
query = "select min(st), max(et) from nslc where net='%s' " % (network)

if (station is not None):
query += "and sta='%s' "%(station)
query += "and sta='%s' " % (station)
if (location is not None):
query += "and loc='%s' "%(location)
query += "and loc='%s' " % (location)
if (channel is not None):
query += "and cha='%s' "%(channel)
query += "and cha='%s' " % (channel)

row = self.conn.execute(query).fetchall()[0]

min = MAX_DATE
max = MIN_DATE

if(len(row)):
if(row[0] is not None): min = UTCDateTime(row[0])
if(row[1] is not None): max = UTCDateTime(row[1])
if (len(row)):
if (row[0] is not None): min = UTCDateTime(row[0])
if (row[1] is not None): max = UTCDateTime(row[1])
# end if

return min, max
# end func

def get_nslc_list(self):
query = "select net, sta, loc, cha from nslc"
rows = self.conn.execute(query).fetchall()

return rows
# end if

def get_stations(self, starttime, endtime, network=None, station=None, location=None, channel=None):
starttime = UTCDateTime(starttime).timestamp
endtime = UTCDateTime(endtime).timestamp

query = 'select * from wtag where '
query = 'select ds_id, net, sta, loc, cha from wtag where '
if (network is not None): query += " net='%s' "%(network)
if (station is not None):
if(network is not None): query += "and sta='%s' "%(station)
Expand All @@ -515,7 +526,7 @@ def get_stations(self, starttime, endtime, network=None, station=None, location=
rows = self.conn.execute(query).fetchall()
results = set()
for row in rows:
ds_id, net, sta, loc, cha, st, et, tag = row
ds_id, net, sta, loc, cha = row

# [net, sta, loc, cha, lon, lat, elev_m]
rv = (net, sta, loc, cha, *self.asdf_station_coordinates[ds_id]['%s.%s' % (net, sta)])
Expand Down

0 comments on commit cfdcd7d

Please sign in to comment.