all repos — python-meme-bot @ d6840de4407d0edc3d794313bb3a1bde777e745d

Telegram bot that uses PIL to perform light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image, rating_normal, rating_lewd
  3from Effects import tt_bt_effect, splash_effect
  4import Constants as C
  5from Constants import get_localized_string as l
  6
  7from dotenv import load_dotenv
  8load_dotenv()
  9import os, logging
 10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 11from io import BytesIO
 12
 13from telegram.error import TelegramError, BadRequest
 14from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters, PicklePersistence
 15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 16
 17lang = 'us'
 18
 19def _get_args(context):
 20    logging.info(context.args)
 21    return ' '.join(context.args)
 22
 23def _img_to_bio(image):
 24    bio = BytesIO()
 25    
 26    if image.mode in ("RGBA", "P"):
 27        image = image.convert("RGB")
 28
 29    image.save(bio, 'JPEG')
 30    bio.seek(0)
 31    return bio
 32
 33def _ttbt_general(context, text, image=None):
 34    if text.strip() == "":
 35        return None, l("no_caption", lang)
 36    
 37    markup = ""
 38    if image is None:
 39        image, markup = _get_image(context, bio=False)
 40    image = _img_to_bio(tt_bt_effect(text, image))
 41    return image, markup
 42
 43def _parse_message(text: str):
 44    pass
 45
 46def _get_message_content(message):
 47    image = None
 48    if len(message.photo) > 0:
 49        image = message.photo[-1].get_file()
 50        image = Image.open(BytesIO(image.download_as_bytearray()))
 51    
 52    content = ""
 53    if message.text is not None:
 54        content = message.text.strip()
 55    elif message.caption is not None:
 56        content = message.caption.strip()
 57        
 58    lines = content.split("\n")
 59    r = lines[0].split(" ")
 60    
 61    try:
 62        if r[0][0] == '/':
 63            r.pop(0)
 64    except IndexError:
 65        pass
 66        
 67    lines[0] = " ".join(r)
 68    return image, "\n".join(lines)
 69
 70def _get_reply(message, fallback=""):
 71    
 72    if message is None:
 73        return None, fallback
 74    
 75    image, content = _get_message_content(message)
 76    
 77    return image, content
 78
 79def _get_lewd(context):
 80    try:
 81        return context.chat_data["lewd"]
 82    except KeyError:
 83        return False
 84
 85def _get_image(context, tag="", bio=True):
 86    if context is not None:
 87        if _get_lewd(context):
 88            image, url = get_random_image(rating_lewd, tag)
 89        else:
 90            image, url = get_random_image(rating_normal, tag)
 91    
 92    if image is None:
 93        logging.warning("Getting Image failed")
 94        raise TelegramError("bad image")
 95    
 96    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
 97    
 98    if bio:
 99        return _img_to_bio(image), markup
100    return image, markup
101
def _get_all(update, check_fn, context):
    """Gather the text, image and markup needed by an effect command.

    *check_fn* is called as ``check_fn(author, reply, content)`` and returns
    the final text, or ``None`` to reject the request.

    Returns:
        ``(None, None, None)`` when *check_fn* rejects the input, otherwise
        ``(content, image, markup)``. The image comes from the command message
        itself, else from the replied-to message, else from ``_get_image``
        (the only case where *markup* is not the empty string).
    """
    message = update.message

    author = _get_author(message)
    image_reply, reply = _get_reply(message.reply_to_message)
    image_content, content = _get_message_content(message)
    logging.info(f"User {message.from_user.username} typed: {str(message.text)}")

    content = check_fn(author, reply, content)
    if content is None:
        return None, None, None

    # A photo on the command message takes precedence over the replied one.
    image = image_content if image_content is not None else image_reply

    markup = ""
    if image is None:
        image, markup = _get_image(context, bio=False)

    return content, image, markup
126
def start(update: Update, context: CallbackContext):
    """Handle /start: send the localized welcome message to the chat."""
    chat_id = update.effective_chat.id
    greeting = l("welcome", lang)
    context.bot.send_message(chat_id=chat_id, text=greeting)
