all repos — python-meme-bot @ d7425d4fa8d0c7e35a45f0137f3d3ea9680c9b5f

Telegram bot that uses PIL to perform light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image, rating_normal, rating_lewd
  3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
  4from Constants import get_localized_string as l, format_author
  5from Games import spin, bet, cash
  6
  7from dotenv import load_dotenv
  8load_dotenv()
  9import os, logging
 10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 11from io import BytesIO
 12
 13from telegram.error import TelegramError
 14from telegram.ext import Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, Filters, PicklePersistence
 15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 16
 17lang = 'us'
 18
 19def _get_message_content(message):
 20    
 21    image = None
 22    if len(message.photo) > 0:
 23        image = message.photo[-1].get_file()
 24        image = Image.open(BytesIO(image.download_as_bytearray()))
 25    
 26    content = ""
 27    if message.text is not None:
 28        content = message.text.strip()
 29    elif message.caption is not None:
 30        content = message.caption.strip()
 31        
 32    lines = content.split("\n")
 33    r = lines[0].split(" ")
 34    
 35    try:
 36        if r[0][0] == '/':
 37            r.pop(0)
 38    except IndexError:
 39        pass
 40        
 41    lines[0] = " ".join(r)
 42    content = "\n".join(lines)
 43    
 44    return image, content, _get_author(message)
 45
 46def _get_reply(message, fallback=""):
 47    
 48    if message is None:
 49        return None, fallback, None
 50    
 51    image, content, author = _get_message_content(message)
 52    
 53    return image, content, author
 54
 55def _get_lewd(context):
 56    
 57    try:
 58        return context.chat_data["lewd"]
 59    except KeyError:
 60        return False
 61
 62def _get_image(context, tag="", bio=True):
 63    
 64    if context is not None:
 65        if _get_lewd(context):
 66            image, url = get_random_image(rating_lewd, tag)
 67        else:
 68            image, url = get_random_image(rating_normal, tag)
 69    
 70    if image is None:
 71        logging.warning("Getting Image failed")
 72        raise TelegramError("bad image")
 73    
 74    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
 75    
 76    if bio:
 77        return image, markup
 78    return image, markup
 79
 80def _get_all(update, check_fn, context):
 81    
 82    image_reply, text_reply, author_reply = _get_reply(update.message.reply_to_message)
 83    image_content, text_content, author_content = _get_message_content(update.message)
 84    
 85    info_struct = {
 86        "reply": {
 87            "author": author_reply,
 88            "text": text_reply,
 89            "image": image_reply
 90        },
 91        "content": {
 92            "author": author_content,
 93            "text": text_content,
 94            "image": image_content
 95        }
 96    }
 97        
 98    logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
 99    
100    content = check_fn(info_struct)
101    
102    if content is None:
103        return None, None, None
104
105    markup = ""
106    image = None
107    
108    if image_reply is not None:
109        image = image_reply
110    
111    if image_content is not None:
112        image = image_content
113    
114    if image is None:
115        image, markup = _get_image(context, bio=False)
116    
117    return content, image, markup
118
def start(update: Update, context: CallbackContext):
    """Send the localized welcome message to the chat that issued the command."""
    chat_id = update.effective_chat.id
    welcome_text = l("welcome", lang)
    context.bot.send_message(chat_id=chat_id, text=welcome_text)
121
def set_lewd(update: Update, context: CallbackContext):
    """Toggle the chat's ``"lewd"`` flag and report the new state.

    A chat that never set the flag is switched to enabled.
    """
    enabled = not context.chat_data.get("lewd", False)
    context.chat_data['lewd'] = enabled

    template = l("lewd_toggle", lang)
    if enabled:
        state_word = l("enabled", lang)
    else:
        state_word = l("disabled", lang)

    context.bot.send_message(chat_id=update.effective_chat.id, text=template.format(state_word))
133
def pic(update: Update, context: CallbackContext):
    """Reply with a random image, optionally narrowed by the first argument.

    The first command argument, if present, is passed to ``_get_image`` as
    the tag with a single leading space.
    """
    # At most one argument is used; with no arguments the tag stays empty.
    tag = "".join(" " + word for word in context.args[:1])

    image, markup = _get_image(context, tag)
    photo = img_to_bio(image)
    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
143
def raw(update: Update, context: CallbackContext):
    """Reply with a random image fetched with an empty rating string.

    Up to two command arguments are used as tags, each prefixed with a
    space. The reply carries a "sauce" button linking to the image URL.

    Raises:
        TelegramError: If ``get_random_image`` returned no image.
    """
    tag = "".join(" " + word for word in context.args[:2])

    image, url = get_random_image("", tag)
    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    photo = img_to_bio(image)
    sauce_button = InlineKeyboardButton(text=l("sauce", lang), url=url)
    keyboard = InlineKeyboardMarkup([[sauce_button]])

    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=keyboard)
