Skip to content

Commit

Permalink
Correct stats creation.
Browse files Browse the repository at this point in the history
  • Loading branch information
gnrgomes committed Jun 14, 2024
1 parent 4bcf361 commit 105cdcc
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 41 deletions.
72 changes: 40 additions & 32 deletions src/lisfloodutilities/gridding/lib/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code:
self.setup_column_names()
self.OUTPUT_COLUMNS = [self.COL_LON, self.COL_LAT, self.COL_VALUE]
self.INTERNAL_COLUMNS = [f'{self.COL_STATION_DIARY_STATUS}_INTERNAL', f'{self.COL_INACTIVE_HISTORIC}_INTERNAL']
self.print_stats = False

def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_data_frames: List[pd.DataFrame]) -> List[pd.DataFrame]:
"""
Expand All @@ -39,8 +40,10 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
for file_path in kiwis_files:
if len(kiwis_data_frames) > 0:
df_kiwis = kiwis_data_frames[i]
self.print_stats = False
else:
df_kiwis = pd.read_csv(file_path, sep="\t")
self.print_stats = True
self.cur_timestamp = dt.strptime(f'{kiwis_timestamps[i]}', "%Y%m%d%H%M")
df_kiwis = self.apply_filter(df_kiwis)
filtered_data_frames.append(df_kiwis)
Expand All @@ -64,7 +67,7 @@ def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame:
(df[f'{self.COL_NO_GRIDDING}'] == 'no') & (df[f'{self.COL_IS_IN_DOMAIN}'] == 'yes') &
(df[f'{self.COL_EXCLUDE}'] != 'yes') & (df[f'{self.COL_STATION_DIARY_STATUS}_INTERNAL'] == 1) &
(df[f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] == 1)]
self.print_statistics(df)
self.print_statistics(df, force_print=self.print_stats)
# Show only codes valid and suspicious
df = df.loc[((df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_VALID) |
(df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_SUSPICIOUS))]
Expand All @@ -77,37 +80,39 @@ def get_totals_by_quality_code(row: pd.Series, column_quality_code: str, quality
return row['count']
return 0

def print_statistics(self, df: pd.DataFrame):
timestamp = self.cur_timestamp.strftime('%Y-%m-%d %H:%M:%S')
new_df = df.groupby([self.COL_PROVIDER_ID, self.COL_QUALITY_CODE]).size().reset_index(name='count')
# Transpose the quality codes
new_df[self.QUALITY_CODE_VALID] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1,
column_quality_code=self.COL_QUALITY_CODE,
quality_code=self.QUALITY_CODE_VALID)
new_df[self.QUALITY_CODE_SUSPICIOUS] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1,
column_quality_code=self.COL_QUALITY_CODE,
quality_code=self.QUALITY_CODE_SUSPICIOUS)
new_df[self.QUALITY_CODE_WRONG] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1,
column_quality_code=self.COL_QUALITY_CODE,
quality_code=self.QUALITY_CODE_WRONG)
new_df.drop(columns=[self.COL_QUALITY_CODE, 'count'], inplace=True)
new_df = new_df.groupby(self.COL_PROVIDER_ID)[[self.QUALITY_CODE_VALID,
self.QUALITY_CODE_SUSPICIOUS,
self.QUALITY_CODE_WRONG]].sum()
new_df.reset_index(inplace=True)
for index, row in new_df.iterrows():
provider_id = row[self.COL_PROVIDER_ID]
quality_code_valid = row[self.QUALITY_CODE_VALID]
quality_code_suspicious = row[self.QUALITY_CODE_SUSPICIOUS]
quality_code_wrong = row[self.QUALITY_CODE_WRONG]
total = quality_code_valid + quality_code_suspicious + quality_code_wrong
stats_string = (
f'#KIWIS_STATS: {{"TIMESTAMP": "{timestamp}", "VAR_CODE": "{self.var_code}", '
f'"PROVIDER_ID": {provider_id}, "QUALITY_CODE_VALID": {quality_code_valid}, '
f'"QUALITY_CODE_SUSPICIOUS": {quality_code_suspicious}, "QUALITY_CODE_WRONG": {quality_code_wrong}, '
f'"TOTAL_OBSERVATIONS": {total}}}'
)
self.print_msg(stats_string)
def print_statistics(self, df: pd.DataFrame, stats_tag: str = 'KIWIS_STATS', force_print: bool = True):
    """Log one JSON stats line per provider with counts per quality code.

    Groups the kiwis dataframe by provider and quality code, pivots the
    three known quality codes (valid / suspicious / wrong) into columns,
    sums them per provider and emits one '#<stats_tag>: {...}' line each.

    Args:
        df: kiwis dataframe containing at least the provider-id and
            quality-code columns.
        stats_tag: tag prefixed to every emitted line (kept configurable so
            different statistics streams can share one log).
        force_print: when False the call is a no-op; callers use this to
            print statistics only once per kiwis file (on first read) and
            not again when already-filtered dataframes are re-processed.
    """
    if not force_print:
        return
    timestamp = self.cur_timestamp.strftime('%Y-%m-%d %H:%M:%S')
    counts = df.groupby([self.COL_PROVIDER_ID, self.COL_QUALITY_CODE]).size().reset_index(name='count')
    # Transpose the quality codes: each row keeps its 'count' in the column
    # of its own quality code and 0 in the other two (vectorized equivalent
    # of the previous row-wise apply of get_totals_by_quality_code).
    for quality_code in (self.QUALITY_CODE_VALID, self.QUALITY_CODE_SUSPICIOUS, self.QUALITY_CODE_WRONG):
        counts[quality_code] = counts['count'].where(counts[self.COL_QUALITY_CODE] == quality_code, 0)
    counts.drop(columns=[self.COL_QUALITY_CODE, 'count'], inplace=True)
    counts = counts.groupby(self.COL_PROVIDER_ID)[[self.QUALITY_CODE_VALID,
                                                   self.QUALITY_CODE_SUSPICIOUS,
                                                   self.QUALITY_CODE_WRONG]].sum()
    counts.reset_index(inplace=True)
    for _, row in counts.iterrows():
        provider_id = row[self.COL_PROVIDER_ID]
        quality_code_valid = row[self.QUALITY_CODE_VALID]
        quality_code_suspicious = row[self.QUALITY_CODE_SUSPICIOUS]
        quality_code_wrong = row[self.QUALITY_CODE_WRONG]
        total = quality_code_valid + quality_code_suspicious + quality_code_wrong
        stats_string = (
            # BUGFIX: use stats_tag instead of the hard-coded 'KIWIS_STATS'
            # literal; otherwise the stats_tag parameter is dead code. The
            # default value keeps the emitted prefix identical to before.
            f'#{stats_tag}: {{"TIMESTAMP": "{timestamp}", "VAR_CODE": "{self.var_code}", '
            f'"PROVIDER_ID": {provider_id}, "QUALITY_CODE_VALID": {quality_code_valid}, '
            f'"QUALITY_CODE_SUSPICIOUS": {quality_code_suspicious}, "QUALITY_CODE_WRONG": {quality_code_wrong}, '
            f'"TOTAL_OBSERVATIONS": {total}}}'
        )
        self.print_msg(stats_string)

def print_msg(self, msg: str = ''):
if not self.quiet_mode:
Expand Down Expand Up @@ -217,6 +222,7 @@ def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame:
df['has_neighbor_within_radius'] = df.apply(self.has_neighbor_within_radius_from_other_providers,
axis=1, tree=tree, provider_id=provider_id, radius=radius)
df = df.loc[~(df['has_neighbor_within_radius'])]
self.print_statistics(df)
return df

def has_neighbor_within_radius_from_other_providers(self, row: pd.Series, tree: cKDTree = None, provider_id: int = 0,
Expand Down Expand Up @@ -270,6 +276,7 @@ def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame:
axis=1, tree=tree, provider_id=provider_id, radius=radius)
self.set_filtered_stations(df)
df = df.loc[~(df['has_neighbor_within_radius'])]
self.print_statistics(df)
return df

def set_filtered_stations(self, df: pd.DataFrame):
Expand Down Expand Up @@ -455,5 +462,6 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
# Append both decumulated 24h dataframes to the 6h slot
df_filtered = pd.concat([df, self.df_24h_without_neighbors, df_decumulated_24h])
return_data_frames.append(df_filtered)
self.print_statistics(df_filtered)
return return_data_frames

17 changes: 8 additions & 9 deletions src/lisfloodutilities/gridding/tools/analyse_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ def run(infolder: str, outfolder: str):
outfilepath = Path(outfile)
# Create the output parent folders if not exist yet
Path(outfilepath.parent).mkdir(parents=True, exist_ok=True)
df = pd.read_csv(filename, delimiter=DELIMITER_INPUT)
df = df.astype({COL_INCIDENTS: 'str'})
df = pd.read_csv(filename, delimiter=DELIMITER_INPUT, low_memory=False)
df = df.astype({COL_PROVIDER_ID: 'str', COL_INCIDENTS: 'str'})
incident_type_columns = {}
df[COL_TOTAL_INCIDENTS] = df.apply(get_total_incidents, axis=1)

Expand Down Expand Up @@ -166,8 +166,7 @@ def main(argv):
See the Licence for the specific language governing permissions and limitations under the Licence.
"""

