forked from hpi-epic/pricewars
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbenchmark.py
146 lines (113 loc) · 5.19 KB
/
benchmark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import argparse
import os
from os.path import dirname
import subprocess
import time
import json
import datetime
import shlex
import requests
from kafka import KafkaConsumer
from analyze import analyze_kafka_dump
class PopenWrapper:
    """
    Context manager around subprocess.Popen that stops the process on exit.

    The context manager of Popen itself only waits for the process to finish
    when the context is left. This wrapper sends a terminate signal first and
    then delegates to Popen for waiting and cleanup.

    All constructor arguments are stored and passed on unchanged to
    subprocess.Popen when the context is entered.
    """

    def __init__(self, *args, **kwargs):
        self.args, self.kwargs = args, kwargs

    def __enter__(self):
        """Start the process and return the subprocess.Popen object."""
        started = subprocess.Popen(*self.args, **self.kwargs)
        self.process = started
        return started

    def __exit__(self, *exc_info):
        """Send a terminate signal, then let Popen wait for the process."""
        running = self.process
        running.terminate()
        running.__exit__(*exc_info)
def dump_topic(topic, output_dir):
    """
    Read the messages of one Kafka topic and save them as a JSON file.

    The consumer connects to 'kafka:9092' with auto_offset_reset='earliest'
    and stops iterating once no message arrives for 2000 ms
    (consumer_timeout_ms). Each message value is decoded as UTF-8 JSON. All
    values are written as one JSON list to a file named after the topic
    inside output_dir.

    :param topic: name of the Kafka topic; also used as the output file name
    :param output_dir: existing directory the dump file is written to
    """
    consumer = KafkaConsumer(topic,
                             bootstrap_servers='kafka:9092',
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                             consumer_timeout_ms=2000,
                             auto_offset_reset='earliest')
    try:
        events = [message.value for message in consumer]
    finally:
        # Release the broker connections even if reading or decoding fails.
        consumer.close()

    with open(os.path.join(output_dir, topic), 'w') as file:
        json.dump(events, file)
def dump_kafka(output_dir):
    """
    Dump the benchmark's Kafka topics into a new 'kafka' subdirectory of output_dir.

    One file per topic is written by dump_topic.

    :param output_dir: existing directory; os.mkdir fails if it already
        contains an entry named 'kafka'
    """
    target_dir = os.path.join(output_dir, 'kafka')
    os.mkdir(target_dir)
    for topic_name in ['buyOffer', 'holding_cost', 'marketSituation', 'producer']:
        dump_topic(topic_name, target_dir)
def save_merchant_id_mapping(output_dir):
    """
    Fetch the merchants from the marketplace and save a mapping from merchant id to merchant name.

    The mapping is written as JSON to 'merchant_id_mapping.json' inside output_dir.

    :param output_dir: existing directory the mapping file is written to
    :raises requests.exceptions.HTTPError: if the marketplace answers with an error status
    """
    response = requests.get('http://marketplace:8080/merchants')
    # Fail with a clear HTTP error instead of a KeyError/TypeError while
    # building the mapping from an error payload.
    response.raise_for_status()

    merchant_mapping = {
        merchant_info['merchant_id']: merchant_info['merchant_name']
        for merchant_info in response.json()
    }
    with open(os.path.join(output_dir, 'merchant_id_mapping.json'), 'w') as file:
        json.dump(merchant_mapping, file)
def clear_containers(pricewars_dir):
    """
    Run 'docker-compose rm --stop --force' inside pricewars_dir.

    The exit status of the command is not checked.

    :param pricewars_dir: working directory for the docker-compose call
    """
    remove_command = ['docker-compose', 'rm', '--stop', '--force']
    subprocess.run(remove_command, cwd=pricewars_dir)
