-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
113 lines (85 loc) · 4.87 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import datetime
from collections import defaultdict
class Message:
    """Plain container for one membership-protocol message.

    The class only stores what it is given; it performs no validation.
    """

    def __init__(self, msg_type, host, membership_list, counter, **kwargs):
        """Store the message fields on the instance.

        Args:
            msg_type: Message kind; exposed as ``self.type``.
            host: Host value carried by the message (stored as-is).
            membership_list: Membership data carried by the message.
            counter: Counter value carried by the message.
            **kwargs: Any additional fields, kept together in ``self.kwargs``.
        """
        self.type, self.host = msg_type, host
        self.membership_list, self.counter = membership_list, counter
        # Extra keyword arguments are kept as a dict rather than expanded
        # into individual attributes.
        self.kwargs = kwargs
class FileSystemMessage:
    """Plain container for one file-system message.

    The class only stores what it is given; it performs no validation.
    """

    def __init__(self, msg_type, leader_host, dest_host, replica_list, origin_filename, dest_filename, **kwargs):
        """Store the message fields on the instance.

        Args:
            msg_type: Message kind; exposed as ``self.type``.
            leader_host: Stored as ``self.leader_host``.
            dest_host: Stored as ``self.dest_host``.
            replica_list: Stored as ``self.replica_list``.
            origin_filename: Stored as ``self.origin_filename``.
            dest_filename: Stored as ``self.dest_filename``.
            **kwargs: Any additional fields, kept together in ``self.kwargs``.
        """
        self.type = msg_type
        self.leader_host, self.dest_host = leader_host, dest_host
        self.replica_list = replica_list
        self.origin_filename, self.dest_filename = origin_filename, dest_filename
        # Extra keyword arguments are kept as a dict rather than expanded
        # into individual attributes.
        self.kwargs = kwargs
class MembershipInfo:
    """Per-host membership record: counter, a timestamp and suspicion state."""

    def __init__(self, host, membership_counter, clock_time, suspicion_flag=0, suspicion_start_time=None):
        """Create the record for ``host``.

        Args:
            host: Host this record describes.
            membership_counter: Initial counter value for the host.
            clock_time: Timestamp stored as ``heartbeat_increase_time``.
            suspicion_flag: Initial value of ``suspicion["flag"]`` (default 0).
            suspicion_start_time: Initial value of ``suspicion["start_time"]``
                (default ``None``).
        """
        self.host, self.membership_counter = host, membership_counter
        self.heartbeat_increase_time = clock_time
        # A fresh dict per instance, so records never share suspicion state.
        self.suspicion = dict(flag=suspicion_flag, start_time=suspicion_start_time)