163
def pilu(update: Update, context: CallbackContext):
    """Reply with a random image fetched with the ``rating:e`` query.

    The request is logged at warning level with the requester's username.
    The first command argument, if any, is appended to the query after a
    space.

    Raises:
        TelegramError: If ``get_random_image`` returned no image.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
    try:
        tag = " " + context.args[0]
    except IndexError:
        tag = ""

    # NOTE(review): other call sites pass (rating, tag) as two arguments;
    # here both are concatenated into one. Left as is - confirm against
    # the signature of Api.get_random_image.
    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        # The original had an unreachable `return` after this raise.
        raise TelegramError("bad image")

    image = img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
181
def _get_author(message):
    """Return a display name for whoever originally wrote ``message``.

    Forward information is preferred, in this order: the forwarded user
    (formatted with ``format_author``), the forward sender name, then the
    forwarded chat's title followed by its username in parentheses when it
    has one. Messages without forward information fall back to the
    formatted sender.
    """
    origin_user = message.forward_from
    if origin_user is not None:
        return format_author(origin_user)

    sender_name = message.forward_sender_name
    if sender_name is not None:
        return sender_name

    origin_chat = message.forward_from_chat
    if origin_chat is None:
        return format_author(message.from_user)

    if origin_chat.username is None:
        suffix = ""
    else:
        suffix = f" ({origin_chat.username})"
    return origin_chat.title + suffix
194
def tt_check(info):
    """Build a single-line caption from the reply text and the command text.

    The two texts are joined with a space and every newline is replaced by
    a space. Returns ``None`` when the result is blank.
    """
    merged = "{} {}".format(info['reply']['text'], info['content']['text'])
    merged = merged.replace("\n", " ")

    return None if merged.strip() == "" else merged
206
def ttbt_check(info):
    """Build a two-part caption (separated by a newline).

    If the command text already spans several lines it is used on its own;
    otherwise the reply text and the command text are placed on separate
    lines. Both texts are stripped first. Returns ``None`` when the result
    is blank.
    """
    upper = info['reply']['text'].strip()
    lower = info['content']['text'].strip()

    if "\n" in lower:
        combined = lower
    else:
        combined = "{}\n{}".format(upper, lower)

    return combined if combined.strip() != "" else None
221
def splash_check(info):
    """Build an "author, then text" caption for the splash effect.

    The command's own text and author are used when the command text is
    not blank; otherwise the replied-to text and author are used. The
    author goes on the first line. Returns ``None`` when the stripped
    result has fewer than two lines.
    """
    quoted_text = info['reply']['text']
    own_text = info['content']['text']

    if own_text.strip() == "":
        source, body = 'reply', quoted_text
    else:
        source, body = 'content', own_text

    candidate = "{}\n{}".format(info[source]['author'], body)

    # Without a newline left after stripping, either the author or the
    # text is missing.
    if "\n" not in candidate.strip():
        return None

    return candidate
238
def wot_check(info):
    """Stack the reply text above the command text, separated by a newline.

    Returns ``None`` when the combined text is blank.
    """
    stacked = "{}\n{}".format(info['reply']['text'], info['content']['text'])

    if not stacked.strip():
        return None

    return stacked
250
def ttbt(update: Update, context: CallbackContext):
    """Handle /ttbt: apply ``tt_bt_effect`` with the text built by ``ttbt_check``.

    Replies with the "no_caption" message when no usable text was found
    and with the "failed_effect" message when the effect returns ``None``.
    Otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, ttbt_check, context)

    if image is None:
        # _get_all returns all-None when the check function rejects the input.
        update.message.reply_text(l("no_caption", lang))
        return

    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: the original fell through and tried to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
265
266
def tt(update: Update, context: CallbackContext):
    """Handle /tt: apply ``tt_bt_effect`` with the text built by ``tt_check``.

    Replies with the "no_caption" message when no usable text was found
    and with the "failed_effect" message when the effect returns ``None``.
    Otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, tt_check, context)

    if image is None:
        # _get_all returns all-None when the check function rejects the input.
        update.message.reply_text(l("no_caption", lang))
        return

    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: the original fell through and tried to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
281    
def bt(update: Update, context: CallbackContext):
    """Handle /bt: apply ``bt_effect`` with the text built by ``tt_check``.

    Replies with the "no_caption" message when no usable text was found
    and with the "failed_effect" message when the effect returns ``None``.
    Otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, tt_check, context)

    if image is None:
        # _get_all returns all-None when the check function rejects the input.
        update.message.reply_text(l("no_caption", lang))
        return

    image = bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: the original fell through and tried to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
