all repos — python-meme-bot @ 23056f54bf43bea42b9437056f1152e31f526381

Telegram bot that uses PIL to perform light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image, rating_normal, rating_lewd
  3from Effects import tt_bt_effect, splash_effect, wot_effect, text_effect
  4from Constants import get_localized_string as l
  5
  6from dotenv import load_dotenv
  7load_dotenv()
  8import os, logging
  9logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 10from io import BytesIO
 11
 12from telegram.error import TelegramError
 13from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters, PicklePersistence
 14from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 15
 16lang = 'us'
 17
 18def _get_args(context):
 19    
 20    logging.info(context.args)
 21    return ' '.join(context.args)
 22
 23def _img_to_bio(image):
 24    
 25    bio = BytesIO()
 26    
 27    if image.mode in ("RGBA", "P"):
 28        image = image.convert("RGB")
 29
 30    image.save(bio, 'JPEG')
 31    bio.seek(0)
 32    return bio
 33
 34def _get_message_content(message):
 35    
 36    image = None
 37    if len(message.photo) > 0:
 38        image = message.photo[-1].get_file()
 39        image = Image.open(BytesIO(image.download_as_bytearray()))
 40    
 41    content = ""
 42    if message.text is not None:
 43        content = message.text.strip()
 44    elif message.caption is not None:
 45        content = message.caption.strip()
 46        
 47    lines = content.split("\n")
 48    r = lines[0].split(" ")
 49    
 50    try:
 51        if r[0][0] == '/':
 52            r.pop(0)
 53    except IndexError:
 54        pass
 55        
 56    lines[0] = " ".join(r)
 57    content = "\n".join(lines)
 58    
 59    return image, content, _get_author(message)
 60
 61def _get_reply(message, fallback=""):
 62    
 63    if message is None:
 64        return None, fallback, None
 65    
 66    image, content, author = _get_message_content(message)
 67    
 68    return image, content, author
 69
 70def _get_lewd(context):
 71    
 72    try:
 73        return context.chat_data["lewd"]
 74    except KeyError:
 75        return False
 76
 77def _get_image(context, tag="", bio=True):
 78    
 79    if context is not None:
 80        if _get_lewd(context):
 81            image, url = get_random_image(rating_lewd, tag)
 82        else:
 83            image, url = get_random_image(rating_normal, tag)
 84    
 85    if image is None:
 86        logging.warning("Getting Image failed")
 87        raise TelegramError("bad image")
 88    
 89    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
 90    
 91    if bio:
 92        return _img_to_bio(image), markup
 93    return image, markup
 94
 95def _get_all(update, check_fn, context):
 96    
 97    image_reply, text_reply, author_reply = _get_reply(update.message.reply_to_message)
 98    image_content, text_content, author_content = _get_message_content(update.message)
 99    
100    info_struct = {
101        "reply": {
102            "author": author_reply,
103            "text": text_reply,
104            "image": image_reply
105        },
106        "content": {
107            "author": author_content,
108            "text": text_content,
109            "image": image_content
110        }
111    }
112        
113    logging.info(f"User {update.message.from_user.username} typed: {str(update.message.text)}")
114    
115    content = check_fn(info_struct)
116    
117    if content is None:
118        return None, None, None
119
120    markup = ""
121    image = None
122    
123    if image_reply is not None:
124        image = image_reply
125    
126    if image_content is not None:
127        image = image_content
128    
129    if image is None:
130        image, markup = _get_image(context, bio=False)
131    
132    return content, image, markup
133
def start(update: Update, context: CallbackContext):
    """Send the localized welcome message to the chat that issued /start."""
    welcome_text = l("welcome", lang)
    context.bot.send_message(chat_id=update.effective_chat.id, text=welcome_text)
136
def set_lewd(update: Update, context: CallbackContext):
    """Toggle the chat's "lewd" setting and announce the new state.

    A chat that never stored the setting is treated as disabled, so the
    first call enables it.
    """
    enabled = not context.chat_data.get("lewd", False)
    context.chat_data['lewd'] = enabled

    template = l("lewd_toggle", lang)
    if enabled:
        state_label = l("enabled", lang)
    else:
        state_label = l("disabled", lang)

    context.bot.send_message(chat_id=update.effective_chat.id, text=template.format(state_label))
148
def pic(update: Update, context: CallbackContext):
    """Reply with a random picture, optionally filtered by the first argument.

    The first command argument (if present) is passed to ``_get_image`` as
    the tag, prefixed with a space.
    """
    arguments = context.args
    tag = " " + arguments[0] if len(arguments) > 0 else ""

    photo, keyboard = _get_image(context, tag)
    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=keyboard)
158
def raw(update: Update, context: CallbackContext):
    """Reply with a random picture fetched with an empty rating string.

    Up to the first two command arguments are used as tags, each prefixed
    with a space.

    Raises:
        TelegramError: if ``get_random_image`` yields no image.
    """
    tag = "".join(" " + argument for argument in context.args[:2])

    picture, url = get_random_image("", tag)
    if picture is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    source_button = InlineKeyboardButton(text=l("sauce", lang), url=url)
    keyboard = InlineKeyboardMarkup([[source_button]])

    update.message.reply_photo(photo=_img_to_bio(picture), parse_mode="markdown", reply_markup=keyboard)