129
def set_lewd(update: Update, context: CallbackContext):
    """Handle /lewd: flip the chat's "lewd" flag and announce the new state.

    A chat with no stored flag is treated as disabled, so the first call
    enables it.
    """
    enabled = not context.chat_data.get("lewd", False)
    context.chat_data['lewd'] = enabled

    template = l("lewd_toggle", lang)
    if enabled:
        state = l("enabled", lang)
    else:
        state = l("disabled", lang)

    context.bot.send_message(chat_id=update.effective_chat.id, text=template.format(state))
140
def pic(update: Update, context: CallbackContext):
    """Handle /pic: reply with a random image, optionally filtered by a tag.

    Only the first command argument is used as the tag.
    """
    args = context.args
    tag = " " + args[0] if len(args) > 0 else ""

    photo, markup = _get_image(context, tag)
    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
149
def raw(update: Update, context: CallbackContext):
    """Handle /raw: reply with a random image using an empty rating string.

    Up to the first two command arguments are forwarded as tags.

    Raises:
        TelegramError: if no image could be fetched.
    """
    tag = "".join(" " + arg for arg in context.args[:2])

    image, url = get_random_image("", tag)
    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    photo = _img_to_bio(image)
    button = InlineKeyboardButton(text=l("sauce", lang), url=url)
    markup = InlineKeyboardMarkup([[button]])

    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