296
def splash(update: Update, context: CallbackContext):
    """Handle /splash: apply ``splash_effect`` with the text built by ``splash_check``.

    Replies with the "no_caption" message when no usable text was found
    and with the "failed_effect" message when the effect returns ``None``.
    Otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, splash_check, context)

    if image is None:
        # _get_all returns all-None when the check function rejects the input.
        update.message.reply_text(l("no_caption", lang))
        return

    image = splash_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: the original fell through and tried to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
311    
def wot(update: Update, context: CallbackContext):
    """Handle /wot: apply ``wot_effect`` with the text built by ``wot_check``.

    Replies with the "no_caption" message when no usable text was found
    and with the "failed_effect" message when the effect returns ``None``.
    Otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, wot_check, context)

    if image is None:
        # _get_all returns all-None when the check function rejects the input.
        update.message.reply_text(l("no_caption", lang))
        return

    image = wot_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: the original fell through and tried to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
326    
def text(update: Update, context: CallbackContext):
    """Handle /text: apply ``text_effect`` with the text built by ``wot_check``.

    Replies with the "no_caption" message when no usable text was found
    and with the "failed_effect" message when the effect returns ``None``.
    Otherwise replies with the resulting photo.
    """
    content, image, markup = _get_all(update, wot_check, context)

    if image is None:
        # _get_all returns all-None when the check function rejects the input.
        update.message.reply_text(l("no_caption", lang))
        return

    image = text_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: the original fell through and tried to send photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
341
def caps(update: Update, context: CallbackContext):
    """Send back an upper-cased text.

    The text comes from the replied-to message when there is one;
    otherwise the command arguments joined by spaces are used.
    """
    replied_to = update.message.reply_to_message
    typed_args = ' '.join(context.args)

    _, source_text, _ = _get_reply(replied_to, typed_args)

    context.bot.send_message(chat_id=update.effective_chat.id, text=source_text.upper())
346    
def unknown(update: Update, context: CallbackContext):
    """Log a command that no other handler claimed; nothing is sent back."""
    message = update.message
    sender_name = message.from_user.full_name
    logging.info(f"User {sender_name} sent {message.text_markdown_v2} and I don't know what that means.")
349    
def error_callback(update: Update, context: CallbackContext):
    """Log errors raised by handlers and tell the chat about Telegram errors.

    ``TelegramError`` instances are logged and, when the update belongs to
    a chat, answered with the localized "error" message. Any other error
    is logged with its traceback. The original re-raised it from inside
    the error handler.

    Args:
        update: The update being processed when the error occurred. May be
            ``None`` or lack a chat, in which case nothing is sent.
        context: Handler context; ``context.error`` holds the exception.
    """
    error = context.error

    if not isinstance(error, TelegramError):
        logging.error("Unhandled error: %s", error, exc_info=error)
        return

    logging.error("TelegramError! " + str(error))

    # `update` is not guaranteed to be an Update with a chat attached.
    chat = getattr(update, "effective_chat", None)
    if chat is not None:
        context.bot.send_message(chat_id=chat.id, text=l('error', lang))
356        
def _add_effect_handler(dispatcher, command: str, callback):
    """Register ``callback`` for ``/command`` sent as text or as a media caption.

    Args:
        dispatcher: Dispatcher the handlers are added to.
        command: Command name without the leading slash.
        callback: Handler function to invoke.
    """
    import re

    dispatcher.add_handler(CommandHandler(command, callback))

    # `Filters.caption([...])` only matches captions that are exactly equal
    # to one of the listed strings, so "/command some text" on a photo was
    # never dispatched. Match any caption whose first word is the command;
    # the bare "/command" caption still matches.
    caption_pattern = re.compile(rf"^/{re.escape(command)}(?:\s|$)")
    dispatcher.add_handler(MessageHandler(Filters.caption_regex(caption_pattern), callback))
360    
def main():
    """Create the updater, register every handler and start polling.

    The bot token is read from the ``token`` environment variable and
    persistence is stored in ``bot-data.pkl`` with bot, callback and user
    data storage turned off. Handlers are registered in a fixed order,
    with the catch-all for unknown commands last.
    """
    persistence = PicklePersistence(
        filename='bot-data.pkl',
        store_bot_data=False,
        store_callback_data=False,
        store_user_data=False,
    )
    updater = Updater(token=os.getenv("token"), persistence=persistence)

    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error_callback)

    # commands
    for name, handler in (('start', start), ('lewd', set_lewd), ('caps', caps), ('pic', pic)):
        dispatcher.add_handler(CommandHandler(name, handler))

    # effects
    effect_table = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
        ('wot', wot),
        ('text', text),
    )
    for name, handler in effect_table:
        _add_effect_handler(dispatcher, name, handler)

    # games
    for name, handler in (('spin', spin), ('bet', bet), ('cash', cash)):
        dispatcher.add_handler(CommandHandler(name, handler))
    dispatcher.add_handler(CallbackQueryHandler(callback=spin))

    # secrets
    for name, handler in (('raw', raw), ('pilu', pilu)):
        dispatcher.add_handler(CommandHandler(name, handler))

    # fallback
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))

    updater.start_polling()
    updater.idle()


if __name__ == "__main__":
    main()