all repos — python-meme-bot @ ee18ea77890feb2d14eb445366f8c6c3e07d5b9f

Telegram bot that uses PIL to perform light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image, rating_normal, rating_lewd
  3from Effects import tt_bt_effect, splash_effect
  4import Constants as C
  5from Constants import get_localized_string as l
  6
  7from dotenv import load_dotenv
  8load_dotenv()
  9import os, logging
 10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 11from io import BytesIO
 12
 13from telegram.error import TelegramError, BadRequest
 14from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters, PicklePersistence
 15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 16
 17lang = 'us'
 18
 19def _get_args(context):
 20    logging.info(context.args)
 21    return ' '.join(context.args)
 22
 23def _img_to_bio(image):
 24    bio = BytesIO()
 25    
 26    if image.mode in ("RGBA", "P"):
 27        image = image.convert("RGB")
 28
 29    image.save(bio, 'JPEG')
 30    bio.seek(0)
 31    return bio
 32
 33def _ttbt_general(context, text, image=None):
 34    if text.strip() == "":
 35        return None, l("no_caption", lang)
 36    
 37    markup = ""
 38    if image is None:
 39        image, markup = _get_image(context, bio=False)
 40    image = _img_to_bio(tt_bt_effect(text, image))
 41    return image, markup
 42
 43def _get_message_content(message):
 44    image = None
 45    if len(message.photo) > 0:
 46        image = message.photo[-1].get_file()
 47        image = Image.open(BytesIO(image.download_as_bytearray()))
 48    
 49    content = ""
 50    if message.text is not None:
 51        content = message.text.strip()
 52    elif message.caption is not None:
 53        content = message.caption.strip()
 54        
 55    lines = content.split("\n")
 56    r = lines[0].split(" ")
 57    
 58    try:
 59        if r[0][0] == '/':
 60            r.pop(0)
 61    except IndexError:
 62        pass
 63        
 64    lines[0] = " ".join(r)
 65    return image, "\n".join(lines)
 66
 67def _get_reply(message, fallback=""):
 68    
 69    if message is None:
 70        return None, fallback
 71    
 72    image, content = _get_message_content(message)
 73    
 74    return image, content
 75
 76def _get_lewd(context):
 77    try:
 78        return context.chat_data["lewd"]
 79    except KeyError:
 80        return False
 81
 82def _get_image(context, tag="", bio=True):
 83    if context is not None:
 84        if _get_lewd(context):
 85            image, url = get_random_image(rating_lewd, tag)
 86        else:
 87            image, url = get_random_image(rating_normal, tag)
 88    
 89    if image is None:
 90        logging.warning("Getting Image failed")
 91        raise TelegramError("bad image")
 92    
 93    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
 94    
 95    if bio:
 96        return _img_to_bio(image), markup
 97    return image, markup
 98
 99def _get_all(update, check_fn, context):
100    author = _get_author(update.message)
101    image_reply, reply = _get_reply(update.message.reply_to_message)
102    
103    image_content, content = _get_message_content(update.message)
104    logging.info(f"User {update.message.from_user.username} typed: {str(update.message.text)}")
105    
106    content = check_fn(author, reply, content)
107    
108    if content is None:
109        return None, None, None
110
111    markup = ""
112    image = None
113    if image_reply is not None:
114        image = image_reply
115        
116    if image_content is not None:
117        image = image_content
118    
119    if image is None:
120        image, markup = _get_image(context, bio=False)
121        
122    return content, image, markup
123
def start(update: Update, context: CallbackContext):
    """Send the localized welcome message to the chat the update came from."""
    chat_id = update.effective_chat.id
    greeting = l("welcome", lang)
    context.bot.send_message(chat_id=chat_id, text=greeting)
126
def set_lewd(update: Update, context: CallbackContext):
    """Flip the chat's "lewd" flag and report the new state to the chat.

    A chat that has never set the flag is treated as having it off, so the
    first call turns it on.
    """
    enabled = not context.chat_data.get("lewd", False)
    context.chat_data['lewd'] = enabled

    template = l("lewd_toggle", lang)
    state = l("enabled", lang) if enabled else l("disabled", lang)
    message = template.format(state)

    context.bot.send_message(chat_id=update.effective_chat.id, text=message)
137
def pic(update: Update, context: CallbackContext):
    """Reply with a random image, optionally filtered by the first argument.

    The first command argument, if any, is passed to ``_get_image`` as the
    tag with a leading space; further arguments are ignored.
    """
    tag = f" {context.args[0]}" if context.args else ""
    photo, keyboard = _get_image(context, tag)
    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=keyboard)
