From 05d7a38f18461389701c5aaa8e4b2ce19fd4e94a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 4 Nov 2020 10:50:39 +0100 Subject: [PATCH] more aggressive filtering; revert to chunked output --- fowd/cdip.py | 1 - fowd/cli.py | 1 - fowd/generic_source.py | 2 +- fowd/output.py | 38 ++++++++++++++++++++++++++++---------- fowd/postprocessing.py | 6 +++--- 5 files changed, 32 insertions(+), 16 deletions(-) diff --git a/fowd/cdip.py b/fowd/cdip.py index bbcc7a2..ec46096 100644 --- a/fowd/cdip.py +++ b/fowd/cdip.py @@ -294,5 +294,4 @@ def generate_results(): write_records( result_generator, out_file, station_name, include_direction=True, extra_metadata=EXTRA_METADATA, - num_records=num_waves_total ) diff --git a/fowd/cli.py b/fowd/cli.py index 38e5afe..5c5e87c 100644 --- a/fowd/cli.py +++ b/fowd/cli.py @@ -198,7 +198,6 @@ def postprocess_cdip(cdip_files, out_folder): write_records( record_generator, outfile, station_name, - num_records=num_records, extra_metadata=out_metadata, include_direction=True ) diff --git a/fowd/generic_source.py b/fowd/generic_source.py index 3d952d5..cd6242e 100644 --- a/fowd/generic_source.py +++ b/fowd/generic_source.py @@ -157,4 +157,4 @@ def process_file(input_file, out_folder, station_id=None): result_generator = filter(None, read_pickle_outfile_chunks(result_file)) out_file = os.path.join(out_folder, f'fowd_{station_id}.nc') logger.info('Writing output to %s', out_file) - write_records(result_generator, out_file, station_id, num_records=num_waves) + write_records(result_generator, out_file, station_id) diff --git a/fowd/output.py b/fowd/output.py index 2fccfc7..dfd1158 100644 --- a/fowd/output.py +++ b/fowd/output.py @@ -21,6 +21,13 @@ FILL_VALUE_NUMBER = -9999 FILL_VALUE_STR = 'MISSING' +# chunk sizes to use for each dimension +CHUNKSIZES = { + 'meta_station_name': 1, + 'wave_id_local': 10_000, + 'meta_frequency_band': len(FREQUENCY_INTERVALS), +} + DATASET_VARIABLES = dict( # metadata meta_source_file_name=dict( @@ -487,21 +494,16 @@ def get_dataset_metadata(station_name, start_time, end_time, extra_metadata=None def write_records(wave_record_iterator, filename, station_name, extra_metadata=None, - include_direction=False, num_records=None): + include_direction=False): """Write given wave records in FOWD's netCDF4 output format. First argument is an iterable of chunks of wave records. """ - if num_records is None: - wave_id_dim = None - else: - wave_id_dim = np.arange(num_records) - dimension_data = ( # (name, dtype, data) ('meta_station_name', str, np.array([np.string_(station_name)])), - ('wave_id_local', 'int64', wave_id_dim), + ('wave_id_local', 'int64', None), ('meta_frequency_band', 'uint8', np.arange(len(FREQUENCY_INTERVALS))), ) @@ -521,7 +523,13 @@ def write_records(wave_record_iterator, filename, station_name, extra_metadata=N else: f.createDimension(dim, len(val)) - v = f.createVariable(dim, dtype, (dim,)) + extra_args = dict( + zlib=True, + fletcher32=True, + chunksizes=[CHUNKSIZES[dim]] + ) + + v = f.createVariable(dim, dtype, (dim,), **extra_args) if val is not None: v[:] = val @@ -530,6 +538,12 @@ def write_records(wave_record_iterator, filename, station_name, extra_metadata=N # add meta_station_name as additional scalar dimension dims = ('meta_station_name',) + meta['dims'] + extra_args = dict( + zlib=True, + fletcher32=True, + chunksizes=[CHUNKSIZES[dim] for dim in dims] + ) + # determine dtype if meta['dtype'] == 'vlen': dtype = vlen_type @@ -537,7 +551,6 @@ def write_records(wave_record_iterator, filename, station_name, extra_metadata=N dtype = meta['dtype'] # add correct fill value - extra_args = {} is_number = np.issubdtype(dtype, np.floating) or np.issubdtype(dtype, np.integer) if is_number and dtype is not vlen_type: extra_args.update(fill_value=FILL_VALUE_NUMBER) @@ -597,7 +610,12 @@ def write_records(wave_record_iterator, filename, station_name, extra_metadata=N # add extra variables for name, meta in EXTRA_VARIABLES.items(): - v = f.createVariable(name, meta['data'].dtype, meta['dims']) + extra_args = dict( + zlib=True, + fletcher32=True, + chunksizes=[CHUNKSIZES[dim] for dim in meta['dims']] + ) + v = f.createVariable(name, meta['data'].dtype, meta['dims'], **extra_args) v[:] = meta['data'] for attr, val in meta['attrs'].items(): setattr(v, attr, val) diff --git a/fowd/postprocessing.py b/fowd/postprocessing.py index 34466cd..84224ad 100644 --- a/fowd/postprocessing.py +++ b/fowd/postprocessing.py @@ -136,15 +136,15 @@ def remove_blacklisted(ds): def filter_low_swh(ds): - """Remove all records with very low significant wave heights.""" - return ds['sea_state_30m_significant_wave_height_spectral'] > 0.5 + """Remove all records with low significant wave heights.""" + return ds['sea_state_30m_significant_wave_height_spectral'] > 1.0 def filter_undersampled(ds): """Remove all records that are undersampled.""" nyquist_frequency = 0.5 * ds['meta_sampling_rate'] mean_frequency = 1. / (ds['sea_state_30m_mean_period_spectral'] / np.timedelta64(1, 's')) - return 2.2 * mean_frequency < nyquist_frequency + return 3.2 * mean_frequency < nyquist_frequency def filter_cdip(ds, num_filtered_dict=None, chunk_size=10_000):