def wait_for_marketplace(timeout=300, poll_interval=1.0):
    """
    Send requests to the marketplace until there is a response.

    Any HTTP response counts as success; the status code is not inspected.

    :param timeout: maximum number of seconds to keep trying
    :param poll_interval: number of seconds to pause between two attempts
    :raises RuntimeError: if the marketplace did not respond within timeout
    """
    # A monotonic clock keeps the deadline correct if the system time is adjusted.
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            # Bound the request so that a hanging connection cannot outlast the deadline.
            requests.get('http://marketplace:8080', timeout=remaining)
            return
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # Pause before retrying instead of busy-looping on refused
            # connections, but never sleep past the deadline.
            time.sleep(max(0.0, min(poll_interval, deadline - time.monotonic())))
    raise RuntimeError('Cannot reach marketplace')
def main():
    """
    Run one simulation on the Pricewars platform and collect its results.

    Steps:
    1. Parse the command line and create a timestamped result directory inside --output.
    2. Remove old containers and start the core services with docker-compose.
    3. Configure the marketplace, start the consumer and the merchant processes.
    4. After the requested duration, stop the consumer and the merchants, dump the
       Kafka topics and the merchant id mapping, and analyze the dump.

    :raises RuntimeError: if --output is not an existing directory or the
        marketplace cannot be reached
    """
    pricewars_dir = dirname(dirname(os.path.abspath(__file__)))

    parser = argparse.ArgumentParser(
        description='Runs a simulation on the Pricewars platform',
        epilog='Usage example: python3 %(prog)s --duration 5 --output ~/results '
               '--merchants "python3 merchant/merchant.py --port 5000"')
    parser.add_argument('--duration', '-d', metavar='MINUTES', type=float, required=True, help='Run that many minutes')
    parser.add_argument('--output', '-o', metavar='DIRECTORY', type=str, required=True)
    parser.add_argument('--merchants', '-m', metavar='MERCHANT', type=str, nargs='+', required=True,
                        help='commands to start merchants')
    parser.add_argument('--holding_cost', type=float, default=0.0)
    args = parser.parse_args()
    duration_in_minutes = args.duration

    if not os.path.isdir(args.output):
        raise RuntimeError('Invalid output directory: ' + args.output)

    # Every run gets its own subdirectory named after the start time.
    output_dir = os.path.join(args.output, datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S%z"))
    os.mkdir(output_dir)

    clear_containers(pricewars_dir)

    core_services = ['producer', 'marketplace', 'management-ui', 'analytics', 'flink-taskmanager', 'flink-jobmanager',
                     'kafka-reverse-proxy', 'kafka', 'zookeeper', 'redis', 'postgres', 'consumer']
    with PopenWrapper(['docker-compose', 'up'] + core_services, cwd=pricewars_dir):
        # wait until the marketplace service is up and running
        wait_for_marketplace()

        # configure marketplace
        requests.put('http://marketplace:8080/holding_cost_rate', json={'rate': args.holding_cost})

        # The consumer is one of the docker-compose services. Its current
        # settings are fetched and posted back to it here; it is stopped later
        # with a DELETE request on the same endpoint.
        print('Starting consumer')
        consumer_settings = requests.get('http://consumer:3000/setting').json()
        print(consumer_settings)
        # NOTE(review): the reason for this fixed pause is not visible here;
        # presumably it gives the services time to settle - confirm.
        time.sleep(10)
        response = requests.post('http://consumer:3000/setting', json=consumer_settings)
        response.raise_for_status()

        print('Starting merchants')
        merchants = []
        try:
            # Start the merchants one by one so that already started processes
            # are cleaned up if a later command fails to start.
            for command in args.merchants:
                merchants.append(subprocess.Popen(shlex.split(command)))

            # Run for the given amount of time
            print('Run for', duration_in_minutes, 'minutes')
            time.sleep(duration_in_minutes * 60)

            print('Stopping consumer')
            requests.delete('http://consumer:3000/setting')
        finally:
            # Also reached on errors and on Ctrl-C, so no merchant process is left behind.
            print('Stopping merchants')
            for merchant in merchants:
                merchant.terminate()
            for merchant in merchants:
                merchant.wait()

        # Kafka and the marketplace must still be running for these two calls.
        dump_kafka(output_dir)
        save_merchant_id_mapping(output_dir)

        analyze_kafka_dump(output_dir)
# Run the benchmark only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()