178
def pilu(update: Update, context: CallbackContext):
    """Reply with a random picture requested with the "rating:e" query.

    The request is logged at WARNING level together with the requesting
    user's username. The first command argument (if present) is appended to
    the query, prefixed with a space.

    Raises:
        TelegramError: if ``get_random_image`` yields no image.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
    try:
        tag = " " + context.args[0]
    except IndexError:
        tag = ""

    # NOTE(review): the other callers pass rating and tag as two separate
    # arguments; here they are combined into one - confirm against the
    # signature of get_random_image.
    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        # The `return` that used to follow this raise was unreachable.
        raise TelegramError("bad image")

    image = _img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
196
def _format_author(user):
    """Return the user's full name, followed by "(username)" when one is set."""
    if user.username is None:
        return user.full_name
    return user.full_name + f" ({user.username})"
202
def _get_author(message):
    """Return a display name for whoever originally wrote *message*.

    Forward information is preferred, in this order: the forwarded user,
    the forwarded sender name, the forwarded chat (title plus "(username)"
    when it has one). A message with none of these is attributed to
    ``message.from_user``.
    """
    original_user = message.forward_from
    if original_user is not None:
        return _format_author(original_user)

    hidden_sender = message.forward_sender_name
    if hidden_sender is not None:
        return hidden_sender

    origin_chat = message.forward_from_chat
    if origin_chat is not None:
        suffix = "" if origin_chat.username is None else f" ({origin_chat.username})"
        return origin_chat.title + suffix

    return _format_author(message.from_user)
215
def tt_check(info):
    """Build single-line text from the reply text and the command text.

    Both texts are joined with a space and every newline is replaced by a
    space. Returns None when the result is blank.
    """
    reply_text = info['reply']['text']
    own_text = info['content']['text']

    merged = f"{reply_text} {own_text}".replace("\n", " ")

    return merged if merged.strip() != "" else None
227
def ttbt_check(info):
    """Build two-part text from the reply text and the command text.

    Both texts are stripped. When the command text already spans several
    lines it is used on its own; otherwise the reply text and the command
    text are joined with a newline. Returns None when the result is blank.
    """
    reply_text = info['reply']['text'].strip()
    own_text = info['content']['text'].strip()

    if "\n" in own_text:
        combined = own_text
    else:
        combined = f"{reply_text}\n{own_text}"

    if not combined.strip():
        return None
    return combined
242
def splash_check(info):
    """Build "author, newline, text" input for the splash effect.

    The command message's own text and author are used when that text is
    not blank; otherwise the replied-to message's text and author are used.
    Returns None when the stripped result has fewer than two lines.
    """
    reply_text = info['reply']['text']
    own_text = info['content']['text']

    if own_text.strip() == "":
        source, body = 'reply', reply_text
    else:
        source, body = 'content', own_text

    input_text = f"{info[source]['author']}\n{body}"

    # A stripped result without a newline has only one line.
    if "\n" not in input_text.strip():
        return None
    return input_text
259
def wot_check(info):
    """Join the reply text and the command text with a newline.

    Returns None when the joined text is blank.
    """
    pieces = (info['reply']['text'], info['content']['text'])
    joined = f"{pieces[0]}\n{pieces[1]}"

    if not joined.strip():
        return None
    return joined
271
def ttbt(update: Update, context: CallbackContext):
    """Handle /ttbt: render the text selected by ``ttbt_check`` with ``tt_bt_effect``.

    Replies with the rendered picture, or with the localized "no_caption"
    message when ``_get_all`` returns no image.
    """
    caption, picture, keyboard = _get_all(update, ttbt_check, context)

    if picture is not None:
        rendered = _img_to_bio(tt_bt_effect(caption, picture))
        update.message.reply_photo(photo=rendered, reply_markup=keyboard)
    else:
        update.message.reply_text(l("no_caption", lang))
282
283
def tt(update: Update, context: CallbackContext):
    """Handle /tt: render the text selected by ``tt_check`` with ``tt_bt_effect``.

    Replies with the rendered picture, or with the localized "no_caption"
    message when ``_get_all`` returns no image.
    """
    caption, picture, keyboard = _get_all(update, tt_check, context)

    if picture is not None:
        rendered = _img_to_bio(tt_bt_effect(caption, picture))
        update.message.reply_photo(photo=rendered, reply_markup=keyboard)
    else:
        update.message.reply_text(l("no_caption", lang))
294    
def bt(update: Update, context: CallbackContext):
    """Handle /bt: render the text selected by ``tt_check`` with ``tt_bt_effect``.

    The text is passed with a leading newline (presumably so that it lands
    in the second, bottom slot of the effect - confirm against
    ``tt_bt_effect``). Replies with the rendered picture, or with the
    localized "no_caption" message when ``_get_all`` returns no image.
    """
    caption, picture, keyboard = _get_all(update, tt_check, context)

    if picture is not None:
        rendered = _img_to_bio(tt_bt_effect("\n" + caption, picture))
        update.message.reply_photo(photo=rendered, reply_markup=keyboard)
    else:
        update.message.reply_text(l("no_caption", lang))