class MembershipList:
    """Local view of the known hosts and their failure ("cleanup") state.

    Attributes:
        active_nodes: Maps host -> ``MembershipInfo`` for every added host.
        cleanup_status_dict: Maps host ->
            ``{"flag": 0 | 1, "cleanup_start_time": datetime | None}``.
            Flag 0 means not cleaning up; flag 1 means the host is in
            cleanup state (it has been reported as failed).
        file_replication_dict: Created empty; not touched by the methods
            of this class.
        write_lock_set: Created empty; not touched by the methods here.
        read_lock_dict: ``defaultdict(int)``; not touched by the methods here.
        failed_nodes: Created empty; not touched by the methods here.

    Hosts are used as dict keys and are indexed as ``host[0]`` / ``host[1]``
    in the printed messages, so they must be hashable sequences with at least
    two elements (presumably ``(address, port)`` -- confirm with callers).
    """

    def __init__(self):
        self.active_nodes = {}
        self.cleanup_status_dict = {}  # 0: not cleaning up, 1: host in cleanup state
        self.file_replication_dict = {}
        self.write_lock_set = set()
        self.read_lock_dict = defaultdict(int)
        self.failed_nodes = []

    def put_replica(self, sdfs_filename, dest_host):
        """Add replica info for a filename to ``file_replication_dict``.

        Not implemented yet: the method is currently a no-op.
        """
        pass

    def add_member(self, host, membership_counter, suspicion):
        """Register ``host`` (or overwrite its record) as an active member.

        The cleanup state for the host is reset to "not cleaning up".

        Args:
            host: Key identifying the member.
            membership_counter: Initial counter for the member.
            suspicion: Mapping with ``"flag"`` and ``"start_time"`` keys.
        """
        joined_at = datetime.datetime.now()
        self.active_nodes[host] = MembershipInfo(
            host,
            membership_counter,
            joined_at,
            suspicion["flag"],
            suspicion["start_time"],
        )
        self.cleanup_status_dict[host] = {"flag": 0, "cleanup_start_time": None}

    def _mark_failed(self, host, logger, log_prefix=""):
        """Put ``host`` into cleanup state the first time it is reported failed.

        Later reports for the same host are ignored so that the recorded
        ``cleanup_start_time`` keeps the time of the first report.
        """
        status = self.cleanup_status_dict[host]
        if status["flag"] != 0:
            return
        print(f'{log_prefix}Node {host[0]}:{host[1]} has been failed [{datetime.datetime.now()}] \n')
        logger.debug(f'Node {host} has been failed')
        status["flag"] = 1
        status["cleanup_start_time"] = datetime.datetime.now()

    def update_member(self, host, membership_counter, cleanup_status, logger):
        """Merge one piece of information about ``host`` (no suspicion handling).

        Args:
            host: Member the update is about; must already have been added.
            membership_counter: Counter value reported for the member.
            cleanup_status: ``None`` or a mapping with a ``"flag"`` key; flag 1
                reports the host as failed.
            logger: Object with a ``debug`` method; used only when the host is
                newly marked as failed.

        Raises:
            KeyError: If ``host`` is not present in the membership dicts.
        """
        # If any node says the host has failed, it is failed no matter what.
        if cleanup_status is not None and cleanup_status["flag"] == 1:
            self._mark_failed(host, logger, log_prefix="[Gossip] ")
            return

        # A host already in cleanup state takes no further counter updates.
        if self.cleanup_status_dict[host]["flag"] != 0:
            return

        mem_info = self.active_nodes[host]
        if membership_counter > mem_info.membership_counter:
            mem_info.membership_counter = membership_counter
            mem_info.heartbeat_increase_time = datetime.datetime.now()

    def update_member_with_suspicion(self, host, membership_counter, suspicion, cleanup_status, logger):
        """Merge one piece of information about ``host``, including suspicion.

        Args:
            host: Member the update is about; must already have been added.
            membership_counter: Counter value reported for the member.
            suspicion: Mapping with a ``"flag"`` key; flag 1 means the report
                says the host is suspected.
            cleanup_status: ``None`` or a mapping with a ``"flag"`` key; flag 1
                reports the host as failed.
            logger: Object with a ``debug`` method; used only when the host is
                newly marked as failed.

        Raises:
            KeyError: If ``host`` is not present in the membership dicts.
        """
        if cleanup_status is not None and cleanup_status["flag"] == 1:
            self._mark_failed(host, logger)
            return

        # A host already in cleanup state takes no further updates.
        if self.cleanup_status_dict[host]["flag"] != 0:
            return

        mem_info = self.active_nodes[host]
        if suspicion["flag"] == 1:
            # A suspicion report is accepted even at an equal counter (>=).
            if membership_counter >= mem_info.membership_counter:
                mem_info.membership_counter = membership_counter
                if mem_info.suspicion["flag"] == 0:
                    # NOTE(review): unlike the failure message, this prints only
                    # host[1] -- confirm that omitting host[0] is intended.
                    print(f'Node {host[1]} has been suspected [{datetime.datetime.now()}] \n')
                    mem_info.suspicion["flag"] = suspicion["flag"]
                    mem_info.suspicion["start_time"] = datetime.datetime.now()
        elif membership_counter > mem_info.membership_counter:
            mem_info.membership_counter = membership_counter
            # Update heartbeat increase time only when it is a ping and not a
            # suspicion message.
            mem_info.heartbeat_increase_time = datetime.datetime.now()
            if mem_info.suspicion["flag"] == 1:
                # A strictly newer non-suspicion report clears the suspicion.
                mem_info.suspicion["flag"] = suspicion["flag"]
                mem_info.suspicion["start_time"] = None

    def delete_member(self, host):
        """Forget ``host`` entirely; a no-op if the host is unknown.

        Both dicts are cleaned with a default so the call is idempotent and
        cannot raise ``KeyError`` when the dicts are out of sync.
        """
        self.active_nodes.pop(host, None)
        self.cleanup_status_dict.pop(host, None)