# try:
if True:
try:
# setup option parser
parser = ArgumentParser(epilog=program_license, description=program_version_string+program_longdesc)

Expand All @@ -189,11 +188,11 @@ def main(argv):

run(args.infolder, args.outfolder)
print("Finished.")
# except Exception as e:
# indent = len(program_name) * " "
# sys.stderr.write(program_name + ": " + repr(e) + "\n")
# sys.stderr.write(indent + " for help use --help")
# return 2
except Exception as e:
indent = len(program_name) * " "
sys.stderr.write(program_name + ": " + repr(e) + "\n")
sys.stderr.write(indent + " for help use --help")
return 2


def main_script():
Expand Down
23 changes: 23 additions & 0 deletions src/lisfloodutilities/gridding/tools/read_stats_from_logs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dask.dataframe.io.tests.test_json import df
from pandas.tests.io.test_fsspec import df1
__author__="Goncalo Gomes"
__date__="$Jun 06, 2024 10:45:00$"
__version__="0.1"
Expand Down Expand Up @@ -28,6 +29,27 @@
def have_the_same_columns(df1: pd.DataFrame, df2: pd.DataFrame):
    # True when every column of df1 also exists in df2 (df2 may have extras).
    return set(df1.columns).issubset(df2.columns)

def merge_kiwis_stats(df: pd.DataFrame, search_string: str = ''):
    """Collapse the repeated per-filter KIWIS stats rows into one row per key.

    Each KiwisFilter step logs its own '#KIWIS_STATS:' line for the same
    (TIMESTAMP, VAR_CODE, PROVIDER_ID) key while progressively removing
    observations, so the parsed log contains several rows per key. VALID and
    SUSPICIOUS counts shrink with every filtering iteration (keep the
    minimum), whereas WRONG observations are filtered out in the first
    iteration only (keep the maximum). The total is then recomputed from the
    three merged counts.

    Args:
        df: dataframe built from the matched log lines.
        search_string: the tag that was grepped from the logs; merging only
            applies to the KIWIS stats tag, any other dataframe is returned
            unchanged.

    Returns:
        A new aggregated dataframe for KIWIS stats, otherwise df itself.
    """
    KIWIS_SEARCH_STRING = '#KIWIS_STATS: '
    if search_string != KIWIS_SEARCH_STRING:
        return df
    group_cols = ['TIMESTAMP', 'VAR_CODE', 'PROVIDER_ID']
    agg_dict = {'QUALITY_CODE_VALID': 'min', 'QUALITY_CODE_SUSPICIOUS': 'min',
                'QUALITY_CODE_WRONG': 'max', 'TOTAL_OBSERVATIONS': 'max'}
    # reset_index() already restores the group columns with their original
    # names, so no manual column relabelling is needed afterwards.
    result_df = df.groupby(group_cols).agg(agg_dict).reset_index()
    # Recalculate the total from the merged per-quality-code counts.
    result_df['TOTAL_OBSERVATIONS'] = (result_df['QUALITY_CODE_VALID']
                                       + result_df['QUALITY_CODE_SUSPICIOUS']
                                       + result_df['QUALITY_CODE_WRONG'])
    return result_df


def run(infolder: str, outfile: str, search_string: str, inwildcard: str = '*.log'):
out_df = None
outfilepath = Path(outfile)
Expand All @@ -53,6 +75,7 @@ def run(infolder: str, outfile: str, search_string: str, inwildcard: str = '*.lo
print('WARNING: No lines containing statistics where found.')
else:
out_df = out_df.drop_duplicates()
out_df = merge_kiwis_stats(out_df, search_string)
out_df.to_csv(outfilepath, index=False, header=True, sep='\t')
print(f'Wrote file: {outfilepath}')
print(out_df)
Expand Down

0 comments on commit 105cdcc

Please sign in to comment.