all repos — python-meme-bot @ 11288d1572db307eeafd75d3c3e12e11b3143768

Telegram bot that uses PIL to perform light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image, rating_normal, rating_lewd
  3from Effects import tt_bt_effect
  4import Constants as C
  5from Constants import get_localized_string as l
  6
  7from dotenv import load_dotenv
  8load_dotenv()
  9import os, logging
 10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 11from io import BytesIO
 12
 13from telegram.error import TelegramError, BadRequest
 14from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters
 15from telegram import Update
 16
 17lang = 'us'
 18
 19def _get_args(context):
 20    logging.info(context.args)
 21    return ' '.join(context.args)
 22
 23def _img_to_bio(image):
 24    bio = BytesIO()
 25    #bio.name = 'image.jpeg'
 26    
 27    if image.mode in ("RGBA", "P"):
 28        image = image.convert("RGB")
 29
 30    image.save(bio, 'JPEG')
 31    bio.seek(0)
 32    return bio
 33
 34def _ttbt_general(context, text, image=None):
 35    if text.strip() == "":
 36        return None, l("no_caption", lang)
 37    
 38    url = ""
 39    if image is None:
 40        image, url = _get_image(context, bio=False)
 41    image = tt_bt_effect(text, image)
 42    return _img_to_bio(image), url
 43
 44def _get_reply(input, context, fallback=""):
 45
 46    if input is None:
 47        return None, fallback
 48    
 49    if input.photo is not None:
 50        image = input.photo[-1].get_file()
 51        #image = context.bot.get_file(input.photo[-1].file_id)
 52        image = Image.open(BytesIO(image.download_as_bytearray()))
 53    
 54    if input.caption is not None:
 55        return image, input.caption
 56    
 57    if input.text is not None:
 58        return image, input.text
 59    
 60    return fallback
 61
 62def _get_lewd(context):
 63    try:
 64        return context.chat_data["lewd"]
 65    except KeyError:
 66        return False
 67
 68def _get_image(context, bio=True):
 69    if context is not None:
 70        if _get_lewd(context):
 71            image, url = get_random_image(rating_lewd)
 72        else:
 73            image, url = get_random_image(rating_normal)
 74    
 75    if image is None:
 76        logging.warning("Getting Image failed")
 77        raise TelegramError("bad image")
 78    
 79    url = l("sauce", lang).format(url)
 80    if bio:
 81        return _img_to_bio(image), url
 82    return image, url
 83    
 84
 85def start(update: Update, context: CallbackContext):
 86    context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", lang))
 87
 88def set_lewd(update: Update, context: CallbackContext):
 89    try:
 90        output = False if context.chat_data["lewd"] else True
 91    except KeyError:
 92        output = True
 93        
 94    context.chat_data['lewd'] = output
 95    message = l("lewd_toggle", lang).format(l("enabled", lang) if output else l("disabled", lang))
 96    
 97    context.bot.send_message(chat_id=update.effective_chat.id, text=message)
 98
 99def pic(update: Update, context: CallbackContext):
