main.py (view raw)
1from PIL import Image
2from Api import get_random_image, rating_normal, rating_lewd
3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
4from Constants import get_localized_string as l
5
6from dotenv import load_dotenv
7load_dotenv()
8import os, logging
9logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
10from io import BytesIO
11
12from telegram.error import TelegramError
13from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters, PicklePersistence
14from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
15
16lang = 'us'
17
18def _get_args(context):
19
20 logging.info(context.args)
21 return ' '.join(context.args)
22
def _get_message_content(message):
    """Pull the picture, the text and the author out of a message.

    Returns:
        A tuple ``(image, text, author)``. ``image`` is a PIL image opened
        from the last entry of ``message.photo`` or ``None`` when the
        message has no photo. ``text`` is the stripped message text (or,
        failing that, the stripped caption) with a leading ``/command``
        word removed from its first line. ``author`` comes from
        ``_get_author``.
    """
    picture = None
    sizes = message.photo
    if len(sizes) > 0:
        # Only the last entry of the photo list is downloaded.
        remote_file = sizes[-1].get_file()
        picture = Image.open(BytesIO(remote_file.download_as_bytearray()))

    if message.text is not None:
        body = message.text.strip()
    elif message.caption is not None:
        body = message.caption.strip()
    else:
        body = ""

    # Only the first line can carry the command word; later lines are
    # passed through untouched.
    first_line, newline, remainder = body.partition("\n")
    words = first_line.split(" ")
    if words[0].startswith('/'):
        del words[0]
    body = " ".join(words) + newline + remainder

    return picture, body, _get_author(message)
49
50def _get_reply(message, fallback=""):
51
52 if message is None:
53 return None, fallback, None
54
55 image, content, author = _get_message_content(message)
56
57 return image, content, author
58
59def _get_lewd(context):
60
61 try:
62 return context.chat_data["lewd"]
63 except KeyError:
64 return False
65
def _get_image(context, tag="", bio=True):
    """Fetch a random image together with a "sauce" button for its URL.

    Args:
        context: Callback context whose chat data selects the rating
            (``rating_lewd`` when the chat's lewd flag is set, otherwise
            ``rating_normal``). ``None`` is treated like a chat without
            the flag instead of failing on an unbound variable.
        tag: Extra tag text handed to ``get_random_image``.
        bio: Has no effect on the result; kept so existing callers that
            pass it keep working.

    Returns:
        Tuple ``(image, markup)`` where ``markup`` is an inline keyboard
        with a single button pointing at the image URL.

    Raises:
        TelegramError: if ``get_random_image`` returned no image.
    """
    use_lewd = context is not None and _get_lewd(context)
    rating = rating_lewd if use_lewd else rating_normal
    image, url = get_random_image(rating, tag)

    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
    return image, markup
83
def _get_all(update, check_fn, context):
    """Collect the text, image and reply markup an effect handler needs.

    Args:
        update: Incoming update; its message and the message it replies
            to (if any) are inspected.
        check_fn: Callable receiving a dict with ``"reply"`` and
            ``"content"`` entries (each holding ``"author"``, ``"text"``
            and ``"image"``) and returning the text to render, or ``None``
            to reject the input.
        context: Callback context, used when a random image is fetched.

    Returns:
        ``(None, None, None)`` when ``check_fn`` rejects the input,
        otherwise ``(content, image, markup)``. ``markup`` is ``None``
        when the image was supplied by the user and the "sauce" keyboard
        from ``_get_image`` when a random image had to be fetched.
    """
    message = update.message
    image_reply, text_reply, author_reply = _get_reply(message.reply_to_message)
    image_content, text_content, author_content = _get_message_content(message)

    info_struct = {
        "reply": {
            "author": author_reply,
            "text": text_reply,
            "image": image_reply,
        },
        "content": {
            "author": author_content,
            "text": text_content,
            "image": image_content,
        },
    }

    sender = message.from_user
    logging.info("User %s (@%s) typed: %s", sender.full_name, sender.username, message.text)

    content = check_fn(info_struct)
    if content is None:
        return None, None, None

    # An image attached to the command message takes precedence over one
    # found in the replied-to message.
    image = image_content if image_content is not None else image_reply

    # None (not an empty string) means "no reply markup".
    markup = None
    if image is None:
        image, markup = _get_image(context, bio=False)

    return content, image, markup
122
def start(update: Update, context: CallbackContext):
    """Send the localized welcome text to the chat the update came from."""
    chat_id = update.effective_chat.id
    greeting = l("welcome", lang)
    context.bot.send_message(chat_id=chat_id, text=greeting)
125
def set_lewd(update: Update, context: CallbackContext):
    """Flip the chat's "lewd" flag and announce the new state.

    A chat without a stored flag is switched on by the first call.
    """
    settings = context.chat_data
    enabled = not settings.get("lewd", False)
    settings['lewd'] = enabled

    template = l("lewd_toggle", lang)
    if enabled:
        state = l("enabled", lang)
    else:
        state = l("disabled", lang)

    context.bot.send_message(chat_id=update.effective_chat.id, text=template.format(state))
