all repos — python-meme-bot @ 24f4cc514750c02498234df67fddb9b259c180ad

Telegram bot that uses PIL to perform light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image, rating_normal, rating_lewd
  3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
  4from Constants import get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup
  5from Slot import spin, autospin, bet, cash
  6
  7from dotenv import load_dotenv
  8load_dotenv()
  9import os, logging
 10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 11from io import BytesIO
 12
 13from telegram.error import TelegramError
 14from telegram.ext import Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, Filters, PicklePersistence
 15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 16
 17def _get_message_content(message):
 18    
 19    image = None
 20    if len(message.photo) > 0:
 21        image = message.photo[-1].get_file()
 22        image = Image.open(BytesIO(image.download_as_bytearray()))
 23    
 24    content = ""
 25    if message.text is not None:
 26        content = message.text.strip()
 27    elif message.caption is not None:
 28        content = message.caption.strip()
 29        
 30    lines = content.split("\n")
 31    r = lines[0].split(" ")
 32    
 33    try:
 34        if r[0][0] == '/':
 35            r.pop(0)
 36    except IndexError:
 37        pass
 38        
 39    lines[0] = " ".join(r)
 40    content = "\n".join(lines)
 41    
 42    return image, content, _get_author(message)
 43
 44def _get_reply(message, fallback=""):
 45    
 46    if message is None:
 47        return None, fallback, None
 48    
 49    image, content, author = _get_message_content(message)
 50    
 51    return image, content, author
 52
 53def _get_lewd(context):
 54    
 55    try:
 56        return context.chat_data["lewd"]
 57    except KeyError:
 58        return False
 59
 60def _get_image(context, tag="", bio=True):
 61    
 62    if context is not None:
 63        if _get_lewd(context):
 64            image, url = get_random_image(rating_lewd, tag)
 65        else:
 66            image, url = get_random_image(rating_normal, tag)
 67    
 68    if image is None:
 69        logging.warning("Getting Image failed")
 70        raise TelegramError("bad image")
 71    
 72    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])
 73    
 74    if bio:
 75        return image, markup
 76    return image, markup
 77
 78def _get_all(update, check_fn, context):
 79    
 80    image_reply, text_reply, author_reply = _get_reply(update.message.reply_to_message)
 81    image_content, text_content, author_content = _get_message_content(update.message)
 82    
 83    info_struct = {
 84        "reply": {
 85            "author": author_reply,
 86            "text": text_reply,
 87            "image": image_reply
 88        },
 89        "content": {
 90            "author": author_content,
 91            "text": text_content,
 92            "image": image_content
 93        }
 94    }
 95        
 96    logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
 97    
 98    content = check_fn(info_struct)
 99    
100    if content is None:
101        return None, None, None
102
103    markup = ""
104    image = None
105    
106    if image_reply is not None:
107        image = image_reply
108    
109    if image_content is not None:
110        image = image_content
111    
112    if image is None:
113        image, markup = _get_image(context, bio=False)
114    
115    return content, image, markup
116
def start(update: Update, context: CallbackContext):
    """Handle /start: send the localized "welcome" text to the chat."""
    chat_id = update.effective_chat.id
    greeting = l("welcome", context)
    context.bot.send_message(chat_id=chat_id, text=greeting)
119
def set_lewd(update: Update, context: CallbackContext):
    """Handle /lewd: flip the chat's ``"lewd"`` flag and announce the new state.

    A chat that never set the flag counts as disabled, so the first call
    enables it.
    """
    enabled = not context.chat_data.get("lewd", False)
    context.chat_data['lewd'] = enabled

    state_key = "enabled" if enabled else "disabled"
    message = l("lewd_toggle", context).format(l(state_key, context))

    context.bot.send_message(chat_id=update.effective_chat.id, text=message)
131
def pic(update: Update, context: CallbackContext):
    """Handle /pic: reply with a random image.

    The first command argument, if present, is forwarded to ``_get_image`` as
    a tag (prefixed with a space); further arguments are ignored.
    """
    tag = f" {context.args[0]}" if len(context.args) > 0 else ""

    picture, keyboard = _get_image(context, tag)
    update.message.reply_photo(
        photo=img_to_bio(picture),
        parse_mode="markdown",
        reply_markup=keyboard,
    )
141
def raw(update: Update, context: CallbackContext):
    """Handle /raw: reply with a random image fetched with an empty rating.

    Up to the first two command arguments are forwarded as tags, each one
    prefixed with a space. Raises ``TelegramError`` when no image comes back.
    """
    tag = "".join(f" {arg}" for arg in context.args[:2])

    picture, source_url = get_random_image("", tag)
    if picture is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    photo = img_to_bio(picture)
    sauce_button = InlineKeyboardButton(text=l("sauce", context), url=source_url)
    keyboard = InlineKeyboardMarkup([[sauce_button]])

    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=keyboard)