100    image, url = _get_image(context)
101    update.message.reply_photo(photo=image, caption=url, parse_mode="markdown")
102
def pilu(update: Update, context: CallbackContext):
    """Handle /pilu: reply with a random image from the "e" rating.

    The first command argument, if any, is appended to the rating query as
    an extra tag. The request is logged at warning level with the username.

    Raises:
        TelegramError: If no image could be retrieved.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")

    # Only the first argument is used; extra arguments are ignored.
    tag = " " + context.args[0] if context.args else ""

    image, url = get_random_image("e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    url = l("sauce", lang).format(url)
    image = _img_to_bio(image)
    update.message.reply_photo(photo=image, caption=url, parse_mode="markdown")
117
def tt(update: Update, context: CallbackContext):
    """Handle /tt: caption an image with the replied-to text plus the arguments.

    The replied message's text and the command arguments are joined with a
    space and newlines are flattened to spaces, so the caption is a single
    line (presumably rendered as top text by ``tt_bt_effect`` - confirm).
    If no image can be produced, the returned message is sent as plain text.
    """
    replied_image, replied_text = _get_reply(update.message.reply_to_message, context)
    flattened = f"{replied_text} {_get_args(context)}".replace("\n", " ")

    photo, caption = _ttbt_general(context, flattened, replied_image)

    if photo is not None:
        update.message.reply_photo(photo=photo, caption=caption, parse_mode="markdown")
    else:
        # With no photo, the second element carries the error text.
        update.message.reply_text(caption)
129
def bt(update: Update, context: CallbackContext):
    """Handle /bt: like /tt, but the text is pushed onto the second line.

    The single-line caption is prefixed with ``" \n"``, i.e. a blank first
    line (presumably so ``tt_bt_effect`` draws it as bottom text - confirm).
    If no image can be produced, the returned message is sent as plain text.
    """
    replied_image, replied_text = _get_reply(update.message.reply_to_message, context)
    flattened = f"{replied_text} {_get_args(context)}".replace("\n", " ")

    photo, caption = _ttbt_general(context, " \n" + flattened, replied_image)

    if photo is not None:
        update.message.reply_photo(photo=photo, caption=caption, parse_mode="markdown")
    else:
        # With no photo, the second element carries the error text.
        update.message.reply_text(caption)
141
def ttbt(update: Update, context: CallbackContext):
    """Handle /ttbt: caption an image with the replied-to text and the message body.

    The raw message text after the command is used (not ``context.args``)
    so that newlines typed by the user are preserved. The caption passed on
    is ``"<reply text>\n<text after the command>"``. If no image can be
    produced, the returned message is sent as plain text.
    """
    message = update.message

    image, reply = _get_reply(message.reply_to_message, context)

    # Drop the command token plus the single separator character after it.
    # Locating the first whitespace (instead of slicing a fixed 6 chars)
    # also handles the "/ttbt@botname" form used in group chats.
    text = message.text
    command_end = next(
        (index for index, char in enumerate(text) if char.isspace()),
        len(text),
    )
    content = text[command_end + 1:]
    input_text = f"{reply}\n{content}"

    image, url = _ttbt_general(context, input_text, image)

    if image is None:
        message.reply_text(url)
        return
    message.reply_photo(photo=image, caption=url, parse_mode="markdown")
155    
def caps(update: Update, context: CallbackContext):
    """Handle /caps: send back the replied-to text (or the arguments) in upper case."""
    fallback_text = _get_args(context)
    _, source_text = _get_reply(update.message.reply_to_message, context, fallback_text)
    shouted = source_text.upper()
    context.bot.send_message(chat_id=update.effective_chat.id, text=shouted)
159    
def unknown(update: Update, context: CallbackContext):
    """Handle unrecognized commands: log them and reply with the localized notice.

    This callback is registered on a ``MessageHandler``, where
    ``context.args`` is not populated, so the raw message text is logged
    instead. The reply goes through ``update.message.reply_text`` because
    the bot object has no ``reply_text`` method.
    """
    message = update.message
    logging.info(f"User {message.from_user.username} sent {message.text}")
    message.reply_text(text=l("unknown", lang))
163    
def error_callback(update: Update, context: CallbackContext):
    """Dispatcher error handler: log Telegram errors and notify the chat.

    Errors that are not ``TelegramError`` are re-raised unchanged. For a
    ``TelegramError`` the error is logged with its traceback and, when the
    update identifies a chat, the localized error message is sent there.
    """
    error = context.error
    if not isinstance(error, TelegramError):
        raise error

    logging.error("TelegramError!!", exc_info=error)

    # Errors raised outside of update processing arrive with no update
    # (or one without a chat); there is nobody to notify in that case.
    chat = getattr(update, "effective_chat", None)
    if chat is None:
        return
    context.bot.send_message(chat_id=chat.id, text=l('error', lang))
def main():
    """Build the updater, register all handlers and poll until interrupted.

    The bot token is read from the ``token`` environment variable.
    """
    updater = Updater(token=os.getenv("token"))
    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error_callback)

    # Registration order is kept as-is.
    command_table = (
        ('start', start),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
        ('pilu', pilu),
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
    )
    for command_name, callback in command_table:
        dispatcher.add_handler(CommandHandler(command_name, callback))

    # Registered last so it only sees commands none of the handlers above took.
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))

    updater.start_polling()
    updater.idle()
190
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()