137
def pic(update: Update, context: CallbackContext):
    """Reply with a random picture, using the first argument (if any) as tag."""
    arguments = context.args
    # The tag carries a leading space; with no argument it stays empty.
    tag = " " + arguments[0] if arguments else ""

    picture, keyboard = _get_image(context, tag)
    photo = img_to_bio(picture)
    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=keyboard)
147
def raw(update: Update, context: CallbackContext):
    """Reply with a random picture fetched with an empty rating string.

    Up to the first two command arguments are used as tags, each prefixed
    with a space. The reply carries a "sauce" button linking to the URL
    returned by ``get_random_image``.

    Raises:
        TelegramError: if ``get_random_image`` returned no image.
    """
    tag = "".join(" " + word for word in context.args[:2])

    image, url = get_random_image("", tag)
    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    # img_to_bio is the helper imported from Effects (the one `pic` uses);
    # the name `_img_to_bio` is not defined in this module.
    photo = img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
167
def pilu(update: Update, context: CallbackContext):
    """Reply with a random picture searched with the ``rating:e`` tag.

    The first command argument, if present, is appended to the query as
    an extra tag. The request is logged at WARNING level. The reply
    carries a "sauce" button linking to the URL returned by
    ``get_random_image``.

    Raises:
        TelegramError: if ``get_random_image`` returned no image.
    """
    logging.warning("User %s requested an explicit pic.", update.message.from_user.username)

    tag = " " + context.args[0] if context.args else ""

    # NOTE(review): the other call sites in this file pass (rating, tag) as
    # two arguments; here the whole query is passed as a single string.
    # Confirm this matches get_random_image's signature.
    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    # img_to_bio is the helper imported from Effects (the one `pic` uses);
    # the name `_img_to_bio` is not defined in this module.
    photo = img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
185
186def _format_author(user):
187
188 if user.username is not None:
189 return user.full_name + f" ({user.username})"
190 return user.full_name
191
192def _get_author(message):
193
194 if message.forward_from is not None:
195 return _format_author(message.forward_from)
196
197 if message.forward_sender_name is not None:
198 return message.forward_sender_name
199
200 if message.forward_from_chat is not None:
201 return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
202
203 return _format_author(message.from_user)
204
def tt_check(info):
    """Join reply text and command text into a single line.

    The two texts are separated by one space and every newline is turned
    into a space. Returns ``None`` when the result is blank.
    """
    replied = info['reply']['text']
    typed = info['content']['text']

    single_line = f"{replied} {typed}".replace("\n", " ")
    return single_line if single_line.strip() else None
216
def ttbt_check(info):
    """Build the two-part text from the reply text and the command text.

    Both texts are stripped first. If the command text already spans more
    than one line it is used alone; otherwise the reply text and the
    command text are joined with a newline. Returns ``None`` when the
    result is blank.
    """
    replied = info['reply']['text'].strip()
    typed = info['content']['text'].strip()

    combined = typed if "\n" in typed else f"{replied}\n{typed}"

    if not combined.strip():
        return None
    return combined
231
def splash_check(info):
    """Build an ``author`` + newline + ``text`` string.

    The command message's own text and author are used unless that text
    is blank, in which case the reply's text and author are used instead.
    Returns ``None`` when the stripped result does not span at least two
    lines.
    """
    replied_text = info['reply']['text']
    typed_text = info['content']['text']

    if typed_text.strip() == "":
        source, body = info['reply'], replied_text
    else:
        source, body = info['content'], typed_text

    combined = f"{source['author']}\n{body}"

    if "\n" not in combined.strip():
        return None
    return combined
248
def wot_check(info):
    """Join reply text and command text with a newline.

    Returns ``None`` when the joined text is blank.
    """
    replied = info['reply']['text']
    typed = info['content']['text']

    stacked = f"{replied}\n{typed}"
    return None if stacked.strip() == "" else stacked
260
def ttbt(update: Update, context: CallbackContext):
    """Handle the ``ttbt`` command.

    Gathers text and image through ``_get_all`` with ``ttbt_check``,
    applies ``tt_bt_effect`` and replies with the resulting picture.
    Replies with the "no_caption" message when there is nothing to render
    and with the "failed_effect" message when the effect yields no image.
    """
    content, image, markup = _get_all(update, ttbt_check, context)

    # _get_all returns no image when the check function rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
275
276
def tt(update: Update, context: CallbackContext):
    """Handle the ``tt`` command.

    Gathers text and image through ``_get_all`` with ``tt_check``, applies
    ``tt_bt_effect`` and replies with the resulting picture. Replies with
    the "no_caption" message when there is nothing to render and with the
    "failed_effect" message when the effect yields no image.
    """
    content, image, markup = _get_all(update, tt_check, context)

    # _get_all returns no image when the check function rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
