Skip to content

Commit

Permalink
more aggressive filtering; revert to chunked output
Browse files Browse the repository at this point in the history
  • Loading branch information
dionhaefner committed Nov 4, 2020
1 parent a2aa08a commit 05d7a38
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 16 deletions.
1 change: 0 additions & 1 deletion fowd/cdip.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,4 @@ def generate_results():
write_records(
result_generator, out_file, station_name,
include_direction=True, extra_metadata=EXTRA_METADATA,
num_records=num_waves_total
)
1 change: 0 additions & 1 deletion fowd/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ def postprocess_cdip(cdip_files, out_folder):
write_records(
record_generator,
outfile, station_name,
num_records=num_records,
extra_metadata=out_metadata,
include_direction=True
)
Expand Down
2 changes: 1 addition & 1 deletion fowd/generic_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,4 @@ def process_file(input_file, out_folder, station_id=None):
result_generator = filter(None, read_pickle_outfile_chunks(result_file))
out_file = os.path.join(out_folder, f'fowd_{station_id}.nc')
logger.info('Writing output to %s', out_file)
write_records(result_generator, out_file, station_id, num_records=num_waves)
write_records(result_generator, out_file, station_id)
38 changes: 28 additions & 10 deletions fowd/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
FILL_VALUE_NUMBER = -9999
FILL_VALUE_STR = 'MISSING'

# chunk sizes to use for each dimension
CHUNKSIZES = {
'meta_station_name': 1,
'wave_id_local': 10_000,
'meta_frequency_band': len(FREQUENCY_INTERVALS),
}

DATASET_VARIABLES = dict(
# metadata
meta_source_file_name=dict(
Expand Down Expand Up @@ -487,21 +494,16 @@ def get_dataset_metadata(station_name, start_time, end_time, extra_metadata=None


def write_records(wave_record_iterator, filename, station_name, extra_metadata=None,
include_direction=False, num_records=None):
include_direction=False):
"""Write given wave records in FOWD's netCDF4 output format.
First argument is an iterable of chunks of wave records.
"""

if num_records is None:
wave_id_dim = None
else:
wave_id_dim = np.arange(num_records)

dimension_data = (
# (name, dtype, data)
('meta_station_name', str, np.array([np.string_(station_name)])),
('wave_id_local', 'int64', wave_id_dim),
('wave_id_local', 'int64', None),
('meta_frequency_band', 'uint8', np.arange(len(FREQUENCY_INTERVALS))),
)

Expand All @@ -521,7 +523,13 @@ def write_records(wave_record_iterator, filename, station_name, extra_metadata=N
else:
f.createDimension(dim, len(val))

v = f.createVariable(dim, dtype, (dim,))
extra_args = dict(
zlib=True,
fletcher32=True,
chunksizes=[CHUNKSIZES[dim]]
)

v = f.createVariable(dim, dtype, (dim,), **extra_args)

if val is not None:
v[:] = val
Expand All @@ -530,14 +538,19 @@ def write_records(wave_record_iterator, filename, station_name, extra_metadata=N
# add meta_station_name as additional scalar dimension
dims = ('meta_station_name',) + meta['dims']

extra_args = dict(
zlib=True,
fletcher32=True,
chunksizes=[CHUNKSIZES[dim] for dim in dims]
)

# determine dtype
if meta['dtype'] == 'vlen':
dtype = vlen_type
else:
dtype = meta['dtype']

# add correct fill value
extra_args = {}
is_number = np.issubdtype(dtype, np.floating) or np.issubdtype(dtype, np.integer)
if is_number and dtype is not vlen_type:
extra_args.update(fill_value=FILL_VALUE_NUMBER)
Expand Down Expand Up @@ -597,7 +610,12 @@ def write_records(wave_record_iterator, filename, station_name, extra_metadata=N

# add extra variables
for name, meta in EXTRA_VARIABLES.items():
v = f.createVariable(name, meta['data'].dtype, meta['dims'])
extra_args = dict(
zlib=True,
fletcher32=True,
chunksizes=[CHUNKSIZES[dim] for dim in meta['dims']]
)
v = f.createVariable(name, meta['data'].dtype, meta['dims'], **extra_args)
v[:] = meta['data']
for attr, val in meta['attrs'].items():
setattr(v, attr, val)
Expand Down
6 changes: 3 additions & 3 deletions fowd/postprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ def remove_blacklisted(ds):


def filter_low_swh(ds):
"""Remove all records with very low significant wave heights."""
return ds['sea_state_30m_significant_wave_height_spectral'] > 0.5
"""Remove all records with low significant wave heights."""
return ds['sea_state_30m_significant_wave_height_spectral'] > 1.0


def filter_undersampled(ds):
"""Remove all records that are undersampled."""
nyquist_frequency = 0.5 * ds['meta_sampling_rate']
mean_frequency = 1. / (ds['sea_state_30m_mean_period_spectral'] / np.timedelta64(1, 's'))
return 2.2 * mean_frequency < nyquist_frequency
return 3.2 * mean_frequency < nyquist_frequency


def filter_cdip(ds, num_filtered_dict=None, chunk_size=10_000):
Expand Down

0 comments on commit 05d7a38

Please sign in to comment.