161
def pilu(update: Update, context: CallbackContext):
    """Handle /pilu: reply with a random image requested with ``rating:e``.

    The request is logged at warning level with the sender's username. The
    first command argument, if any, is appended to the rating string as a tag.

    Raises:
        TelegramError: If ``get_random_image`` returned no image.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")

    try:
        tag = " " + context.args[0]
    except IndexError:
        tag = ""

    # NOTE(review): every other call site passes ``(rating, tag)`` as two
    # arguments; here rating and tag are concatenated into one. Left unchanged
    # because the signature of ``get_random_image`` is not visible here.
    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        # The original had an unreachable ``return`` after this raise.
        raise TelegramError("bad image")

    image = img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])

    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
179
180def _get_author(message):
181    
182    if message.forward_from is not None:
183        return format_author(message.forward_from)
184    
185    if message.forward_sender_name is not None:
186        return message.forward_sender_name
187    
188    if message.forward_from_chat is not None:
189        return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
190    
191    return format_author(message.from_user)
192
def tt_check(info):
    """Build single-line text from the reply text followed by the message text.

    The two texts are joined by a space and every newline becomes a space.
    Returns ``None`` when the result is blank.
    """
    pieces = (info['reply']['text'], info['content']['text'])
    flattened = "{} {}".format(*pieces).replace("\n", " ")

    return None if flattened.strip() == "" else flattened
204
def ttbt_check(info):
    """Build two-part text: reply text on top, message text below.

    Both texts are stripped first. A message text that already spans several
    lines is used on its own; otherwise the result is the reply text, a
    newline, then the message text. Returns ``None`` when the result is blank.
    """
    upper = info['reply']['text'].strip()
    lower = info['content']['text'].strip()

    combined = lower if "\n" in lower else upper + "\n" + lower

    if not combined.strip():
        return None
    return combined
219
def splash_check(info):
    """Build "author, newline, text" from the message, or from the reply.

    The command message's own text and author are used unless that text is
    blank, in which case the replied-to text and author are used. Returns
    ``None`` when the stripped result has fewer than two lines.
    """
    quoted_text = info['reply']['text']
    own_text = info['content']['text']

    if own_text.strip() != "":
        candidate = "{}\n{}".format(info['content']['author'], own_text)
    else:
        candidate = "{}\n{}".format(info['reply']['author'], quoted_text)

    line_count = len(candidate.strip().split("\n"))
    return candidate if line_count >= 2 else None
236
def wot_check(info):
    """Join the reply text and the message text with a newline.

    Returns ``None`` when the joined text is blank.
    """
    combined = "{}\n{}".format(info['reply']['text'], info['content']['text'])

    if not combined.strip():
        return None
    return combined
248
def ttbt(update: Update, context: CallbackContext):
    """Handle /ttbt: apply ``tt_bt_effect`` to the text chosen by ``ttbt_check``.

    Replies with the localized "no_caption" text when ``_get_all`` yields no
    image, and with "failed_effect" when the effect returns ``None``;
    otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, ttbt_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: without this return the handler went on to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
263
264
def tt(update: Update, context: CallbackContext):
    """Handle /tt: apply ``tt_bt_effect`` to the single-line text from ``tt_check``.

    Replies with the localized "no_caption" text when ``_get_all`` yields no
    image, and with "failed_effect" when the effect returns ``None``;
    otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, tt_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: without this return the handler went on to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
279    
def bt(update: Update, context: CallbackContext):
    """Handle /bt: apply ``bt_effect`` to the single-line text from ``tt_check``.

    Replies with the localized "no_caption" text when ``_get_all`` yields no
    image, and with "failed_effect" when the effect returns ``None``;
    otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, tt_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: without this return the handler went on to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
294
def splash(update: Update, context: CallbackContext):
    """Handle /splash: apply ``splash_effect`` to the text from ``splash_check``.

    Replies with the localized "no_caption" text when ``_get_all`` yields no
    image, and with "failed_effect" when the effect returns ``None``;
    otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, splash_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = splash_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: without this return the handler went on to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
309    
def wot(update: Update, context: CallbackContext):
    """Handle /wot: apply ``wot_effect`` to the text from ``wot_check``.

    Replies with the localized "no_caption" text when ``_get_all`` yields no
    image, and with "failed_effect" when the effect returns ``None``;
    otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, wot_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = wot_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: without this return the handler went on to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