291
def bt(update: Update, context: CallbackContext):
    """Handle the ``bt`` command.

    Gathers text and image through ``_get_all`` with ``tt_check``, applies
    ``bt_effect`` and replies with the resulting picture. Replies with the
    "no_caption" message when there is nothing to render and with the
    "failed_effect" message when the effect yields no image.
    """
    content, image, markup = _get_all(update, tt_check, context)

    # _get_all returns no image when the check function rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
306
def splash(update: Update, context: CallbackContext):
    """Handle the ``splash`` command.

    Gathers text and image through ``_get_all`` with ``splash_check``,
    applies ``splash_effect`` and replies with the resulting picture.
    Replies with the "no_caption" message when there is nothing to render
    and with the "failed_effect" message when the effect yields no image.
    """
    content, image, markup = _get_all(update, splash_check, context)

    # _get_all returns no image when the check function rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = splash_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
321
def wot(update: Update, context: CallbackContext):
    """Handle the ``wot`` command.

    Gathers text and image through ``_get_all`` with ``wot_check``,
    applies ``wot_effect`` and replies with the resulting picture. Replies
    with the "no_caption" message when there is nothing to render and with
    the "failed_effect" message when the effect yields no image.
    """
    content, image, markup = _get_all(update, wot_check, context)

    # _get_all returns no image when the check function rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = wot_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
336
def text(update: Update, context: CallbackContext):
    """Handle the ``text`` command.

    Gathers text and image through ``_get_all`` with ``wot_check``,
    applies ``text_effect`` and replies with the resulting picture.
    Replies with the "no_caption" message when there is nothing to render
    and with the "failed_effect" message when the effect yields no image.
    """
    content, image, markup = _get_all(update, wot_check, context)

    # _get_all returns no image when the check function rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = text_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
351
def caps(update: Update, context: CallbackContext):
    """Send back upper-cased text.

    The text is taken from the replied-to message; when the command is
    not a reply, the command arguments are used instead.
    """
    replied_to = update.message.reply_to_message
    typed_arguments = _get_args(context)
    _image, shout, _author = _get_reply(replied_to, typed_arguments)

    context.bot.send_message(chat_id=update.effective_chat.id, text=shout.upper())
356
def unknown(update: Update, context: CallbackContext):
    """Log, at INFO level, a message the bot has no handler logic for."""
    message = update.message
    sender_name = message.from_user.full_name
    sent_text = message.text_markdown_v2
    logging.info(f"User {sender_name} sent {sent_text} and I don't know what that means.")
359
def error_callback(update: Update, context: CallbackContext):
    """Report a ``TelegramError`` to the log and, when possible, to the chat.

    ``context.error`` is re-raised so it can be matched by type. A
    ``TelegramError`` is logged with its traceback and answered with the
    localized "error" message; any other exception propagates to the
    caller unchanged.

    Args:
        update: The update being processed when the error occurred. It may
            be ``None`` or carry no chat, in which case nothing is sent.
        context: Callback context holding the raised error and the bot.
    """
    try:
        raise context.error
    except TelegramError:
        logging.error("TelegramError!!", exc_info=True)

        # Without a chat there is nowhere to send the error message.
        chat = getattr(update, "effective_chat", None)
        if chat is not None:
            context.bot.send_message(chat_id=chat.id, text=l('error', lang))
366
def _add_effect_handler(dispatcher, command: str, callback):
    """Register ``callback`` for ``command`` in two ways.

    One handler is a ``CommandHandler`` for the command name, the other a
    ``MessageHandler`` using a caption filter built from the single string
    ``/<command>``.
    """
    command_handler = CommandHandler(command, callback)
    dispatcher.add_handler(command_handler)

    caption_filter = Filters.caption(update=[f"/{command}"])
    caption_handler = MessageHandler(caption_filter, callback)
    dispatcher.add_handler(caption_handler)
370
def main():
    """Create the bot, register all handlers and poll until stopped.

    The token is read from the ``token`` environment variable. Persistence
    goes to ``bot-data.pkl`` with bot, callback and user data storage
    switched off.
    """
    token = os.getenv("token")
    persistence = PicklePersistence(filename='bot-data.pkl',
                                    store_bot_data=False,
                                    store_callback_data=False,
                                    store_user_data=False)
    updater = Updater(token=token, persistence=persistence)

    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error_callback)

    # Commands that are only reachable as text commands.
    plain_commands = (
        ('start', start),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
        ('raw', raw),
        ('pilu', pilu),
    )
    for name, callback in plain_commands:
        dispatcher.add_handler(CommandHandler(name, callback))

    # Effect commands get the extra caption handler as well.
    effect_commands = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
        ('wot', wot),
        ('text', text),
    )
    for name, callback in effect_commands:
        _add_effect_handler(dispatcher, name, callback)

    # Added after every specific handler above.
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))

    updater.start_polling()
    updater.idle()
399
400if __name__ == "__main__":
401 main()