-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadapter.py
217 lines (180 loc) · 8.81 KB
/
adapter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import torch.nn.functional as F
from adapter_dataset import FeatureDataset
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
import json
from tqdm import trange, tqdm
# reference: https://github.com/gaopengcuhk/CLIP-Adapter/blob/main/clip_adapter.py#L55C1-L67C17
class ClipAdapter(nn.Module):
"""
Original clip adapter.
"""
def __init__(self, c_in, reduction=4):
super(ClipAdapter, self).__init__()
self.fc = nn.Sequential(
nn.Linear(c_in, c_in // reduction, bias=False),
nn.ReLU(inplace=True),
nn.Linear(c_in // reduction, c_in, bias=False),
nn.ReLU(inplace=True)
)
def forward(self, x):
x = self.fc(x)
return x
class ModifiedClipAdapter(nn.Module):
"""
Modified version of the CLIP adapter for better performance.
Add Dropout layer.
"""
def __init__(self, c_in, reduction=4, ratio=0.6):
super(ModifiedClipAdapter, self).__init__()
self.fc = nn.Sequential(
nn.Linear(c_in, c_in // reduction, bias=False),
nn.ReLU(inplace=True),
nn.Dropout(0.5),
nn.Linear(c_in // reduction, c_in, bias=False),
nn.ReLU(inplace=True)
)
self.ratio = ratio
def forward(self, inputs):
inputs = F.normalize(inputs, dim=-1, p=2)
x = self.fc(inputs)
x = self.ratio * x + (1 - self.ratio) * inputs
return x
class WeightAdapter(nn.Module):
"""
Predict weights for each feature vector.
"""
def __init__(self, c_in, reduction=4, scalar=10.0):
"""
@param c_in: The channel size of the input feature vector
@param reduction: the reduction factor for the hidden layer
@param scalar: A scalar to scale the input feature vector
"""
super(WeightAdapter, self).__init__()
self.fc = nn.Sequential(
nn.Linear(c_in, c_in // reduction, bias=False),
nn.ReLU(inplace=True),
nn.Linear(c_in // reduction, c_in, bias=False),
nn.ReLU(inplace=True)
)
# self.ratio = ratio
self.scalar = scalar
def forward(self, inputs):
# inputs = F.normalize(inputs, dim=-1, p=2)
inputs = self.scalar * inputs
x = self.fc(inputs)
x = x.sigmoid()
x = x * inputs
return x
# modified from SimCLR loss: https://github.com/sthalles/SimCLR/blob/master/simclr.py#L26
class InfoNCELoss(nn.Module):
def __init__(self, temperature=0.5, eps=1e-8):
super(InfoNCELoss, self).__init__()
self.temperature = temperature
self.eps = eps # Small constant to avoid division by zero or log(0)
def forward(self, features, labels):
# original_labels = labels
features = F.normalize(features, dim=1)
similarity_matrix = torch.matmul(features, features.T)
labels = labels.contiguous().view(-1, 1)
mask_positive = torch.eq(labels, labels.T).float()
mask_negative = 1 - mask_positive
mask_positive.fill_diagonal_(0)
exp_logits = torch.exp(similarity_matrix / self.temperature)
sum_negatives = torch.sum(exp_logits * mask_negative, dim=1, keepdim=True) + self.eps # replace 1-mask_positive with mask_negative
sum_positives = torch.sum(exp_logits * mask_positive, dim=1, keepdim=True)
# Adding eps inside the log to avoid log(0)
loss = -torch.log(sum_positives / (sum_positives + sum_negatives) + self.eps)
loss = loss.mean()
return loss
if __name__ == '__main__':
adapter_type = 'weight'
# dataset_name = 'lmo_weight_10sigmoid'
# dataset_folder = 'lmo'
dataset_name = f'insDet_{adapter_type}_0523'
# dataset_name = f'lmo_{adapter_type}'
temperature = 0.05
ratio = 0.6
feature_dataset = FeatureDataset(data_json='./obj_FFA/object_features_vitl14_reg.json', num_object=100) # 100 objects in total
# feature_dataset = FeatureDataset(data_json='./obj_FFA/object_features.json',
# num_object=100) # 100 objects in total
# Assuming 'features' is your (N, 1024) tensor
batch_size = 1024
# robo_feature_dataset = FeatureDataset(data_json='./RoboTools_obj_feat/object_features.json', num_object=20) # 20 objects in total
# ycbv_feature_dataset = FeatureDataset(data_json='./BOP_obj_feat/ycbv_object_features.json', num_object=21) # 21 objects in total
# lmo_feature_dataset = FeatureDataset(data_json='./BOP_obj_feat/lmo_object_features.json', num_object=8)
### bop challenge datasets
# lmo_bop23_feature_dataset = FeatureDataset(data_json='bop23_obj_features/lmo/descriptors_pbr.pth', num_object=8)
# tless_bop23_feature_dataset = FeatureDataset(data_json='bop23_obj_features/tless/descriptors_pbr.pth', num_object=30, label_offset=8)
# tudl_bop23_feature_dataset = FeatureDataset(data_json='bop23_obj_features/tudl/descriptors_pbr.pth', num_object=3, label_offset=38)
# icbin_bop23_feature_dataset = FeatureDataset(data_json='bop23_obj_features/icbin/descriptors_pbr.pth', num_object=2, label_offset=41)
# itodd_bop23_feature_dataset = FeatureDataset(data_json='bop23_obj_features/itodd/descriptors_pbr.pth', num_object=28, label_offset=43)
# hb_bop23_feature_dataset = FeatureDataset(data_json='bop23_obj_features/hb/descriptors_pbr.pth', num_object=33, label_offset=71)
# ycbv_bo23_feature_dataset = FeatureDataset(data_json='bop23_obj_features/ycbv/descriptors_pbr.pth', num_object=21, label_offset=104)
cur_feature_dataset = feature_dataset
# Example training loop
input_features = 1024 # Size of the input feature vector, 1024 for large, 768 for base, 384 for small
reduction = 4 # Reduction factor for the hidden layer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if adapter_type == 'clip':
learning_rate = 1e-4
model = ModifiedClipAdapter(input_features, reduction=reduction, ratio=ratio).to(device)
else:
learning_rate = 1e-3
model = WeightAdapter(input_features, reduction=reduction).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4) #
criterion = InfoNCELoss(temperature=temperature).to(device)
epochs = 40
dataloader = DataLoader(cur_feature_dataset, batch_size=batch_size, shuffle=False)
for epoch in range(epochs):
for inputs, labels in dataloader: # in dataloader: tqdm(dataloader)
inputs = inputs.to(device)
labels = labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
print(f'Epoch {epoch + 1}, Loss: {loss.item()}')
save_model = True
adapter_args = f'{dataset_name}_temp_{temperature}_epoch_{epochs}_lr_{learning_rate}_bs_{batch_size}_vec_reduction_{reduction}'
if save_model:
# Assuming your model is named 'model'
# model_path = f'adapter_weights/bop23/{adapter_args}_weights.pth' # Define the path where you want to save the model
model_path = f'adapter_weights/adapter2FC/{adapter_args}_weights.pth'
# Save the model state dictionary
torch.save(model.state_dict(), model_path)
print(f'Model weights saved to {model_path}')
save_features = True
if save_features:
# Assuming model is already defined and loaded with trained weights
model.eval() # Set the model to evaluation mode
batch_size = 64
# Assuming 'feature_dataset' is your Dataset object containing the feature vectors
# Assuming 'test_dataloader' is your DataLoader for the test dataset
test_dataloader = DataLoader(cur_feature_dataset, batch_size=batch_size, shuffle=False)
adatped_features = []
for inputs, labels in test_dataloader:
inputs = inputs.to(device)
# labels = labels.to(device)
with torch.no_grad():
outputs = model(inputs)
# Perform inference using the model
# Your inference code here
adatped_features.append(outputs)
adatped_features = torch.cat(adatped_features, dim=0)
print(adatped_features.size())
feat_dict = dict()
feat_dict['features'] = adatped_features.detach().cpu().tolist()
# output_dir = f'./bop23_obj_features/{dataset_folder}'
output_dir = './adapted_obj_feats'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
json_filename = f'{adapter_args}.json'
# json_filename = f'{adapter_type}_bs1024_epoch_{epochs}_adapter_descriptors_pbr.json'
with open(os.path.join(output_dir, json_filename), 'w') as f:
json.dump(feat_dict, f)
print(f"saving adapted features {os.path.join(output_dir, json_filename)}")