-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgcbi-tg-bot.py
162 lines (128 loc) · 5.13 KB
/
gcbi-tg-bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python
import os
import logging
from dotenv import load_dotenv
from telegram import Update, ChatMemberUpdated, ChatMember, ParseMode
from telegram.ext import CallbackContext, Updater, CommandHandler, ChatMemberHandler
from typing import Tuple, Optional
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
)
logger = logging.getLogger(__name__)
load_dotenv()
MESSAGE_TIMEOUT = 120
def greet_chat_members(update: Update, context: CallbackContext) -> None:
"""Greets new users in chats and announces when someone leaves"""
result = extract_status_change(update.chat_member)
if result is None:
return
logger.info("triggered channel event")
was_member, is_member = result
member_name = update.chat_member.new_chat_member.user.mention_html()
if not was_member and is_member:
logger.info("new user joined the group, username: {}".format(
update.chat_member.new_chat_member.user.full_name))
message = update.effective_chat.send_message(
f"歡迎 {member_name} 加入GCBI有問必答. Welcome!\n"
"發問前可以先參考 <a href=\"https://docs.google.com/presentation/d/1QnfLG"
"8HGZb8TuwbEyeQoS5jRMDHIhQEK-i05g9aHtPA/edit?usp=sharing\">計劃詳情</a>\n"
"可以直接係group問問題 admin 見到會答\n"
"私人問題可以直接搵admin問\n"
"/help 召喚傳送門",
parse_mode=ParseMode.HTML,
disable_web_page_preview=True
)
elif was_member and not is_member:
logger.info("user left the group, username: {}".format(
update.chat_member.new_chat_member.user.full_name))
message = update.effective_chat.send_message(
f"bye bye {member_name} 得閒飲茶",
parse_mode=ParseMode.HTML,
)
if message and message.chat.type != "private":
context.job_queue.run_once(
clean_message,
MESSAGE_TIMEOUT,
context={
"chat_id": message.chat.id,
"message_id": message.message_id
}
)
def extract_status_change(
chat_member_update: ChatMemberUpdated,
) -> Optional[Tuple[bool, bool]]:
status_change = chat_member_update.difference().get("status")
old_is_member, new_is_member = chat_member_update.difference().get("is_member",
(None, None))
if status_change is None:
return None
old_status, new_status = status_change
was_member = (
old_status
in [
ChatMember.MEMBER,
ChatMember.CREATOR,
ChatMember.ADMINISTRATOR,
]
or (old_status == ChatMember.RESTRICTED and old_is_member is True)
)
is_member = (
new_status
in [
ChatMember.MEMBER,
ChatMember.CREATOR,
ChatMember.ADMINISTRATOR,
]
or (new_status == ChatMember.RESTRICTED and new_is_member is True)
)
return was_member, is_member
def start(update: Update, context: CallbackContext) -> None:
"""Send a message when the command /start is issued."""
message = context.bot.send_message(
chat_id=update.effective_chat.id,
text="I'm a bot, please talk to me!"
)
if message and message.chat.type != "private":
context.job_queue.run_once(
clean_message,
MESSAGE_TIMEOUT,
context={
"chat_id": message.chat.id,
"message_id": message.message_id
}
)
def help_command(update: Update, context: CallbackContext) -> None:
"""Send a message when the command /help is issued."""
message = update.message.reply_text(
"傳送門\n"
"<a href=\"https://forms.gle/DFpj8kfLFfs3D8zu9\">申請表格</a>\n"
"Email: [email protected]\n"
"<a href=\"https://gcbinorth.com/\">GCBI North offical website </a>\n"
"<a href=\"https://docs.google.com/presentation/d/1QnfLG8HGZb8TuwbEye"
"QoS5jRMDHIhQEK-i05g9aHtPA/edit?usp=sharing\">計劃詳情</a>",
parse_mode=ParseMode.HTML, disable_web_page_preview=True)
if message and message.chat.type != "private":
context.job_queue.run_once(
clean_message,
MESSAGE_TIMEOUT,
context={
"chat_id": message.chat.id,
"message_id": message.message_id
}
)
def clean_message(context: CallbackContext) -> None:
"""Clean up messages."""
context.bot.delete_message(
context.job.context["chat_id"], context.job.context["message_id"])
def main() -> None:
"""Start the bot."""
updater = Updater(os.environ.get("API_TOKEN"))
dispatcher = updater.dispatcher
dispatcher.add_handler(CommandHandler("start", start))
dispatcher.add_handler(CommandHandler("help", help_command))
dispatcher.add_handler(ChatMemberHandler(
greet_chat_members, ChatMemberHandler.CHAT_MEMBER))
updater.start_polling(allowed_updates=Update.ALL_TYPES)
updater.idle()
if __name__ == '__main__':
main()