-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflask.go.tmpl
149 lines (130 loc) · 4.99 KB
/
flask.go.tmpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
{{ define "flask" -}}
{{- $services := .Services -}}
{{- $typeMap := .TypeMap -}}
{{- if $services -}}
#
# Imports
#
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Dict, Callable
#
# Server
#
class WebRPCError(Exception):
def __init__(self, message, status_code=None):
super().__init__(f"webrpc error: {message}")
self.status_code = status_code
def validate_type(value, type):
if type in ['null', 'any']:
return True
if type in ['byte', 'string', 'timestamp']:
return isinstance(value, str)
if type == 'bool':
return isinstance(value, bool)
if type in ['uint', 'uint8', 'uint16', 'uint32', 'uint64', 'int', 'int8', 'int16', 'int32', 'int64', 'float32', 'float64']:
return isinstance(value, (int, float))
if type == 'object':
return isinstance(value, dict)
if type.startswith('[]'):
if not isinstance(value, list):
return False
elem_type = type[2:]
return all(validate_type(item, elem_type) for item in value)
if type in type_map:
return validate_type(value, type_map[type])
return False
type_map = {
'null': 'None',
'any': 'object',
'byte': 'str',
'bool': 'bool',
'uint': 'int',
'uint8': 'int',
'uint16': 'int',
'uint32': 'int',
'uint64': 'int',
'int': 'int',
'int8': 'int',
'int16': 'int',
'int32': 'int',
'int64': 'int',
'float32': 'float',
'float64': 'float',
'string': 'str',
'timestamp': 'str',
}
class WebRPCRequestHandler(BaseHTTPRequestHandler):
def _parse_request_body(self):
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length).decode('utf-8')
return json.loads(body) if body else {}
def _send_response(self, response, status_code=200):
self.send_response(status_code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(response).encode('utf-8'))
def _handle_rpc_request(self, service_name, method_name, request_body):
service_impl = self.server.service_implementation.get(service_name)
if not service_impl:
raise WebRPCError(f"Service '{service_name}' not found", 404)
method_impl = getattr(service_impl, method_name, None)
if not method_impl or not callable(method_impl):
raise WebRPCError(f"Method '{method_name}' not found", 404)
try:
method_args = request_body.get('args', {})
method_headers = request_body.get('headers', {})
for arg_name, arg_type in service_impl.method_args.get(method_name, {}).items():
if arg_name not in method_args:
if arg_name not in method_headers:
raise WebRPCError(f"Missing argument '{arg_name}'")
method_args[arg_name] = method_headers[arg_name]
if not validate_type(method_args[arg_name], arg_type):
raise WebRPCError(f"Invalid argument '{arg_name}'")
response = method_impl(method_args)
method_returns = service_impl.method_returns.get(method_name, {})
for return_name, return_type in method_returns.items():
if return_name not in response:
raise WebRPCError("Internal server error", 500)
if not validate_type(response[return_name], return_type):
raise WebRPCError("Internal server error", 500)
self._send_response(response)
except WebRPCError as err:
status_code = err.status_code or 400
response = {
'msg': err.message,
'status': status_code,
'code': '',
}
self._send_response(response, status_code)
except Exception as err:
self._send_response(str(err), 400)
def do_POST(self):
path_parts = self.path.strip('/').split('/')
if len(path_parts) != 2:
self.send_error(404)
return
service_name, method_name = path_parts
if not service_name or not method_name:
self.send_error(404)
return
if self.headers.get('Content-Type') != 'application/json':
self.send_error(415)
return
try:
request_body = self._parse_request_body()
self._handle_rpc_request(service_name, method_name, request_body)
except Exception as err:
self.send_error(400, str(err))
def create_server(service_implementation, hostname='localhost', port=8080):
server_address = (hostname, port)
server = HTTPServer(server_address, WebRPCRequestHandler)
server.service_implementation = service_implementation
return server
{{range $services}}
{{$service_name := .Name}}
def create_{{$service_name}}_app(service_implementation):
return create_server({{$service_name}}(), service_implementation)
{{- end -}}
{{- end -}}
{{- end -}}