-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfastapi.go.tmpl
114 lines (99 loc) · 3.58 KB
/
fastapi.go.tmpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
{{- define "fastapi" -}}
{{- $services := .Services -}}
{{- $typeMap := .TypeMap -}}
#
# Imports
#
import json
from fastapi import FastAPI, HTTPException
from typing import Dict, Callable
#
# Server
#
app = FastAPI()
class WebRPCError(Exception):
def __init__(self, message, status_code=None):
super().__init__(f"webrpc error: {message}")
self.status_code = status_code
def validate_type(value, type):
if type in ['null', 'any']:
return True
if type in ['byte', 'string', 'timestamp']:
return isinstance(value, str)
if type == 'bool':
return isinstance(value, bool)
if type in ['uint', 'uint8', 'uint16', 'uint32', 'uint64', 'int', 'int8', 'int16', 'int32', 'int64', 'float32', 'float64']:
return isinstance(value, (int, float))
if type == 'object':
return isinstance(value, dict)
if type.startswith('[]'):
if not isinstance(value, list):
return False
elem_type = type[2:]
return all(validate_type(item, elem_type) for item in value)
if type in type_map:
return validate_type(value, type_map[type])
return False
type_map = {
'null': 'None',
'any': 'object',
'byte': 'str',
'bool': 'bool',
'uint': 'int',
'uint8': 'int',
'uint16': 'int',
'uint32': 'int',
'uint64': 'int',
'int': 'int',
'int8': 'int',
'int16': 'int',
'int32': 'int',
'int64': 'int',
'float32': 'float',
'float64': 'float',
'string': 'str',
'timestamp': 'str',
}
def handle_rpc_request(request, service_name, method_name):
service_impl = request.app.service_implementation.get(service_name)
if not service_impl:
raise WebRPCError(f"Service '{service_name}' not found", 404)
method_impl = getattr(service_impl, method_name, None)
if not method_impl or not callable(method_impl):
raise WebRPCError(f"Method '{method_name}' not found", 404)
try:
method_args = request.json.get('args', {})
method_headers = request.json.get('headers', {})
for arg_name, arg_type in service_impl.method_args.get(method_name, {}).items():
if arg_name not in method_args:
if arg_name not in method_headers:
raise WebRPCError(f"Missing argument '{arg_name}'")
method_args[arg_name] = method_headers[arg_name]
if not validate_type(method_args[arg_name], arg_type):
raise WebRPCError(f"Invalid argument '{arg_name}'")
response = method_impl(method_args)
method_returns = service_impl.method_returns.get(method_name, {})
for return_name, return_type in method_returns.items():
if return_name not in response:
raise WebRPCError("Internal server error", 500)
if not validate_type(response[return_name], return_type):
raise WebRPCError("Internal server error", 500)
return response
except WebRPCError as err:
status_code = err.status_code or 400
raise HTTPException(status_code=status_code, detail=err.message)
except Exception as err:
raise HTTPException(status_code=400, detail=str(err))
@app.post("/{service_name}/{method_name}/")
def handle_http_request(service_name: str, method_name: str, request: dict):
try:
return handle_rpc_request(request, service_name, method_name)
except HTTPException as err:
raise err
{{range $services}}
{{$service_name := .Name}}
def create_{{$service_name}}_app(service_implementation):
app.service_implementation = service_implementation
return app
{{- end -}}
{{- end -}}