146
def raw(update: Update, context: CallbackContext):
    """Reply with a random image fetched with an empty rating.

    Up to the first two command arguments are forwarded as the tag, each
    prefixed with a space.

    Raises:
        TelegramError: if ``get_random_image`` yields no image.
    """
    tag = "".join(f" {argument}" for argument in context.args[:2])

    picture, url = get_random_image("", tag)
    if picture is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    photo = _img_to_bio(picture)
    sauce_button = InlineKeyboardButton(text=l("sauce", lang), url=url)
    keyboard = InlineKeyboardMarkup([[sauce_button]])

    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=keyboard)
165
def pilu(update: Update, context: CallbackContext):
    """Reply with a random image searched with the ``rating:e`` prefix.

    The request is logged at warning level together with the requesting
    user's username. The first command argument, if any, is appended to the
    search string with a leading space.

    Raises:
        TelegramError: if ``get_random_image`` yields no image.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
    try:
        tag = " " + context.args[0]
    except IndexError:
        tag = ""

    # NOTE(review): every other call site in this file passes the rating and
    # the tag as two separate arguments; this one passes a single combined
    # string -- confirm get_random_image's signature supports that.
    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        # The unreachable `return` that used to follow this raise was removed.
        raise TelegramError("bad image")

    image = _img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
182
def _format_author(user):
    """Return the user's full name, followed by " (username)" when one is set."""
    if user.username is None:
        return user.full_name
    return f"{user.full_name} ({user.username})"
187
def _get_author(message):
    """Return the display name of the author to credit for a message.

    Without a reply, that is the sender of ``message``. With a reply, it is
    the replied-to message's original forwarder (``forward_from``), else its
    ``forward_sender_name``, else the sender of the replied-to message.
    """
    replied = message.reply_to_message

    if replied is None:
        user = message.from_user
    elif replied.forward_from is not None:
        user = replied.forward_from
    elif replied.forward_sender_name is not None:
        # Only a plain name is available here, so it is returned unformatted.
        return replied.forward_sender_name
    else:
        user = replied.from_user

    return _format_author(user)
200
def tt_check(author, content, reply):
    """Build single-line text from ``reply`` followed by ``content``.

    Newlines are replaced by spaces. Returns ``None`` when the result is
    blank. ``author`` is accepted for signature parity and not used.
    """
    input_text = f"{reply} {content}".replace("\n", " ")
    return input_text if input_text.strip() else None
208
def ttbt_check(author, content, reply):
    """Build two-part text: ``reply``, a newline, then ``content``.

    Returns ``None`` when the combined text is blank. ``author`` is accepted
    for signature parity and not used.
    """
    combined = f"{reply}\n{content}"
    if not combined.strip():
        return None
    return combined
216
def splash_check(author, content, reply):
    """Build splash text: ``author``, ``content`` and ``reply`` on separate lines.

    Returns ``None`` when the stripped text has fewer than two lines.
    """
    splash_text = f"{author}\n{content}\n{reply}"

    # Fewer than two lines after stripping means no newline is left.
    if "\n" not in splash_text.strip():
        return None

    return splash_text
224
def ttbt(update: Update, context: CallbackContext):
    """Handle /ttbt: reply with an image rendered by ``tt_bt_effect``.

    The text is assembled by ``ttbt_check``. When no usable text is found,
    the localized "no_caption" message is sent instead.
    """
    text, picture, keyboard = _get_all(update, ttbt_check, context)

    if picture is not None:
        rendered = tt_bt_effect(text, picture)
        update.message.reply_photo(photo=_img_to_bio(rendered), reply_markup=keyboard)
    else:
        update.message.reply_text(l("no_caption", lang))
234
235
def tt(update: Update, context: CallbackContext):
    """Handle /tt: reply with an image rendered by ``tt_bt_effect``.

    The text is assembled on a single line by ``tt_check``. When no usable
    text is found, the localized "no_caption" message is sent instead.
    """
    text, picture, keyboard = _get_all(update, tt_check, context)

    if picture is None:
        # _get_all returns all-None when the check function rejects the input.
        update.message.reply_text(l("no_caption", lang))
        return

    rendered = tt_bt_effect(text, picture)
    photo = _img_to_bio(rendered)
    update.message.reply_photo(photo=photo, reply_markup=keyboard)