168
def pilu(update: Update, context: CallbackContext):
    """Handle /pilu: reply with a random image queried with ``rating:e``.

    The request is logged at warning level together with the requesting
    user's username. Only the first command argument is used as a tag.

    Raises:
        TelegramError: if no image could be fetched.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
    try:
        tag = " " + context.args[0]
    except IndexError:
        tag = ""

    # NOTE(review): the other call sites pass rating and tag as two separate
    # arguments; confirm the single-argument form is intended here.
    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        # The unreachable ``return`` that followed this raise was removed.
        raise TelegramError("bad image")

    image = _img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
185
186def _format_author(user):
187    if user.username is not None:
188        return user.full_name + f" ({user.username})"
189    return user.full_name
190
191def _get_author(message):
192    
193    if message.reply_to_message is None:
194        return _format_author(message.from_user)
195    
196    if message.reply_to_message.forward_from is not None:
197        return _format_author(message.reply_to_message.forward_from)
198    
199    if message.reply_to_message.forward_sender_name is not None:
200        return message.reply_to_message.forward_sender_name
201    
202    return _format_author(message.reply_to_message.from_user)
203
def tt_check(author, content, reply):
    """Build single-line text as ``"<reply> <content>"``.

    Newlines are replaced by spaces. Returns ``None`` when the result is
    blank. *author* is not used.
    """
    combined = f"{reply} {content}".replace("\n", " ")
    return combined if combined.strip() else None
211
def ttbt_check(author, content, reply):
    """Build two-part text as ``"<reply>\\n<content>"``.

    Returns ``None`` when the result is blank. *author* is not used.
    """
    combined = f"{reply}\n{content}"
    return combined if combined.strip() else None
219
def splash_check(author, content, reply):
    """Build splash text as ``"<author>\\n<content>\\n<reply>"``.

    Returns ``None`` unless the stripped text spans at least two lines;
    otherwise the unstripped text is returned.
    """
    combined = f"{author}\n{content}\n{reply}"
    # At least one newline must survive stripping for there to be two lines.
    if combined.strip().count("\n") < 1:
        return None
    return combined
227
def ttbt(update: Update, context: CallbackContext):
    """Handle /ttbt: reply with the top/bottom-text effect applied.

    The text is validated with ``ttbt_check``; when nothing usable was given
    the localized "no_caption" message is sent instead.
    """
    content, image, markup = _get_all(update, ttbt_check, context)

    if image is not None:
        photo = _img_to_bio(tt_bt_effect(content, image))
        update.message.reply_photo(photo=photo, reply_markup=markup)
    else:
        update.message.reply_text(l("no_caption", lang))
237
238
def tt(update: Update, context: CallbackContext):
    """Handle /tt: reply with the text effect applied to single-line text.

    The text is validated with ``tt_check``; when nothing usable was given
    the localized "no_caption" message is sent instead.
    """
    content, image, markup = _get_all(update, tt_check, context)

    if image is not None:
        photo = _img_to_bio(tt_bt_effect(content, image))
        update.message.reply_photo(photo=photo, reply_markup=markup)
    else:
        update.message.reply_text(l("no_caption", lang))
248    
def bt(update: Update, context: CallbackContext):
    """Handle /bt: like /tt, but the text is prefixed with a newline.

    The text is validated with ``tt_check``; when nothing usable was given
    the localized "no_caption" message is sent instead.
    """
    content, image, markup = _get_all(update, tt_check, context)

    if image is not None:
        # The leading newline is what distinguishes this from /tt.
        photo = _img_to_bio(tt_bt_effect("\n" + content, image))
        update.message.reply_photo(photo=photo, reply_markup=markup)
    else:
        update.message.reply_text(l("no_caption", lang))
258
def splash(update: Update, context: CallbackContext):
    """Handle /splash: reply with the splash effect applied.

    The text is validated with ``splash_check``; when nothing usable was
    given the localized "no_caption" message is sent instead.
    """
    content, image, markup = _get_all(update, splash_check, context)

    if image is not None:
        photo = _img_to_bio(splash_effect(content, image))
        update.message.reply_photo(photo=photo, reply_markup=markup)
    else:
        update.message.reply_text(l("no_caption", lang))
268
def caps(update: Update, context: CallbackContext):
    """Handle /caps: send back upper-cased text.

    The text of the replied-to message is used; without a reply the command
    arguments are used instead.
    """
    replied = update.message.reply_to_message
    fallback = _get_args(context)
    _, text = _get_reply(replied, fallback)
    context.bot.send_message(chat_id=update.effective_chat.id, text=text.upper())
272    
def unknown(update: Update, context: CallbackContext):
    """Log a command that no handler recognised; nothing is sent back."""
    message = update.message
    username = message.from_user.username
    logging.info(f"User {username} sent {message.text_markdown_v2} and I don't know what that means.")
275    
def error_callback(update: Update, context: CallbackContext):
    """Dispatcher error handler: report Telegram errors back to the chat.

    Args:
        update: the update being processed when the error occurred; may be
            ``None`` or lack a chat when the error is not tied to one.
        context: callback context whose ``error`` attribute holds the
            exception.

    Raises:
        Exception: any error that is not a ``TelegramError`` is re-raised
            unchanged, as before.
    """
    error = context.error
    if not isinstance(error, TelegramError):
        raise error

    logging.error("TelegramError!!")

    # The update can be missing or have no chat; dereferencing it blindly
    # would raise AttributeError from inside the error handler.
    chat = getattr(update, "effective_chat", None)
    if chat is None:
        return

    context.bot.send_message(chat_id=chat.id, text=l('error', lang))
282        
def _add_effect_handler(dispatcher, command: str, callback):
    """Register *callback* for ``/command`` as a command and as a caption.

    Two handlers are added: a ``CommandHandler`` for the command, and a
    ``MessageHandler`` using a caption filter built from ``"/<command>"``.
    """
    command_handler = CommandHandler(command, callback)
    dispatcher.add_handler(command_handler)

    caption_filter = Filters.caption(update=[f"/{command}"])
    dispatcher.add_handler(MessageHandler(caption_filter, callback))
286    
def main():
    """Build the updater, register every handler and poll until stopped.

    The bot token is read from the ``token`` environment variable.
    Persistence uses the pickle file ``bot-data.pkl`` with bot, callback and
    user data storage switched off.
    """
    token = os.getenv("token")
    persistence = PicklePersistence(
        filename='bot-data.pkl',
        store_bot_data=False,
        store_callback_data=False,
        store_user_data=False,
    )
    updater = Updater(token=token, persistence=persistence)

    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error_callback)

    # Registration order is kept exactly as before.
    plain_commands = (
        ('start', start),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
        ('raw', raw),
        ('pilu', pilu),
    )
    for command, callback in plain_commands:
        dispatcher.add_handler(CommandHandler(command, callback))

    effect_commands = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
    )
    for command, callback in effect_commands:
        _add_effect_handler(dispatcher, command, callback)

    # Catch-all for commands not matched by the handlers above.
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))

    updater.start_polling()
    updater.idle()
313
# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    main()