# ML_experiment.py
# %%
# importing libraries
import pandas as pd
import numpy as np
import json
import seaborn as sns
import matplotlib.pyplot as plt
from datasets import load_dataset
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import confusion_matrix
from nltk.tokenize import sent_tokenize  # requires the NLTK 'punkt' tokenizer data (nltk.download('punkt'))
# %%
# load the MeMo corpus dataset with metadata
ds = load_dataset("chcaa/memo-canonical-novels")
# make df
meta = pd.DataFrame(ds['train'])
meta.head()
meta.columns
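# Optional check (added for orientation, not part of the original pipeline):
# look at the raw label distribution before the categories are collapsed below.
print(meta['CATEGORY'].value_counts())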
# %%
# we want to use only 3 categories (O, HISTORICAL, CANON)
# define the nice labels for the categories
nice_labels = {'O': 'Other', 'HISTORICAL': 'Historical', 'CANON': 'Canon'}
# Combine categories in the 'CATEGORY' column
meta['CATEGORY'] = meta['CATEGORY'].replace({
'LEX_CANON': 'CANON',
'CE_CANON': 'CANON',
'CANON_HISTORICAL': 'CANON', # canon books that are also historical will be considered canon
})
# make sure we only have 3 categories
if len(meta['CATEGORY'].unique()) == 3:
    print('--- using only 3 categories ---')
else:
    print('warning: expected 3 categories, found', len(meta['CATEGORY'].unique()))
print('Unique categories:', meta['CATEGORY'].unique())
# %%
# Load the embeddings data (previous work)
# for embedding extraction, see: https://github.com/centre-for-humanities-computing/memo-canonical-novels
with open('data/meanpool__intfloat__multilingual-e5-large-instruct_identify_author.json', 'r') as f:
embeddings_data = [json.loads(line) for line in f]
embeddings_df = pd.DataFrame(embeddings_data)
embeddings_df.head()
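# Optional sanity check (a small addition): each JSONL record is expected to carry
# a 'filename' and an 'embedding' list; verify that all embedding vectors share
# one dimensionality before converting them to arrays below.
print('embedding records:', len(embeddings_df))
print('embedding dim:', len(embeddings_df['embedding'].iloc[0]))
assert embeddings_df['embedding'].apply(len).nunique() == 1, 'inconsistent embedding dimensions'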
# %%
# make sure that the embeddings are in the right format
embeddings_df['embedding'] = embeddings_df['embedding'].apply(np.array)
# Merge embeddings with the main dataframe
merged_df = pd.merge(meta, embeddings_df, left_on='FILENAME', right_on='filename')
# add sentence length as a baseline feature for the model
merged_df['avg_sentence_length'] = merged_df['TEXT'].apply(lambda x: np.mean([len(s.split()) for s in sent_tokenize(x)]))  # mean number of words per sentence
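# Optional check (added): the inner merge silently drops novels whose FILENAME has
# no matching embedding, so compare row counts to make any mismatch visible.
print('metadata rows:', len(meta), '| embedding rows:', len(embeddings_df), '| merged rows:', len(merged_df))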
# %%
# define the column used for the class labels
class_column = 'CATEGORY'
print(merged_df[class_column].value_counts())
# define the testset size and the number of iterations
test_size = 0.1
num_iterations = 50
print('test size:', test_size)
print('num iterations:', num_iterations)
# OneHotEncoder for the 'publisher' feature
publisher_encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
# Define feature combinations
feature_combinations = {
'avg_sentence_length': lambda df: df['avg_sentence_length'].values.reshape(-1, 1),
'embeddings': lambda df: np.stack(df['embedding'].values),
'price': lambda df: df['PRICE'].values.reshape(-1, 1),
'publisher': lambda df: publisher_encoder.fit_transform(df['PUBLISHER'].values.reshape(-1, 1)),
'embeddings_price': lambda df: np.hstack([np.stack(df['embedding'].values), df['PRICE'].values.reshape(-1, 1)]),
'embeddings_publisher': lambda df: np.hstack([np.stack(df['embedding'].values),
publisher_encoder.fit_transform(df['PUBLISHER'].values.reshape(-1, 1))]),
'publisher_price': lambda df: np.hstack([publisher_encoder.fit_transform(df['PUBLISHER'].values.reshape(-1, 1)),
df['PRICE'].values.reshape(-1, 1)]),
'embeddings_publisher_price': lambda df: np.hstack([np.stack(df['embedding'].values),
publisher_encoder.fit_transform(df['PUBLISHER'].values.reshape(-1, 1)),
df['PRICE'].values.reshape(-1, 1)])
}
print('number of feature combinations:', len(feature_combinations), ':', [x for x in feature_combinations.keys()])
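# Optional shape check (added sketch, assuming PRICE and PUBLISHER are populated as
# used in the runs below): build each feature matrix once on the full merged
# dataframe to confirm the expected dimensionality before training.
for name, func in feature_combinations.items():
    print(name, func(merged_df).shape)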
# %%
# First ML run with 3 classes
# Dictionary to store class-wise metrics for all feature combinations
results = {feature_set: {} for feature_set in feature_combinations}
# dictionary to store results for the confusion matrix
confusion_matrix_results = {feature_set: None for feature_set in feature_combinations}
for feature_set_name, feature_set_func in feature_combinations.items():
print(f"Evaluating feature set: {feature_set_name}")
# Initialize storage for class-wise metrics
class_performance = {}
for i in range(num_iterations):
# Step 1: Find the minimum class size
min_class_size = merged_df[class_column].value_counts().min()
# Step 2: Down-sample each class
balanced_dfs = [
group.sample(n=min_class_size, random_state=i) # Sample from each group
for _, group in merged_df.groupby(class_column)
]
balanced_df = pd.concat(balanced_dfs, ignore_index=True)
# Step 3: Shuffle the dataset
balanced_df = balanced_df.sample(frac=1, random_state=i).reset_index(drop=True)
# Step 4: Create feature matrix and target array
X = feature_set_func(balanced_df)
y = balanced_df[class_column].values
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=i)
# Train Random Forest Classifier
clf = RandomForestClassifier(n_estimators=100, random_state=i, class_weight='balanced')
clf.fit(X_train, y_train)
# Evaluate the model
y_pred = clf.predict(X_test)
report = classification_report(y_test, y_pred, output_dict=True, zero_division=0) # Get report as a dictionary
# store results for the confusion matrix
# Compute the confusion matrix for this iteration
cm = confusion_matrix(y_test, y_pred)
# Accumulate confusion matrices
if confusion_matrix_results[feature_set_name] is None:
confusion_matrix_results[feature_set_name] = cm # Initialize with the first matrix
else:
confusion_matrix_results[feature_set_name] += cm # Add subsequent matrices
# Store class-wise scores
for class_name, metrics in report.items():
if class_name in ['accuracy', 'macro avg', 'weighted avg']:
continue # Skip non-class entries
if class_name not in class_performance:
class_performance[class_name] = {'precision': [], 'recall': [], 'f1-score': []}
class_performance[class_name]['precision'].append(metrics['precision'])
class_performance[class_name]['recall'].append(metrics['recall'])
class_performance[class_name]['f1-score'].append(metrics['f1-score'])
# Calculate mean performance for each class and store results
results[feature_set_name] = {
class_name: {
'mean_precision': np.mean(scores['precision']),
'mean_recall': np.mean(scores['recall']),
'mean_f1': np.mean(scores['f1-score']),
'std_f1': np.std(scores['f1-score']),
}
for class_name, scores in class_performance.items()
}
# Average the confusion matrix across all iterations
confusion_matrix_results[feature_set_name] = (
confusion_matrix_results[feature_set_name] / num_iterations
)
# %%
# Display results
for feature_set_name, class_metrics in results.items():
print(f"Feature Set: {feature_set_name}")
for class_name, metrics in class_metrics.items():
print(f" Class {class_name}:")
print(f" Mean Precision: {metrics['mean_precision']:.3f}")
print(f" Mean Recall: {metrics['mean_recall']:.3f}")
print(f" Mean F1-Score: {metrics['mean_f1']:.3f}")
# and get the SD of the F1 score
print(' ..')
print(f" STD F1-Score: {metrics['std_f1']:.3f}")
print()
# save them to a txt in results folder
with open('results/ML_3classes_results.txt', 'w') as f:
for feature_set_name, class_metrics in results.items():
f.write(f"Feature Set: {feature_set_name}\n")
for class_name, metrics in class_metrics.items():
f.write(f" Class {class_name}:\n")
f.write(f" Mean Precision: {metrics['mean_precision']:.3f}\n")
f.write(f" Mean Recall: {metrics['mean_recall']:.3f}\n")
f.write(f" Mean F1-Score: {metrics['mean_f1']:.3f}\n")
f.write(' ..\n')
f.write(f" STD F1-Score: {metrics['std_f1']:.3f}\n")
f.write('\n')
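# Optional summary (added sketch): rank feature sets by mean macro F1 across the
# three classes, purely as a quick overview of the per-class numbers saved above.
macro_f1 = {name: np.mean([m['mean_f1'] for m in cls_metrics.values()])
            for name, cls_metrics in results.items()}
for name, score in sorted(macro_f1.items(), key=lambda kv: kv[1], reverse=True):
    print(f'{name}: mean macro F1 = {score:.3f}')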
# plot the confusion matrix for the full feature set
print('Confusion Matrix for the full feature set:')
class_labels = sorted(merged_df['CATEGORY'].unique()) # Ensure labels match matrix order
plt.figure(figsize=(10, 8))
sns.heatmap(
confusion_matrix_results['embeddings_publisher_price'],
annot=True,
cmap='Blues',
xticklabels=class_labels,
yticklabels=class_labels
)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.savefig('figs/ML_3classes_confusion_matrix.png')
plt.show()
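# Optional (added): a row-normalised view of the averaged confusion matrix, which
# can be easier to read than averaged raw counts.
cm_avg = confusion_matrix_results['embeddings_publisher_price']
cm_norm = cm_avg / cm_avg.sum(axis=1, keepdims=True)
print(np.round(cm_norm, 2))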
# %%
# OK
# %%
# Set up a second experiment in which we distinguish only two classes:
# keep the historical novels in, but relabel them as 'O' (Other)
df_two_classes = merged_df.copy()
df_two_classes['CATEGORY'] = df_two_classes['CATEGORY'].replace({'HISTORICAL': 'O'})
print(df_two_classes[class_column].value_counts())
# %%
# Second ML run with 2 classes
# Dictionary to store class-wise metrics for all feature combinations
results = {feature_set: {} for feature_set in feature_combinations}
# dictionary to store results for the confusion matrix
confusion_matrix_results = {feature_set: None for feature_set in feature_combinations}
# OneHotEncoder for the 'publisher' feature
publisher_encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
print('full class sizes', df_two_classes[class_column].value_counts())
print('sample size:', df_two_classes[class_column].value_counts().min())
for feature_set_name, feature_set_func in feature_combinations.items():
print(f"Evaluating feature set: {feature_set_name}")
# Initialize storage for class-wise metrics
class_performance = {}
for i in range(num_iterations):
# Step 1: Find the minimum class size
min_class_size = df_two_classes[class_column].value_counts().min()
# Step 2: Down-sample each class
balanced_dfs = [
group.sample(n=min_class_size, random_state=i) # Sample from each group
for _, group in df_two_classes.groupby(class_column)
]
balanced_df = pd.concat(balanced_dfs, ignore_index=True)
# Step 3: Shuffle the dataset
balanced_df = balanced_df.sample(frac=1, random_state=i).reset_index(drop=True)
# Step 4: Create feature matrix and target array
X = feature_set_func(balanced_df)
y = balanced_df[class_column].values
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=i)
# Train Random Forest Classifier
clf = RandomForestClassifier(n_estimators=100, random_state=i)
clf.fit(X_train, y_train)
# Evaluate the model
y_pred = clf.predict(X_test)
report = classification_report(y_test, y_pred, output_dict=True, zero_division=0) # Get report as a dictionary
# Generate confusion matrix
cm = confusion_matrix(y_test, y_pred)
# Accumulate confusion matrices
if confusion_matrix_results[feature_set_name] is None:
confusion_matrix_results[feature_set_name] = cm
else:
confusion_matrix_results[feature_set_name] += cm
# Store class-wise scores
for class_name, metrics in report.items():
if class_name in ['accuracy', 'macro avg', 'weighted avg']:
continue # Skip non-class entries
if class_name not in class_performance:
class_performance[class_name] = {'precision': [], 'recall': [], 'f1-score': []}
class_performance[class_name]['precision'].append(metrics['precision'])
class_performance[class_name]['recall'].append(metrics['recall'])
class_performance[class_name]['f1-score'].append(metrics['f1-score'])
# Calculate mean performance for each class and store results
results[feature_set_name] = {
class_name: {
'mean_precision': np.mean(scores['precision']),
'mean_recall': np.mean(scores['recall']),
'mean_f1': np.mean(scores['f1-score']),
'std_f1': np.std(scores['f1-score']),
}
for class_name, scores in class_performance.items()
}
# Average the confusion matrix across all iterations
confusion_matrix_results[feature_set_name] = (
confusion_matrix_results[feature_set_name] / num_iterations
)
# %%
# Display results
for feature_set_name, class_metrics in results.items():
print(f"Feature Set: {feature_set_name}")
for class_name, metrics in class_metrics.items():
print(f" Class {class_name}:")
print(f" Mean Precision: {metrics['mean_precision']:.3f}")
print(f" Mean Recall: {metrics['mean_recall']:.3f}")
print(f" Mean F1-Score: {metrics['mean_f1']:.3f}")
# and get the SD of the F1 score
print(' ..')
print(f" STD F1-Score: {metrics['std_f1']:.3f}")
print()
# save them to a txt in results folder
with open('results/ML_2classes_results.txt', 'w') as f:
for feature_set_name, class_metrics in results.items():
f.write(f"Feature Set: {feature_set_name}\n")
for class_name, metrics in class_metrics.items():
f.write(f" Class {class_name}:\n")
f.write(f" Mean Precision: {metrics['mean_precision']:.3f}\n")
f.write(f" Mean Recall: {metrics['mean_recall']:.3f}\n")
f.write(f" Mean F1-Score: {metrics['mean_f1']:.3f}\n")
f.write(' ..\n')
f.write(f" STD F1-Score: {metrics['std_f1']:.3f}\n")
f.write('\n')
# plot the confusion matrix for the full feature set
print('Confusion Matrix for the full feature set:')
# use the two classes
class_labels = sorted(df_two_classes['CATEGORY'].unique()) # Ensure labels match matrix order
print(class_labels)
plt.figure(figsize=(10, 8))
sns.heatmap(
confusion_matrix_results['embeddings_publisher_price'],
annot=True,
cmap='Blues',
xticklabels=class_labels,
yticklabels=class_labels
)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.savefig('figs/ML_2classes_confusion_matrix.png')
plt.show()
# %%