245    
def bt(update: Update, context: CallbackContext):
    """Handle /bt: like /tt, but the text is passed with a leading newline.

    The newline presumably pushes the text into the bottom slot of
    ``tt_bt_effect`` -- confirm against the Effects module. When no usable
    text is found, the localized "no_caption" message is sent instead.
    """
    text, picture, keyboard = _get_all(update, tt_check, context)

    if picture is not None:
        bottom_text = "\n" + text
        photo = _img_to_bio(tt_bt_effect(bottom_text, picture))
        update.message.reply_photo(photo=photo, reply_markup=keyboard)
        return

    update.message.reply_text(l("no_caption", lang))
255
def splash(update: Update, context: CallbackContext):
    """Handle /splash: reply with an image rendered by ``splash_effect``.

    The text is assembled by ``splash_check``. When it rejects the input,
    the localized "no_caption" message is sent instead.
    """
    text, picture, keyboard = _get_all(update, splash_check, context)
    has_picture = picture is not None

    if not has_picture:
        update.message.reply_text(l("no_caption", lang))
        return

    rendered = splash_effect(text, picture)
    update.message.reply_photo(photo=_img_to_bio(rendered), reply_markup=keyboard)
265
def caps(update: Update, context: CallbackContext):
    """Send back upper-cased text.

    The text comes from the replied-to message when there is one, otherwise
    from the command's own arguments.
    """
    replied = update.message.reply_to_message
    fallback = _get_args(context)
    _, text = _get_reply(replied, fallback)

    shouted = text.upper()
    context.bot.send_message(chat_id=update.effective_chat.id, text=shouted)
269    
def unknown(update: Update, context: CallbackContext):
    """Log an unrecognized command; nothing is sent back to the user."""
    message = update.message
    sender = message.from_user.username
    logging.info(f"User {sender} sent {message.text_markdown_v2} and I don't know what that means.")
272    
def error_callback(update: Update, context: CallbackContext):
    """Log errors raised while handling updates and notify the chat.

    Telegram errors are logged and, when the failing update belongs to a
    chat, answered with the localized "error" message. Any other exception is
    logged with its traceback and not reported to the user.

    Args:
        update: the update being processed when the error occurred. May be
            ``None`` (or lack a chat) for errors not tied to a chat update.
        context: callback context; ``context.error`` holds the exception.
    """
    error = context.error

    if not isinstance(error, TelegramError):
        # Re-raising from inside an error handler only produces a second,
        # less informative failure; record the real one here instead.
        logging.error("Unhandled error while processing an update", exc_info=error)
        return

    logging.error("TelegramError!!", exc_info=error)

    # Errors not tied to a chat have nobody to notify; dereferencing `update`
    # unconditionally used to raise AttributeError in that case.
    chat = getattr(update, "effective_chat", None)
    if chat is None:
        return

    context.bot.send_message(chat_id=chat.id, text=l('error', lang))
279        
def _add_effect_handler(dispatcher, command: str, callback):
    """Register ``callback`` for ``/command`` sent as text or as a caption.

    Two handlers are added: a ``CommandHandler`` for the command, and a
    ``MessageHandler`` filtered on captions listed as ``"/<command>"``.
    """
    # NOTE(review): Filters.caption is given the literal list ["/<command>"];
    # presumably this matches only captions equal to that string, not captions
    # with extra text after the command -- verify against the Filters docs.
    caption_filter = Filters.caption(update=[f"/{command}"])
    handlers = (
        CommandHandler(command, callback),
        MessageHandler(caption_filter, callback),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)
283    
def main():
    """Configure the bot, register every handler and poll until stopped.

    The bot token is read from the ``token`` environment variable. Pickle
    persistence is written to ``bot-data.pkl`` with bot, callback and user
    data storage switched off.
    """
    token = os.getenv("token")
    persistence = PicklePersistence(filename='bot-data.pkl',
                                    store_bot_data=False,
                                    store_callback_data=False,
                                    store_user_data=False)
    updater = Updater(token=token, persistence=persistence)

    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error_callback)

    # Plain commands, registered in this order.
    plain_commands = (
        ('start', start),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
        ('raw', raw),
        ('pilu', pilu),
    )
    for name, callback in plain_commands:
        dispatcher.add_handler(CommandHandler(name, callback))

    # Effect commands also respond when sent as a photo caption.
    effect_commands = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
    )
    for name, callback in effect_commands:
        _add_effect_handler(dispatcher, name, callback)

    # Catch-all for commands, added after every specific handler.
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))

    updater.start_polling()
    updater.idle()
310
# Start the bot only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()