all repos — python-meme-bot @ e11e33a9f3ccc4aec435d101110c5a6c19d86faf

Telegram Bot that uses PIL to compute light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image, rating_normal, rating_lewd
  3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
  4from Constants import get_localized_string as l
  5from Games import spin
  6
  7from dotenv import load_dotenv
  8load_dotenv()
  9import os, logging
 10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 11from io import BytesIO
 12
 13from telegram.error import TelegramError
 14from telegram.ext import Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, Filters, PicklePersistence
 15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 16
 17lang = 'us'
 18
 19def _get_message_content(message):
 20    
 21    image = None
 22    if len(message.photo) > 0:
 23        image = message.photo[-1].get_file()
 24        image = Image.open(BytesIO(image.download_as_bytearray()))
 25    
 26    content = ""
 27    if message.text is not None:
 28        content = message.text.strip()
 29    elif message.caption is not None:
 30        content = message.caption.strip()
 31        
 32    lines = content.split("\n")
 33    r = lines[0].split(" ")
 34    
 35    try:
 36        if r[0][0] == '/':
 37            r.pop(0)
 38    except IndexError:
 39        pass
 40        
 41    lines[0] = " ".join(r)
 42    content = "\n".join(lines)
 43    
 44    return image, content, _get_author(message)
 45
 46def _get_reply(message, fallback=""):
 47    
 48    if message is None:
 49        return None, fallback, None
 50    
 51    image, content, author = _get_message_content(message)
 52    
 53    return image, content, author
 54
 55def _get_lewd(context):
 56    
 57    try:
 58        return context.chat_data["lewd"]
 59    except KeyError:
 60        return False
 61
 62def _get_image(context, tag="", bio=True):
 63    
 64    if context is not None:
 65        if _get_lewd(context):
 66            image, url = get_random_image(rating_lewd, tag)
 67        else:
 68            image, url = get_random_image(rating_normal, tag)
 69    
 70    if image is None:
 71        logging.warning("Getting Image failed")
 72        raise TelegramError("bad image")
 73    
 74    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
 75    
 76    if bio:
 77        return image, markup
 78    return image, markup
 79
 80def _get_all(update, check_fn, context):
 81    
 82    image_reply, text_reply, author_reply = _get_reply(update.message.reply_to_message)
 83    image_content, text_content, author_content = _get_message_content(update.message)
 84    
 85    info_struct = {
 86        "reply": {
 87            "author": author_reply,
 88            "text": text_reply,
 89            "image": image_reply
 90        },
 91        "content": {
 92            "author": author_content,
 93            "text": text_content,
 94            "image": image_content
 95        }
 96    }
 97        
 98    logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
 99    
