all repos — python-meme-bot @ d274a7d2d84141279017ff4e76fb9f7d47c4f635

Telegram Bot that uses PIL to compute light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image, rating_normal, rating_lewd
  3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
  4from Constants import get_localized_string as l
  5
  6from dotenv import load_dotenv
  7load_dotenv()
  8import os, logging
  9logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 10from io import BytesIO
 11
 12from telegram.error import TelegramError
 13from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters, PicklePersistence
 14from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 15
 16lang = 'us'
 17
 18def _get_args(context):
 19    
 20    logging.info(context.args)
 21    return ' '.join(context.args)
 22
 23def _get_message_content(message):
 24    
 25    image = None
 26    if len(message.photo) > 0:
 27        image = message.photo[-1].get_file()
 28        image = Image.open(BytesIO(image.download_as_bytearray()))
 29    
 30    content = ""
 31    if message.text is not None:
 32        content = message.text.strip()
 33    elif message.caption is not None:
 34        content = message.caption.strip()
 35        
 36    lines = content.split("\n")
 37    r = lines[0].split(" ")
 38    
 39    try:
 40        if r[0][0] == '/':
 41            r.pop(0)
 42    except IndexError:
 43        pass
 44        
 45    lines[0] = " ".join(r)
 46    content = "\n".join(lines)
 47    
 48    return image, content, _get_author(message)
 49
 50def _get_reply(message, fallback=""):
 51    
 52    if message is None:
 53        return None, fallback, None
 54    
 55    image, content, author = _get_message_content(message)
 56    
 57    return image, content, author
 58
 59def _get_lewd(context):
 60    
 61    try:
 62        return context.chat_data["lewd"]
 63    except KeyError:
 64        return False
 65
 66def _get_image(context, tag="", bio=True):
 67    
 68    if context is not None:
 69        if _get_lewd(context):
 70            image, url = get_random_image(rating_lewd, tag)
 71        else:
 72            image, url = get_random_image(rating_normal, tag)
 73    
 74    if image is None:
 75        logging.warning("Getting Image failed")
 76        raise TelegramError("bad image")
 77    
 78    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
 79    
 80    if bio:
 81        return image, markup
 82    return image, markup
 83
 84def _get_all(update, check_fn, context):
 85    
 86    image_reply, text_reply, author_reply = _get_reply(update.message.reply_to_message)
 87    image_content, text_content, author_content = _get_message_content(update.message)
 88    
 89    info_struct = {
 90        "reply": {
 91            "author": author_reply,
 92            "text": text_reply,
 93            "image": image_reply
 94        },
 95        "content": {
 96            "author": author_content,
 97            "text": text_content,
 98            "image": image_content
 99        }
100    }
101        
102    logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
103    
104    content = check_fn(info_struct)
105    
106    if content is None:
107        return None, None, None
108
109    markup = ""
110    image = None
111    
112    if image_reply is not None:
113        image = image_reply
114    
115    if image_content is not None:
116        image = image_content
117    
118    if image is None:
119        image, markup = _get_image(context, bio=False)
120    
121    return content, image, markup
122
123def start(update: Update, context: CallbackContext):
124    context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", lang))
125
126def set_lewd(update: Update, context: CallbackContext):
127    
128    try:
129        output = False if context.chat_data["lewd"] else True
130    except KeyError:
131        output = True
132        
133    context.chat_data['lewd'] = output
134    message = l("lewd_toggle", lang).format(l("enabled", lang) if output else l("disabled", lang))
135    
136    context.bot.send_message(chat_id=update.effective_chat.id, text=message)
137
138def pic(update: Update, context: CallbackContext):
139    
140    try:
141        tag = " " + context.args[0]
142    except IndexError:
143        tag = ""
144
145    image, markup = _get_image(context, tag)
146    update.message.reply_photo(photo=img_to_bio(image), parse_mode="markdown", reply_markup=markup)
147
148def raw(update: Update, context: CallbackContext):
149    
150    tag = ""
151    try:
152        tag += " " + context.args[0]
153        tag += " " + context.args[1]
154    except IndexError:
155        pass
156    
157    image, url = get_random_image("", tag)
158    
159    if image is None:
160        logging.warning("Getting Image failed")
161        raise TelegramError("bad image")
162    
163    image = _img_to_bio(image)
164    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
165    
166    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
167
168def pilu(update: Update, context: CallbackContext):
169    
170    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
171    try:
172        tag = " " + context.args[0]
173    except IndexError:
174        tag = ""
175    image, url = get_random_image("rating:e" + tag)
176    if image is None:
177        logging.warning("Getting Image failed")
178        raise TelegramError("bad image")
179        return
180    
181    image = _img_to_bio(image)
182    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
183    
184    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
185
186def _format_author(user):
187    
188    if user.username is not None:
189        return user.full_name + f" ({user.username})"
190    return user.full_name
191
192def _get_author(message):
193    
194    if message.forward_from is not None:
195        return _format_author(message.forward_from)
196    
197    if message.forward_sender_name is not None:
198        return message.forward_sender_name
199    
200    if message.forward_from_chat is not None:
201        return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
202    
203    return _format_author(message.from_user)
204
205def tt_check(info):
206    
207    reply = info['reply']['text']
208    content = info['content']['text']
209    
210    input_text = f"{reply} {content}".replace("\n", " ")
211    
212    if input_text.strip() == "":
213        return None
214
215    return input_text
216
217def ttbt_check(info):
218    
219    reply = info['reply']['text'].strip()
220    content = info['content']['text'].strip()
221    
222    if len(content.split("\n")) > 1:
223        input_text = content
224    else:
225        input_text = f"{reply}\n{content}"
226    
227    if input_text.strip() == "":
228        return None
229
230    return input_text
231
232def splash_check(info):
233    
234    reply = info['reply']['text']
235    content = info['content']['text']
236    
237    if content.strip() == "":
238        author = info['reply']['author']
239        input_text = f"{author}\n{reply}"
240    else:
241        author = info['content']['author']
242        input_text = f"{author}\n{content}"
243
244    if len(input_text.strip().split("\n")) < 2:
245        return None
246
247    return input_text
248
249def wot_check(info):
250    
251    reply = info['reply']['text']
252    content = info['content']['text']
253    
254    input_text = f"{reply}\n{content}"
255
256    if input_text.strip() == "":
257        return None
258
259    return input_text
260
261def ttbt(update: Update, context: CallbackContext):
262    
263    content, image, markup = _get_all(update, ttbt_check, context)
264    
265    if image is None:
266        update.message.reply_text(l("no_caption", lang))
267        return
268    
269    image = tt_bt_effect(content, image)
270
271    if image is None:
272        update.message.reply_text(l("failed_effect", lang))
273
274    update.message.reply_photo(photo=image, reply_markup=markup)
275
276
277def tt(update: Update, context: CallbackContext):
278    
279    content, image, markup = _get_all(update, tt_check, context)
280    
281    if image is None:
282        update.message.reply_text(l("no_caption", lang))
283        return
284    
285    image = tt_bt_effect(content, image)
286
287    if image is None:
288        update.message.reply_text(l("failed_effect", lang))
289
290    update.message.reply_photo(photo=image, reply_markup=markup)
291    
292def bt(update: Update, context: CallbackContext):
293    
294    content, image, markup = _get_all(update, tt_check, context)
295    
296    if image is None:
297        update.message.reply_text(l("no_caption", lang))
298        return
299
300    image = bt_effect(content, image)
301
302    if image is None:
303        update.message.reply_text(l("failed_effect", lang))
304
305    update.message.reply_photo(photo=image, reply_markup=markup)
306
307def splash(update: Update, context: CallbackContext):
308    
309    content, image, markup = _get_all(update, splash_check, context)
310    
311    if image is None:
312        update.message.reply_text(l("no_caption", lang))
313        return
314    
315    image = splash_effect(content, image)
316
317    if image is None:
318        update.message.reply_text(l("failed_effect", lang))
319    
320    update.message.reply_photo(photo=image, reply_markup=markup)
321    
322def wot(update: Update, context: CallbackContext):
323    
324    content, image, markup = _get_all(update, wot_check, context)
325    
326    if image is None:
327        update.message.reply_text(l("no_caption", lang))
328        return
329    
330    image = wot_effect(content, image)
331
332    if image is None:
333        update.message.reply_text(l("failed_effect", lang))
334    
335    update.message.reply_photo(photo=image, reply_markup=markup)
336    
337def text(update: Update, context: CallbackContext):
338    
339    content, image, markup = _get_all(update, wot_check, context)
340    
341    if image is None:
342        update.message.reply_text(l("no_caption", lang))
343        return
344    
345    image = text_effect(content, image)
346
347    if image is None:
348        update.message.reply_text(l("failed_effect", lang))
349    
350    update.message.reply_photo(photo=image, reply_markup=markup)
351
352def caps(update: Update, context: CallbackContext):
353    
354    _, reply, _ = _get_reply(update.message.reply_to_message, _get_args(context))
355    context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper())
356    
357def unknown(update: Update, context: CallbackContext):
358    logging.info(f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.")
359    
360def error_callback(update: Update, context: CallbackContext):
361    try:
362        raise context.error
363    except TelegramError:
364        logging.error("TelegramError!!")
365        context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', lang))
366        
367def _add_effect_handler(dispatcher, command: str, callback):
368    dispatcher.add_handler(CommandHandler(command, callback))
369    dispatcher.add_handler(MessageHandler(Filters.caption(update=[f"/{command}"]), callback))
370    
371def main():
372    
373    updater = Updater(token=os.getenv("token"),
374                      persistence=PicklePersistence(filename='bot-data.pkl',
375                                                    store_bot_data=False,
376                                                    store_callback_data=False,
377                                                    store_user_data=False))
378    
379    dispatcher = updater.dispatcher
380    dispatcher.add_error_handler(error_callback)
381    
382    dispatcher.add_handler(CommandHandler('start', start))
383    dispatcher.add_handler(CommandHandler('lewd', set_lewd))
384    dispatcher.add_handler(CommandHandler('caps', caps))
385    dispatcher.add_handler(CommandHandler('pic', pic))
386    dispatcher.add_handler(CommandHandler('raw', raw))
387    dispatcher.add_handler(CommandHandler('pilu', pilu))
388    
389    _add_effect_handler(dispatcher, 'ttbt', ttbt)
390    _add_effect_handler(dispatcher, 'tt', tt)
391    _add_effect_handler(dispatcher, 'bt', bt)
392    _add_effect_handler(dispatcher, 'splash', splash)
393    _add_effect_handler(dispatcher, 'wot', wot)
394    _add_effect_handler(dispatcher, 'text', text)
395    
396    dispatcher.add_handler(MessageHandler(Filters.command, unknown))
397    updater.start_polling()
398    updater.idle()
399
400if __name__ ==  "__main__":
401    main()