-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmain.cpp
156 lines (133 loc) · 6.19 KB
/
main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include "scheduler.h"
#include <assert.h>
#include <boost/bind.hpp>
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int_distribution.hpp>
#include <boost/thread.hpp>
#include <iostream>
void printEleven()
{
std::cout << "Eleven!\n";
}
void printSomething(std::string message)
{
std::cout << message << "\n";
}
void repeatStuff(CScheduler& s)
{
std::cout << "Gonna start repeating every 2 secs\n";
s.scheduleEvery(boost::bind(printSomething, std::string("Repeat!")), 2);
}
void longRunningTask(int nSecondsToWait)
{
std::cout << "Long-running task, gonna sleep for " << nSecondsToWait << "seconds\n";
boost::this_thread::sleep_for(boost::chrono::seconds(nSecondsToWait));
std::cout << "Done sleeping\n";
}
void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, boost::chrono::system_clock::time_point rescheduleTime)
{
{
boost::unique_lock<boost::mutex> lock(mutex);
counter += delta;
}
boost::chrono::system_clock::time_point noTime = boost::chrono::system_clock::time_point::min();
if (rescheduleTime != noTime) {
CScheduler::Function f = boost::bind(µTask, boost::ref(s), boost::ref(mutex), boost::ref(counter), -delta + 1, noTime);
s.schedule(f, rescheduleTime);
}
}
int main(int argc, char** argv)
{
// Stress test: thousands of microsecond-scheduled tasks,
// serviced by 100 threads.
//
// So... ten shared counters, which if all the tasks execute
// properly will be zero when the dust settles.
// Each task adds or subtracts from one of the counters a
// random amount, and then schedules another task 0-1000
// microseconds in the future to subtract or add from
// the counter -random_amount+2, so in the end the shared
// counters should sum to the number of tasks performed.
CScheduler microTasks;
boost::thread_group microThreads;
for (int i = 0; i < 50; i++)
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, µTasks));
boost::mutex counterMutex[10];
int counter[10] = {0};
boost::random::mt19937 rng(argc); // Seed with number of arguments
boost::random::uniform_int_distribution<> zeroToNine(0, 9);
boost::random::uniform_int_distribution<> randomMsec(-11, 1000);
boost::random::uniform_int_distribution<> randomDelta(-1000, 1000);
boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
boost::chrono::system_clock::time_point now = start;
for (int i = 0; i < 1000; i++) {
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng));
int whichCounter = zeroToNine(rng);
CScheduler::Function f = boost::bind(µTask, boost::ref(microTasks),
boost::ref(counterMutex[whichCounter]), boost::ref(counter[whichCounter]),
randomDelta(rng), tReschedule);
microTasks.schedule(f, t);
}
boost::this_thread::sleep_for(boost::chrono::microseconds(600));
now = boost::chrono::system_clock::now();
// More threads and more tasks:
for (int i = 0; i < 50; i++)
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, µTasks));
for (int i = 0; i < 1000; i++) {
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng));
int whichCounter = zeroToNine(rng);
CScheduler::Function f = boost::bind(µTask, boost::ref(microTasks),
boost::ref(counterMutex[whichCounter]), boost::ref(counter[whichCounter]),
randomDelta(rng), tReschedule);
microTasks.schedule(f, t);
}
microTasks.stop(true); // Stop after all tasks done
microThreads.join_all();
std::cout << "Microtask counts: ";
int counterSum = 0;
for (int i = 0; i < 10; i++) {
std::cout << counter[i] << " ";
counterSum += counter[i];
}
std::cout << "\nSum: " << counterSum << "\n";
assert(counterSum == 2000);
//
// Human-timescale tests
//
CScheduler s;
s.scheduleFromNow(printEleven, 11);
s.scheduleFromNow(boost::bind(printSomething, std::string("Two")), 2);
s.scheduleFromNow(boost::bind(printSomething, std::string("One")), 1);
s.scheduleFromNow(boost::bind(printSomething, std::string("Five")), 5);
s.scheduleFromNow(boost::bind(printSomething, std::string("Wait... negative?")), -1);
s.scheduleFromNow(boost::bind(printSomething, std::string("AlsoTwo")), 2);
s.scheduleFromNow(boost::bind(printSomething, std::string("Three")), 3);
// After 4 seconds, start repeating every two seconds:
CScheduler::Function repeatFunction = boost::bind(repeatStuff, boost::ref(s));
s.scheduleFromNow(repeatFunction, 4);
boost::thread* schedulerThread = new boost::thread(boost::bind(&CScheduler::serviceQueue, &s));
// Stop after 12 seconds
s.scheduleFromNow(boost::bind(&CScheduler::stop, &s, false), 12);
schedulerThread->join();
delete schedulerThread;
// Note: even though the thread terminated, the
// repeatStuff task is still on the queue.
// Now use two threads to service the queue.
// If you change this code to use just one thread,
// the longRunningTask will prevent the other tasks
// from running.
s.scheduleFromNow(boost::bind(printSomething, std::string("Two")), 2);
s.scheduleFromNow(boost::bind(printSomething, std::string("One")), 1);
s.scheduleFromNow(boost::bind(longRunningTask, 11), 0);
boost::thread_group threadGroup;
threadGroup.create_thread(boost::bind(&CScheduler::serviceQueue, &s));
threadGroup.create_thread(boost::bind(&CScheduler::serviceQueue, &s));
boost::this_thread::sleep_for(boost::chrono::seconds(13));
// All threads MUST be terminated before the scheduler's
// destructor is called.
threadGroup.interrupt_all();
threadGroup.join_all();
return 0;
}