main.py (view raw)
1from PIL import Image
2from Api import get_random_image, rating_normal, rating_lewd
3from Effects import tt_bt_effect, splash_effect, wot_effect, text_effect
4from Constants import get_localized_string as l
5
6from dotenv import load_dotenv
7load_dotenv()
8import os, logging
9logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
10from io import BytesIO
11
12from telegram.error import TelegramError
13from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters, PicklePersistence
14from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
15
16lang = 'us'
17
18def _get_args(context):
19
20 logging.info(context.args)
21 return ' '.join(context.args)
22
23def _img_to_bio(image):
24
25 bio = BytesIO()
26
27 if image.mode in ("RGBA", "P"):
28 image = image.convert("RGB")
29
30 image.save(bio, 'JPEG')
31 bio.seek(0)
32 return bio
33
34def _get_message_content(message):
35
36 image = None
37 if len(message.photo) > 0:
38 image = message.photo[-1].get_file()
39 image = Image.open(BytesIO(image.download_as_bytearray()))
40
41 content = ""
42 if message.text is not None:
43 content = message.text.strip()
44 elif message.caption is not None:
45 content = message.caption.strip()
46
47 lines = content.split("\n")
48 r = lines[0].split(" ")
49
50 try:
51 if r[0][0] == '/':
52 r.pop(0)
53 except IndexError:
54 pass
55
56 lines[0] = " ".join(r)
57 content = "\n".join(lines)
58
59 return image, content, _get_author(message)
60
61def _get_reply(message, fallback=""):
62
63 if message is None:
64 return None, fallback, None
65
66 image, content, author = _get_message_content(message)
67
68 return image, content, author
69
70def _get_lewd(context):
71
72 try:
73 return context.chat_data["lewd"]
74 except KeyError:
75 return False
76
77def _get_image(context, tag="", bio=True):
78
79 if context is not None:
80 if _get_lewd(context):
81 image, url = get_random_image(rating_lewd, tag)
82 else:
83 image, url = get_random_image(rating_normal, tag)
84
85 if image is None:
86 logging.warning("Getting Image failed")
87 raise TelegramError("bad image")
88
89 markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
90
91 if bio:
92 return _img_to_bio(image), markup
93 return image, markup
94
95def _get_all(update, check_fn, context):
96
97 image_reply, text_reply, author_reply = _get_reply(update.message.reply_to_message)
98 image_content, text_content, author_content = _get_message_content(update.message)
99
100 info_struct = {
101 "reply": {
102 "author": author_reply,
103 "text": text_reply,
104 "image": image_reply
105 },
106 "content": {
107 "author": author_content,
108 "text": text_content,
109 "image": image_content
110 }
111 }
112
113 logging.info(f"User {update.message.from_user.username} typed: {str(update.message.text)}")
114
115 content = check_fn(info_struct)
116
117 if content is None:
118 return None, None, None
119
120 markup = ""
121 image = None
122
123 if image_reply is not None:
124 image = image_reply
125
126 if image_content is not None:
127 image = image_content
128
129 if image is None:
130 image, markup = _get_image(context, bio=False)
131
132 return content, image, markup
133
134def start(update: Update, context: CallbackContext):
135 context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", lang))
136
137def set_lewd(update: Update, context: CallbackContext):
138
139 try:
140 output = False if context.chat_data["lewd"] else True
141 except KeyError:
142 output = True
143
144 context.chat_data['lewd'] = output
145 message = l("lewd_toggle", lang).format(l("enabled", lang) if output else l("disabled", lang))
146
147 context.bot.send_message(chat_id=update.effective_chat.id, text=message)
148
149def pic(update: Update, context: CallbackContext):
150
151 try:
152 tag = " " + context.args[0]
153 except IndexError:
154 tag = ""
155
156 image, markup = _get_image(context, tag)
157 update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
158
159def raw(update: Update, context: CallbackContext):
160
161 tag = ""
162 try:
163 tag += " " + context.args[0]
164 tag += " " + context.args[1]
165 except IndexError:
166 pass
167
168 image, url = get_random_image("", tag)
169
170 if image is None:
171 logging.warning("Getting Image failed")
172 raise TelegramError("bad image")
173
174 image = _img_to_bio(image)
175 markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
176
177 update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
178
179def pilu(update: Update, context: CallbackContext):
180
181 logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
182 try:
183 tag = " " + context.args[0]
184 except IndexError:
185 tag = ""
186 image, url = get_random_image("rating:e" + tag)
187 if image is None:
188 logging.warning("Getting Image failed")
189 raise TelegramError("bad image")
190 return
191
192 image = _img_to_bio(image)
193 markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
194
195 update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
196
197def _format_author(user):
198
199 if user.username is not None:
200 return user.full_name + f" ({user.username})"
201 return user.full_name
202
203def _get_author(message):
204
205 if message.forward_from is not None:
206 return _format_author(message.forward_from)
207
208 if message.forward_sender_name is not None:
209 return message.forward_sender_name
210
211 if message.forward_from_chat is not None:
212 return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
213
214 return _format_author(message.from_user)
215
216def tt_check(info):
217
218 reply = info['reply']['text']
219 content = info['content']['text']
220
221 input_text = f"{reply} {content}".replace("\n", " ")
222
223 if input_text.strip() == "":
224 return None
225
226 return input_text
227
228def ttbt_check(info):
229
230 reply = info['reply']['text'].strip()
231 content = info['content']['text'].strip()
232
233 if len(content.split("\n")) > 1:
234 input_text = content
235 else:
236 input_text = f"{reply}\n{content}"
237
238 if input_text.strip() == "":
239 return None
240
241 return input_text
242
243def splash_check(info):
244
245 reply = info['reply']['text']
246 content = info['content']['text']
247
248 if content.strip() == "":
249 author = info['reply']['author']
250 input_text = f"{author}\n{reply}"
251 else:
252 author = info['content']['author']
253 input_text = f"{author}\n{content}"
254
255 if len(input_text.strip().split("\n")) < 2:
256 return None
257
258 return input_text
259
260def wot_check(info):
261
262 reply = info['reply']['text']
263 content = info['content']['text']
264
265 input_text = f"{reply}\n{content}"
266
267 if input_text.strip() == "":
268 return None
269
270 return input_text
271
272def ttbt(update: Update, context: CallbackContext):
273
274 content, image, markup = _get_all(update, ttbt_check, context)
275
276 if image is None:
277 update.message.reply_text(l("no_caption", lang))
278 return
279
280 image = _img_to_bio(tt_bt_effect(content, image))
281 update.message.reply_photo(photo=image, reply_markup=markup)
282
283
284def tt(update: Update, context: CallbackContext):
285
286 content, image, markup = _get_all(update, tt_check, context)
287
288 if image is None:
289 update.message.reply_text(l("no_caption", lang))
290 return
291
292 image = _img_to_bio(tt_bt_effect(content, image))
293 update.message.reply_photo(photo=image, reply_markup=markup)
294
295def bt(update: Update, context: CallbackContext):
296
297 content, image, markup = _get_all(update, tt_check, context)
298
299 if image is None:
300 update.message.reply_text(l("no_caption", lang))
301 return
302
303 image = _img_to_bio(tt_bt_effect("\n" + content, image))
304 update.message.reply_photo(photo=image, reply_markup=markup)
305
306def splash(update: Update, context: CallbackContext):
307
308 content, image, markup = _get_all(update, splash_check, context)
309
310 if image is None:
311 update.message.reply_text(l("no_caption", lang))
312 return
313
314 image = _img_to_bio(splash_effect(content, image))
315 update.message.reply_photo(photo=image, reply_markup=markup)
316
317def wot(update: Update, context: CallbackContext):
318
319 content, image, markup = _get_all(update, wot_check, context)
320
321 if image is None:
322 update.message.reply_text(l("no_caption", lang))
323 return
324
325 image = _img_to_bio(wot_effect(content, image))
326 update.message.reply_photo(photo=image, reply_markup=markup)
327
328def text(update: Update, context: CallbackContext):
329
330 content, image, markup = _get_all(update, wot_check, context)
331
332 if image is None:
333 update.message.reply_text(l("no_caption", lang))
334 return
335
336 image = _img_to_bio(text_effect(content, image))
337 update.message.reply_photo(photo=image, reply_markup=markup)
338
339def caps(update: Update, context: CallbackContext):
340
341 _, reply, _ = _get_reply(update.message.reply_to_message, _get_args(context))
342 context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper())
343
344def unknown(update: Update, context: CallbackContext):
345 logging.info(f"User {update.message.from_user.username} sent {update.message.text_markdown_v2} and I don't know what that means.")
346
347def error_callback(update: Update, context: CallbackContext):
348 try:
349 raise context.error
350 except TelegramError:
351 logging.error("TelegramError!!")
352 context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', lang))
353
354def _add_effect_handler(dispatcher, command: str, callback):
355 dispatcher.add_handler(CommandHandler(command, callback))
356 dispatcher.add_handler(MessageHandler(Filters.caption(update=[f"/{command}"]), callback))
357
358def main():
359
360 updater = Updater(token=os.getenv("token"),
361 persistence=PicklePersistence(filename='bot-data.pkl',
362 store_bot_data=False,
363 store_callback_data=False,
364 store_user_data=False))
365
366 dispatcher = updater.dispatcher
367 dispatcher.add_error_handler(error_callback)
368
369 dispatcher.add_handler(CommandHandler('start', start))
370 dispatcher.add_handler(CommandHandler('lewd', set_lewd))
371 dispatcher.add_handler(CommandHandler('caps', caps))
372 dispatcher.add_handler(CommandHandler('pic', pic))
373 dispatcher.add_handler(CommandHandler('raw', raw))
374 dispatcher.add_handler(CommandHandler('pilu', pilu))
375
376 _add_effect_handler(dispatcher, 'ttbt', ttbt)
377 _add_effect_handler(dispatcher, 'tt', tt)
378 _add_effect_handler(dispatcher, 'bt', bt)
379 _add_effect_handler(dispatcher, 'splash', splash)
380 _add_effect_handler(dispatcher, 'wot', wot)
381 _add_effect_handler(dispatcher, 'text', text)
382
383 dispatcher.add_handler(MessageHandler(Filters.command, unknown))
384 updater.start_polling()
385 updater.idle()
386
387if __name__ == "__main__":
388 main()