-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathask_xiaobing.py
More file actions
232 lines (173 loc) · 8.18 KB
/
ask_xiaobing.py
File metadata and controls
232 lines (173 loc) · 8.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# coding=utf8
# 使用前需关注小冰公众号
# inspired by: https://github.com/Lafree317/PythonChat/blob/master/chat.py
# also referred to: https://zhuanlan.zhihu.com/p/30899907
from __future__ import print_function
from threading import Timer
import itchat
import datetime
from itchat.content import *
from collections import deque
WAKEN_MSG = [u"小冰", u"小冰小冰", u"小冰呢", u"小冰呢?", u"小冰回来", u"小冰出来"]
HIBERNATE_MSG = [u"小冰住嘴", u"小冰闭嘴", u"滚", u"你滚", u"你闭嘴", u"下去吧", u"小冰下去", u"小冰退下"]
TRIGGER_MSG = WAKEN_MSG + HIBERNATE_MSG
XIAOBING_IDLENESS_THRESHOLD = 1 # sec
MSG_PROCESS_FREQ = 0.5 # sec
# --------------------------------------------- Handle Friend Chat ---------------------------------------------------
@itchat.msg_register([TEXT, PICTURE], isFriendChat=True)
def text_reply(msg):
""" handle robot switch and friends messages """
to_user = itchat.search_friends(userName=msg['ToUserName'])
from_user = itchat.search_friends(userName=msg['FromUserName'])
if is_my_outgoing_msg(msg):
handle_outgoing_msg(msg, to_user)
else: # this is an incoming message from my friend
handle_incoming_msg(msg, from_user)
@itchat.msg_register([TEXT, PICTURE], isGroupChat = True)
def group_reply(msg):
from_user_name = msg['FromUserName']
to_user_name = msg['ToUserName']
if is_my_outgoing_msg(msg):
group = itchat.search_chatrooms(userName=to_user_name)
handle_outgoing_msg(msg, group)
else:
group = itchat.search_chatrooms(userName=from_user_name)
handle_incoming_msg(msg, group)
def handle_outgoing_msg(msg, to_user):
debug_print(u'I sent a message {} to {}'.format(msg['Text'], get_user_display_name(to_user)))
if msg['Content'] in TRIGGER_MSG:
handle_robot_switch(msg, to_user)
def handle_incoming_msg(msg, from_user):
global peer_list
debug_print(u'I received a message {} from {}'.format(msg['Text'], get_user_display_name(from_user)))
if msg['Content'] in TRIGGER_MSG:
handle_robot_switch(msg, from_user)
else: # don't ask xiaobing with trigger question
if msg['FromUserName'] in peer_list:
handle_message_queue(msg, from_user)
def handle_message_queue(msg, from_user):
global asker_queue, unprocessed_questions
from_user_id_name = msg['FromUserName']
from_user_display_name = get_user_display_name(from_user)
debug_print(u'Robot reply is on for {}! Adding message to queue...'.format(from_user_display_name))
if from_user_id_name not in unprocessed_questions:
# this user has no unprocessed question, adding to the asker queue
asker_queue.append(from_user_id_name)
else:
debug_print(u'{} is asking questions too quickly. Drop the previous one and use the current'.format(
from_user_display_name
))
# only register the last question of each unprocessed asker
unprocessed_questions[from_user_id_name] = msg
def handle_robot_switch(incoming_msg, outgoing_msg_target_user):
""" Turn robot on/off according to the trigger message """
global peer_list
if not outgoing_msg_target_user:
debug_print(u'Outgoing message target user not recognized. Can\'t turn on/off robot')
return
display_name = get_user_display_name(outgoing_msg_target_user)
user_id_name = outgoing_msg_target_user['UserName']
if incoming_msg['Content'] in WAKEN_MSG:
if user_id_name not in peer_list:
debug_print(u'Turning on robot for {}'.format(display_name))
peer_list.add(user_id_name)
itchat.send_msg(u'小冰: 我在这儿呢^_^', user_id_name)
else:
debug_print(u'Robot is already turned on for {}'.format(display_name))
elif incoming_msg['Content'] in HIBERNATE_MSG:
if user_id_name in peer_list:
debug_print(u'Turning off robot for {}'.format(display_name))
peer_list.remove(user_id_name)
itchat.send_msg(u'小冰: (默默走开>.<)', user_id_name)
else:
debug_print(u'Robot is already turned off for {}'.format(display_name))
# --------------------------------------------- Handle Xiaobing Reply ------------------------------------------------
@itchat.msg_register([TEXT, PICTURE, FRIENDS, CARD, MAP, SHARING, RECORDING, ATTACHMENT, VIDEO], isMpChat=True)
def map_reply(msg):
""" relay back xiaobing's response """
if msg['FromUserName'] == xiao_bing_user_name:
handle_xiaobing_reply(msg)
def handle_xiaobing_reply(msg):
global current_asker_id_name, last_xiaobing_response_ts, is_xiaobing_busy
if not current_asker_id_name:
debug_print('Xiaobing replied but has no one to contact')
return
last_xiaobing_response_ts = now()
is_xiaobing_busy = False
asker = itchat.search_friends(userName=current_asker_id_name)
if msg['Type'] == 'Picture':
debug_print(u'Xiaobing replied a picture. Relaying to {}'.format(get_user_display_name(asker)))
send_img(msg, current_asker_id_name)
elif msg['Type'] == 'Text':
debug_print(u'Xiaobing replied {}. Relaying to {}'.format(msg['Text'], get_user_display_name(asker)))
itchat.send_msg(u'小冰: {}'.format(msg['Text']), current_asker_id_name)
else:
# gracefully handle unsupported formats with generic reply
debug_print(u'Xiaobing replied a {}, which is not yet supported'.format(msg['Type']))
itchat.send_msg(u'小冰: 嘤嘤嘤', current_asker_id_name)
# ------------------------------------------ Message Queue Processor ------------------------------------------------
def process_message():
global asker_queue, current_asker_id_name, last_xiaobing_response_ts, is_xiaobing_busy
if len(asker_queue) == 0:
# debug_print(u'Was asked to process message but the queue is empty')
pass
elif is_xiaobing_busy:
# skip this round if xiaobing is currently busy
pass
# if no one has asked xiaobing yet or xiaobing has been idle for a period of time
elif (not last_xiaobing_response_ts or
now() - last_xiaobing_response_ts > datetime.timedelta(seconds=XIAOBING_IDLENESS_THRESHOLD)):
current_asker_id_name = asker_queue.popleft()
msg = unprocessed_questions.pop(current_asker_id_name, None)
debug_print(u'Xiaobing is available. Asking questions on behalf of {}'.format(
get_user_display_name(user_id_name=current_asker_id_name)
))
is_xiaobing_busy = True
ask_xiaobing(msg)
# check back later
Timer(MSG_PROCESS_FREQ, process_message).start()
# --------------------------------------------- Helper Functions ---------------------------------------------------
def now():
return datetime.datetime.now()
def debug_print(msg):
if not debug:
return
try:
print(u'{} {}'.format(now(), msg))
except Exception as e:
print(str(e))
def send_img(msg, user_name):
""" wrapper around itchat's weird way of image forwarding """
msg['Text'](msg['FileName'])
itchat.send_image(msg['FileName'], user_name)
def ask_xiaobing(msg):
if msg['Type'] == 'Picture':
send_img(msg, xiao_bing_user_name)
else:
text = msg['Text']
if text and text.startswith(u'小冰: '):
# remove dialog prefix when bots talk to each other
text = text.replace(u'小冰: ', '')
itchat.send_msg(text, xiao_bing_user_name)
def get_user_display_name(user=None, user_id_name=None):
if user:
return user['RemarkName'] or user['NickName'] or user['Name']
elif user_id_name:
return get_user_display_name(user=itchat.search_friends(userName=user_id_name))
else:
return 'user not found'
def is_my_outgoing_msg(msg):
return msg['FromUserName'] == my_user_name
if __name__ == '__main__':
itchat.auto_login()
my_user_name = itchat.get_friends(update=True)[0]["UserName"]
xiao_bing_user_name = itchat.search_mps(name=u'小冰')[0]["UserName"]
peer_list = set()
asker_queue = deque()
unprocessed_questions = {}
current_asker_id_name = None
last_xiaobing_response_ts = None
debug = True
is_xiaobing_busy = False
process_message()
itchat.run()