100    content = check_fn(info_struct)
101    
102    if content is None:
103        return None, None, None
104
105    markup = ""
106    image = None
107    
108    if image_reply is not None:
109        image = image_reply
110    
111    if image_content is not None:
112        image = image_content
113    
114    if image is None:
115        image, markup = _get_image(context, bio=False)
116    
117    return content, image, markup
118
119def start(update: Update, context: CallbackContext):
120    context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", lang))
121
122def set_lewd(update: Update, context: CallbackContext):
123    
124    try:
125        output = False if context.chat_data["lewd"] else True
126    except KeyError:
127        output = True
128        
129    context.chat_data['lewd'] = output
130    message = l("lewd_toggle", lang).format(l("enabled", lang) if output else l("disabled", lang))
131    
132    context.bot.send_message(chat_id=update.effective_chat.id, text=message)
133
134def pic(update: Update, context: CallbackContext):
135    
136    try:
137        tag = " " + context.args[0]
138    except IndexError:
139        tag = ""
140
141    image, markup = _get_image(context, tag)
142    update.message.reply_photo(photo=img_to_bio(image), parse_mode="markdown", reply_markup=markup)
143
144def raw(update: Update, context: CallbackContext):
145    
146    tag = ""
147    try:
148        tag += " " + context.args[0]
149        tag += " " + context.args[1]
150    except IndexError:
151        pass
152    
153    image, url = get_random_image("", tag)
154    
155    if image is None:
156        logging.warning("Getting Image failed")
157        raise TelegramError("bad image")
158    
159    image = img_to_bio(image)
160    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
161    
162    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
163
164def pilu(update: Update, context: CallbackContext):
165    
166    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
167    try:
168        tag = " " + context.args[0]
169    except IndexError:
170        tag = ""
171    image, url = get_random_image("rating:e" + tag)
172    if image is None:
173        logging.warning("Getting Image failed")
174        raise TelegramError("bad image")
175        return
176    
177    image = img_to_bio(image)
178    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
179    
180    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
181
182def _format_author(user):
183    
184    if user.username is not None:
185        return user.full_name + f" ({user.username})"
186    return user.full_name
187
188def _get_author(message):
189    
190    if message.forward_from is not None:
191        return _format_author(message.forward_from)
192    
193    if message.forward_sender_name is not None:
194        return message.forward_sender_name
195    
196    if message.forward_from_chat is not None:
197        return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
198    
199    return _format_author(message.from_user)
200
201def tt_check(info):
202    
203    reply = info['reply']['text']
204    content = info['content']['text']
205    
206    input_text = f"{reply} {content}".replace("\n", " ")
207    
208    if input_text.strip() == "":
209        return None
210
211    return input_text
212
213def ttbt_check(info):
214    
215    reply = info['reply']['text'].strip()
216    content = info['content']['text'].strip()
217    
218    if len(content.split("\n")) > 1:
219        input_text = content
220    else:
221        input_text = f"{reply}\n{content}"
222    
223    if input_text.strip() == "":
224        return None
225
226    return input_text
227
228def splash_check(info):
229    
230    reply = info['reply']['text']
231    content = info['content']['text']
232    
233    if content.strip() == "":
234        author = info['reply']['author']
235        input_text = f"{author}\n{reply}"
236    else:
237        author = info['content']['author']
238        input_text = f"{author}\n{content}"
239
240    if len(input_text.strip().split("\n")) < 2:
241        return None
242
243    return input_text
244
245def wot_check(info):
246    
247    reply = info['reply']['text']
248    content = info['content']['text']
249    
250    input_text = f"{reply}\n{content}"
251
252    if input_text.strip() == "":
253        return None
254
255    return input_text
256
257def ttbt(update: Update, context: CallbackContext):
258    
259    content, image, markup = _get_all(update, ttbt_check, context)
260    
261    if image is None:
262        update.message.reply_text(l("no_caption", lang))
263        return
264    
265    image = tt_bt_effect(content, image)
266
267    if image is None:
268        update.message.reply_text(l("failed_effect", lang))
269
270    update.message.reply_photo(photo=image, reply_markup=markup)
271
272
273def tt(update: Update, context: CallbackContext):
274    
275    content, image, markup = _get_all(update, tt_check, context)
276    
277    if image is None:
278        update.message.reply_text(l("no_caption", lang))
279        return
280    
281    image = tt_bt_effect(content, image)
282
283    if image is None:
284        update.message.reply_text(l("failed_effect", lang))
285
286    update.message.reply_photo(photo=image, reply_markup=markup)
287    
288def bt(update: Update, context: CallbackContext):
289    
290    content, image, markup = _get_all(update, tt_check, context)
291    
292    if image is None:
293        update.message.reply_text(l("no_caption", lang))
294        return
295
296    image = bt_effect(content, image)
297
298    if image is None:
299        update.message.reply_text(l("failed_effect", lang))
300
301    update.message.reply_photo(photo=image, reply_markup=markup)
302
303def splash(update: Update, context: CallbackContext):
304    
305    content, image, markup = _get_all(update, splash_check, context)
306    
307    if image is None:
308        update.message.reply_text(l("no_caption", lang))
309        return
310    
311    image = splash_effect(content, image)
312
313    if image is None:
314        update.message.reply_text(l("failed_effect", lang))
315    
316    update.message.reply_photo(photo=image, reply_markup=markup)
317    
318def wot(update: Update, context: CallbackContext):
319    
320    content, image, markup = _get_all(update, wot_check, context)
321    
322    if image is None:
323        update.message.reply_text(l("no_caption", lang))
324        return
325    
326    image = wot_effect(content, image)
327
328    if image is None:
329        update.message.reply_text(l("failed_effect", lang))
330    
331    update.message.reply_photo(photo=image, reply_markup=markup)
332    
333def text(update: Update, context: CallbackContext):
334    
335    content, image, markup = _get_all(update, wot_check, context)
336    
337    if image is None:
338        update.message.reply_text(l("no_caption", lang))
339        return
340    
341    image = text_effect(content, image)
342
343    if image is None:
344        update.message.reply_text(l("failed_effect", lang))
345    
346    update.message.reply_photo(photo=image, reply_markup=markup)
347
348def caps(update: Update, context: CallbackContext):
349    
350    _, reply, _ = _get_reply(update.message.reply_to_message, ' '.join(context.args))
351    context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper())
352    
353def unknown(update: Update, context: CallbackContext):
354    logging.info(f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.")
355    
356def error_callback(update: Update, context: CallbackContext):
357    try:
358        raise context.error
359    except TelegramError as e:
360        logging.error("TelegramError! " + e)
361        context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', lang))
362        
363def _add_effect_handler(dispatcher, command: str, callback):
364    dispatcher.add_handler(CommandHandler(command, callback))
365    dispatcher.add_handler(MessageHandler(Filters.caption(update=[f"/{command}"]), callback))
366    
367def main():
368    
369    updater = Updater(token=os.getenv("token"),
370                      persistence=PicklePersistence(filename='bot-data.pkl',
371                                                    store_bot_data=False,
372                                                    store_callback_data=False,
373                                                    store_user_data=False))
374    
375    dispatcher = updater.dispatcher
376    dispatcher.add_error_handler(error_callback)
377    
378    # commands
379    dispatcher.add_handler(CommandHandler('start', start))
380    dispatcher.add_handler(CommandHandler('lewd', set_lewd))
381    dispatcher.add_handler(CommandHandler('caps', caps))
382    dispatcher.add_handler(CommandHandler('pic', pic))
383    
384    # effects
385    _add_effect_handler(dispatcher, 'ttbt', ttbt)
386    _add_effect_handler(dispatcher, 'tt', tt)
387    _add_effect_handler(dispatcher, 'bt', bt)
388    _add_effect_handler(dispatcher, 'splash', splash)
389    _add_effect_handler(dispatcher, 'wot', wot)
390    _add_effect_handler(dispatcher, 'text', text)
391    
392    # games
393    dispatcher.add_handler(CommandHandler('spin', spin))
394    dispatcher.add_handler(CallbackQueryHandler(callback=spin))
395    #dispatcher.add_handler(CallbackQueryHandler(callback=autospin))
396    
397    # secrets
398    dispatcher.add_handler(CommandHandler('raw', raw))
399    dispatcher.add_handler(CommandHandler('pilu', pilu))
400    
401    # fallback
402    dispatcher.add_handler(MessageHandler(Filters.command, unknown))
403    updater.start_polling()
404    updater.idle()
405
406if __name__ ==  "__main__":
407    main()