305
def splash(update: Update, context: CallbackContext):
    """Handle /splash: render the text selected by ``splash_check`` with ``splash_effect``.

    Replies with the rendered picture, or with the localized "no_caption"
    message when ``_get_all`` returns no image.
    """
    caption, picture, keyboard = _get_all(update, splash_check, context)

    if picture is not None:
        rendered = _img_to_bio(splash_effect(caption, picture))
        update.message.reply_photo(photo=rendered, reply_markup=keyboard)
    else:
        update.message.reply_text(l("no_caption", lang))
316    
def wot(update: Update, context: CallbackContext):
    """Handle /wot: render the text selected by ``wot_check`` with ``wot_effect``.

    Replies with the rendered picture, or with the localized "no_caption"
    message when ``_get_all`` returns no image.
    """
    caption, picture, keyboard = _get_all(update, wot_check, context)

    if picture is not None:
        rendered = _img_to_bio(wot_effect(caption, picture))
        update.message.reply_photo(photo=rendered, reply_markup=keyboard)
    else:
        update.message.reply_text(l("no_caption", lang))
327    
def text(update: Update, context: CallbackContext):
    """Handle /text: render the text selected by ``wot_check`` with ``text_effect``.

    Replies with the rendered picture, or with the localized "no_caption"
    message when ``_get_all`` returns no image.
    """
    caption, picture, keyboard = _get_all(update, wot_check, context)

    if picture is not None:
        rendered = _img_to_bio(text_effect(caption, picture))
        update.message.reply_photo(photo=rendered, reply_markup=keyboard)
    else:
        update.message.reply_text(l("no_caption", lang))
338
def caps(update: Update, context: CallbackContext):
    """Send back an upper-cased text.

    The text is the content of the replied-to message when the command was
    sent as a reply, and the command's own arguments otherwise.
    """
    replied_to = update.message.reply_to_message
    _, source_text, _ = _get_reply(replied_to, _get_args(context))

    shouted = source_text.upper()
    context.bot.send_message(chat_id=update.effective_chat.id, text=shouted)
343    
def unknown(update: Update, context: CallbackContext):
    """Log a command that no other handler recognised; nothing is sent back."""
    message = update.message
    logging.info(f"User {message.from_user.username} sent {message.text_markdown_v2} and I don't know what that means.")
346    
def error_callback(update: Update, context: CallbackContext):
    """Report errors raised while handling an update.

    A ``TelegramError`` is logged and, when the failing update belongs to a
    chat, the localized "error" message is sent to that chat. Any other
    exception is logged with its traceback.

    Args:
        update: the update being processed when the error occurred. It may
            be None or carry no chat, in which case nothing is sent.
        context: callback context; ``context.error`` holds the exception.
    """
    error = context.error

    if not isinstance(error, TelegramError):
        # Previously the error was re-raised from inside the error handler;
        # log it here with its traceback instead.
        logging.error("Unhandled error while processing an update", exc_info=error)
        return

    logging.error("TelegramError!!")

    # Guard against errors that are not tied to a chat: dereferencing
    # update.effective_chat blindly would raise AttributeError and hide the
    # original problem.
    chat = getattr(update, "effective_chat", None)
    if chat is None:
        return

    context.bot.send_message(chat_id=chat.id, text=l('error', lang))
353        
def _add_effect_handler(dispatcher, command: str, callback):
    """Register *callback* for an effect command on *dispatcher*.

    Two handlers are added, in this order: a ``CommandHandler`` for
    *command*, and a ``MessageHandler`` using a caption filter built from
    "/<command>" (presumably so the command also works as a photo caption -
    confirm against the ``Filters.caption`` documentation).
    """
    effect_handlers = (
        CommandHandler(command, callback),
        MessageHandler(Filters.caption(update=[f"/{command}"]), callback),
    )
    for handler in effect_handlers:
        dispatcher.add_handler(handler)
357    
def main():
    """Configure the bot, register every handler and poll until stopped.

    The bot token is read from the "token" environment variable. Pickle
    persistence is written to 'bot-data.pkl' with bot data, callback data
    and user data storage switched off.
    """
    persistence = PicklePersistence(filename='bot-data.pkl',
                                    store_bot_data=False,
                                    store_callback_data=False,
                                    store_user_data=False)
    updater = Updater(token=os.getenv("token"), persistence=persistence)

    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error_callback)

    # Plain commands: handled only when sent as a text command.
    plain_commands = (
        ('start', start),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
        ('raw', raw),
        ('pilu', pilu),
    )
    for command, callback in plain_commands:
        dispatcher.add_handler(CommandHandler(command, callback))

    # Effect commands: registered through _add_effect_handler, which also
    # adds a caption-based handler for each of them.
    effect_commands = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
        ('wot', wot),
        ('text', text),
    )
    for command, callback in effect_commands:
        _add_effect_handler(dispatcher, command, callback)

    # Registered last, after all the specific command handlers above.
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))
    updater.start_polling()
    updater.idle()
386
387if __name__ ==  "__main__":
388    main()