-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmain.py
125 lines (101 loc) · 3.49 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# -*- coding:utf-8 -*-
from fastapi import FastAPI
import uvicorn
import leancloud
import os
from leancloud import user
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
origins = [
"*"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def initleancloud():
leancloud.init(os.environ["LEANCLOUD_ID"], os.environ["LEANCLOUD_KEY"])
@app.get("/api")
async def all(start: int = 0, end: int = -1, rule: str = "updated"):
list = ['title','created','updated','link','author','avatar']
# Verify key
initleancloud()
# Declare class
Friendspoor = leancloud.Object.extend('friend_poor')
query = Friendspoor.query
query.descending('time')
query.limit(1000)
# Choose class
query.select('title','time','updated','link','author','headimg','createdAt')
query_list = query.find()
Friendlist = leancloud.Object.extend('friend_list')
query_userinfo = Friendlist.query
query_userinfo.limit(1000)
query_userinfo.select('frindname','friendlink','firendimg','error')
query_list_user = query_userinfo.find()
# Result to arr
api_json = {}
friends_num = len(query_list_user)
active_num = len(set([item.get('author') for item in query_list]))
error_num = len([friend for friend in query_list_user if friend.get('error') == 'true'])
article_num = len(query_list)
last_updated_time = max([item.get('createdAt').strftime('%Y-%m-%d %H:%M:%S') for item in query_list])
api_json['statistical_data'] = {
'friends_num': friends_num,
'active_num': active_num,
'error_num': error_num,
'article_num': article_num,
'last_updated_time': last_updated_time
}
article_data_init = []
article_data = []
for item in query_list:
itemlist = {}
for elem in list:
if elem == 'created':
itemlist[elem] = item.get('time')
elif elem == 'avatar':
itemlist[elem] = item.get('headimg')
else:
itemlist[elem] = item.get(elem)
article_data_init.append(itemlist)
if end == -1: end = min(article_num, 1000)
if start < 0 or start >= min(article_num, 1000):
return {"message": "start error"}
if end <= 0 or end > min(article_num, 1000):
return {"message": "end error"}
if rule != "created" and rule != "updated":
return {"message": "rule error, please use 'created'/'updated'"}
article_data_init.sort(key=lambda x : x[rule], reverse=True)
index = 1
for item in article_data_init:
item["floor"] = index
index += 1
article_data.append(item)
api_json['article_data'] = article_data[start:end]
return api_json
@app.get("/friend")
async def friend():
list = ['name', 'link', 'avatar']
# Verify key
initleancloud()
Friendlist = leancloud.Object.extend('friend_list')
query_userinfo = Friendlist.query
query_userinfo.limit(1000)
query_userinfo.select('frindname','friendlink','firendimg')
query_list_user = query_userinfo.find()
# Result to arr
friend_list_json = []
for item in query_list_user:
itemlist = {
'name': item.get('frindname'),
'link': item.get('friendlink'),
'avatar': item.get('firendimg')
}
friend_list_json.append(itemlist)
return friend_list_json
if __name__ == "__main__":
uvicorn.run("main:app", host="127.0.0.1")