324    
def text(update: Update, context: CallbackContext):
    """Handle /text: apply ``text_effect`` to the text from ``wot_check``.

    Replies with the localized "no_caption" text when ``_get_all`` yields no
    image, and with "failed_effect" when the effect returns ``None``;
    otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, wot_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = text_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: without this return the handler went on to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
339
def caps(update: Update, context: CallbackContext):
    """Handle /caps: send back upper-cased text.

    The text is the content of the replied-to message; when the command is
    not a reply, the space-joined command arguments are used instead.
    """
    replied_to = update.message.reply_to_message
    typed = ' '.join(context.args)

    _, source_text, _ = _get_reply(replied_to, typed)
    shouted = source_text.upper()

    context.bot.send_message(chat_id=update.effective_chat.id, text=shouted)
344
def _set_lang(update: Update, context: CallbackContext, lang: str):
    """Store ``lang`` under ``chat_data["lang"]`` and confirm it in the chat."""
    context.chat_data["lang"] = lang

    # Looked up after the assignment above, exactly as before.
    confirmation = l("language_set", context).format(format_lang(lang))
    context.bot.send_message(chat_id=update.effective_chat.id, text=confirmation)
348
def lang(update: Update, context: CallbackContext):
    """Handle /lang: show the current language or switch to the requested one.

    Without an argument, replies with the current language, the list of
    available ones and the ``lang_markup`` keyboard, and returns the result
    of that reply. With an argument not found in ``langs``, replies with the
    "invalid_language" text. Otherwise the language is stored via
    ``_set_lang``.
    """
    if len(context.args) == 0:
        current = format_lang(get_lang(context))
        choices = ", ".join(langs) + "."
        overview = l("current_language", context).format(current, choices)
        return update.message.reply_text(text=overview, reply_markup=lang_markup)

    requested = str(context.args[0])

    if requested in langs:
        _set_lang(update, context, requested)
    else:
        update.message.reply_text(text=l("invalid_language", context))
365    
def unknown(update: Update, context: CallbackContext):
    """Fallback command handler: log the unrecognised command, reply nothing."""
    sender = update.message.from_user.full_name
    received = update.message.text_markdown_v2
    logging.info(f"User {sender} sent {received} and I don't know what that means.")
368    
def error_callback(update: Update, context: CallbackContext):
    """Log an error raised while processing an update and notify the chat.

    ``TelegramError`` instances are logged and, when the update belongs to a
    chat, answered with the localized "error" text. Any other exception is
    logged with its traceback instead of being re-raised from inside the
    error handler.

    Args:
        update: The update being processed; it may be ``None`` (or carry no
            chat) when the error is not tied to a chat, so it is checked
            before use.
        context: Callback context; ``context.error`` holds the exception.
    """
    error = context.error

    if not isinstance(error, TelegramError):
        logging.error("Unhandled error: " + str(error), exc_info=error)
        return

    logging.error("TelegramError! " + str(error))

    # Guard against updates without a chat; the original dereferenced
    # ``update.effective_chat`` unconditionally.
    chat = getattr(update, "effective_chat", None)
    if chat is not None:
        context.bot.send_message(chat_id=chat.id, text=l('error', context))
375        
def _add_effect_handler(dispatcher, command: str, callback):
    """Register ``callback`` for ``command`` as a text command and as a caption.

    Two handlers are added, in this order: a ``CommandHandler`` for
    ``command`` and a ``MessageHandler`` using ``Filters.caption`` with the
    single string ``"/<command>"``.
    """
    caption_filter = Filters.caption(update=[f"/{command}"])
    handlers = (
        CommandHandler(command, callback),
        MessageHandler(caption_filter, callback),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)
379
380def keyboard_handler(update: Update, context: CallbackContext):
381    query = update.callback_query
382    data = query.data
383    
384    if data.startswith("reroll"):
385        amount = int(data.split(" ")[1])
386        
387        if amount == 1:
388            return spin(update, context)
389        return autospin(context, update.effective_chat.id, amount)
390    
391    match query.data:
392        case "none":
393            return query.answer(l("none_callback", context))
394        case "set_lang_en":
395            lang = "en"
396            _set_lang(update, context, lang)
397            return query.answer(l("language_set", context).format(format_lang(lang)))
398        case "set_lang_it":
399            lang = "it"
400            _set_lang(update, context, lang)
401            return query.answer(l("language_set", context).format(format_lang(lang)))
402        case other:
403            logging.error(f"unknown callback: {query.data}")
404    
405    return query.answer()
406
def main():
    """Create the updater, register every handler, then start polling.

    The token is read from the ``token`` environment variable. Persistence
    uses ``bot-data.pkl`` with bot data and callback data storage turned off.
    Handlers are registered in the same order as before, with the
    unknown-command fallback added last.
    """
    token = os.getenv("token")
    persistence = PicklePersistence(
        filename='bot-data.pkl',
        store_bot_data=False,
        store_callback_data=False,
        # store_user_data=False
    )
    updater = Updater(token=token, persistence=persistence)

    dispatcher = updater.dispatcher
    # dispatcher.workers = 8

    dispatcher.add_error_handler(error_callback)
    dispatcher.add_handler(CallbackQueryHandler(callback=keyboard_handler))

    # commands
    commands = (
        ('start', start),
        ('lang', lang),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
    )
    for name, callback in commands:
        dispatcher.add_handler(CommandHandler(name, callback))

    # effects
    effects = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
        ('wot', wot),
        ('text', text),
    )
    for name, callback in effects:
        _add_effect_handler(dispatcher, name, callback)

    # games, then secrets
    late_commands = (
        ('spin', spin),
        ('bet', bet),
        ('cash', cash),
        ('raw', raw),
        ('pilu', pilu),
    )
    for name, callback in late_commands:
        dispatcher.add_handler(CommandHandler(name, callback))

    # fallback
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))

    updater.start_polling()
    updater.idle()
450
# Run the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()