-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathschedular.py
96 lines (72 loc) · 2.81 KB
/
schedular.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from task import *
# Scheduling-policy selectors accepted by ``Scheduler(mode=...)``.
RM_MODE = 1   # rate monotonic
DM_MODE = 2   # deadline monotonic
EDF_MODE = 3  # earliest deadline first


class Scheduler:
    """Order a task set by a scheduling policy and pick the next task to run.

    The policy is applied once, at construction time, by re-ordering the
    tasks held in ``task_set``. In every policy, tasks for which
    ``is_interrupt()`` is true are placed first (sorted by deadline); the
    remaining tasks follow in the policy-specific order.

    The scheduler only relies on this interface:
      * ``task_set.get_all()``, ``task_set.set_tasks(tasks)`` and
        ``task_set.get_ready_tasks(time)``;
      * each task exposing ``is_interrupt()``, ``deadline`` and ``period``.

    Attributes:
        task_set (TaskSet): Task set to be scheduled.
        mode (int): Policy selector passed to the constructor
            (``RM_MODE``, ``DM_MODE`` or ``EDF_MODE``).
    """

    def __init__(self, task_set, mode=RM_MODE):
        """Store the task set and arrange its tasks according to ``mode``.

        Args:
            task_set (TaskSet): Task set to be scheduled.
            mode (int): One of ``RM_MODE``, ``DM_MODE`` or ``EDF_MODE``.
                Any other value leaves the task order unchanged.
        """
        self.task_set = task_set
        self.mode = mode

        # arrange tasks based on mode
        if mode == RM_MODE:
            self.rm()
        elif mode == DM_MODE:
            self.dm()
        elif mode == EDF_MODE:
            self.edf()

    def schedule(self, time):
        """Schedule the next task to run.

        Args:
            time (int): Current time.

        Returns:
            Task: The first task reported ready by the task set, or None if
            no tasks are ready.
        """
        ready_tasks = self.task_set.get_ready_tasks(time)
        if not ready_tasks:
            return None
        # The task set is already in priority order, so the head wins.
        return ready_tasks[0]

    def _split_interrupts(self):
        """Split the task set into (interrupts sorted by deadline, others).

        ``others`` keeps the order returned by ``task_set.get_all()`` so the
        caller's (stable) sort decides the final order of ties.
        """
        interrupts = []
        others = []
        for task in self.task_set.get_all():
            if task.is_interrupt():
                interrupts.append(task)
            else:
                others.append(task)
        interrupts.sort(key=lambda task: task.deadline)
        return interrupts, others

    def edf(self):
        """EDF

        Sort tasks based on the nearest deadline, interrupts first.
        """
        interrupts, others = self._split_interrupts()
        others.sort(key=lambda task: task.deadline)
        self.task_set.set_tasks(interrupts + others)

    def dm(self):
        """DM

        Sort tasks deadline-monotonically (shorter deadline first),
        interrupts first.
        """
        interrupts, others = self._split_interrupts()
        # Sort on the deadline itself rather than on 1/deadline descending:
        # same order for positive deadlines, and a deadline of 0 no longer
        # raises ZeroDivisionError.
        others.sort(key=lambda task: task.deadline)
        self.task_set.set_tasks(interrupts + others)

    def rm(self):
        """RM

        Sort tasks rate-monotonically: interrupts first, then periodic tasks
        (``period != 0``) by shorter period, then the remaining tasks
        (``period == 0``) by deadline.
        """
        interrupts, rest = self._split_interrupts()

        # Shorter period == higher rate == higher priority.
        periodic = [task for task in rest if task.period != 0]
        periodic.sort(key=lambda task: task.period)

        # Tasks without a period fall back to deadline order.
        others = [task for task in rest if task.period == 0]
        others.sort(key=lambda task: task.deadline)

        self.task_set.set_tasks(interrupts + periodic + others)