all repos — python-meme-bot @ 527645384b084fc8e88633dd3057b8a05140a4d6

Telegram bot that uses PIL to perform light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image, rating_normal, rating_lewd
  3from Effects import tt_bt_effect, splash_effect
  4import Constants as C
  5from Constants import get_localized_string as l
  6
  7from dotenv import load_dotenv
  8load_dotenv()
  9import os, logging
 10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 11from io import BytesIO
 12
 13from telegram.error import TelegramError, BadRequest
 14from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters
 15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 16
# Locale key handed to get_localized_string (imported as `l`) for every
# user-facing string in this module.
lang = 'us'
 18
 19def _get_args(context):
 20    logging.info(context.args)
 21    return ' '.join(context.args)
 22
 23def _img_to_bio(image):
 24    bio = BytesIO()
 25    #bio.name = 'image.jpeg'
 26    
 27    if image.mode in ("RGBA", "P"):
 28        image = image.convert("RGB")
 29
 30    image.save(bio, 'JPEG')
 31    bio.seek(0)
 32    return bio
 33
 34def _ttbt_general(context, text, image=None):
 35    if text.strip() == "":
 36        return None, l("no_caption", lang)
 37    
 38    markup = ""
 39    if image is None:
 40        image, markup = _get_image(context, bio=False)
 41    image = tt_bt_effect(text, image)
 42    return _img_to_bio(image), markup
 43
 44def _get_reply(input, context, fallback=""):
 45
 46    if input is None:
 47        return None, fallback
 48    
 49    image = None
 50    if len(input.photo) > 0:
 51        image = input.photo[-1].get_file()
 52        image = Image.open(BytesIO(image.download_as_bytearray()))
 53    
 54    if input.caption is not None:
 55        return image, input.caption
 56    
 57    if input.text is not None:
 58        return image, input.text
 59    
 60    return image, fallback
 61
 62def _get_lewd(context):
 63    try:
 64        return context.chat_data["lewd"]
 65    except KeyError:
 66        return False
 67
 68def _get_image(context, tag="", bio=True):
 69    if context is not None:
 70        if _get_lewd(context):
 71            image, url = get_random_image(rating_lewd, tag)
 72        else:
 73            image, url = get_random_image(rating_normal, tag)
 74    
 75    if image is None:
 76        logging.warning("Getting Image failed")
 77        raise TelegramError("bad image")
 78    
 79    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
 80    
 81    if bio:
 82        return _img_to_bio(image), markup
 83    return image, markup
 84    
 85
 86def start(update: Update, context: CallbackContext):
 87    context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", lang))
 88
 89def set_lewd(update: Update, context: CallbackContext):
 90    try:
 91        output = False if context.chat_data["lewd"] else True
 92    except KeyError:
 93        output = True
 94        
 95    context.chat_data['lewd'] = output
 96    message = l("lewd_toggle", lang).format(l("enabled", lang) if output else l("disabled", lang))
 97    
 98    context.bot.send_message(chat_id=update.effective_chat.id, text=message)
 99
def pic(update: Update, context: CallbackContext):
    """Handle /pic: reply with a random image, optionally filtered by a tag.

    Only the first command argument is used as the tag; it is passed on with
    a leading space. Without arguments the tag is empty.
    """
    # Slicing yields zero or one argument, so no IndexError handling is needed.
    tag = "".join(" " + word for word in context.args[:1])

    photo, markup = _get_image(context, tag)
    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
108
def raw(update: Update, context: CallbackContext):
    """Handle /raw: reply with a random image fetched with an empty rating.

    Up to the first two command arguments are forwarded as tags, each
    prefixed with a space.

    Raises:
        TelegramError: If no image could be fetched.
    """
    tag = "".join(" " + word for word in context.args[:2])

    picture, url = get_random_image("", tag)
    if picture is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    photo = _img_to_bio(picture)
    sauce_button = InlineKeyboardButton(text=l("sauce", lang), url=url)
    markup = InlineKeyboardMarkup([[sauce_button]])

    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
127
def pilu(update: Update, context: CallbackContext):
    """Handle /pilu: reply with a random image requested as ``rating:e``.

    The request is logged at WARNING level with the sender's username. The
    first command argument, if any, is appended to the rating string as a
    tag. The reply carries the image URL both as the caption and as a
    "sauce" button.

    Raises:
        TelegramError: If no image could be fetched.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
    try:
        tag = " " + context.args[0]
    except IndexError:
        tag = ""

    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        # The raise ends the handler; the `return` that used to follow it was
        # unreachable and has been removed.
        raise TelegramError("bad image")

    image = _img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    # The caption is a bare URL, so it is sent without a parse mode: in
    # Markdown mode underscores in the URL are read as italic markers, which
    # garbles the link or makes Telegram reject the message.
    update.message.reply_photo(photo=image, caption=url, reply_markup=markup)
144
145def _get_content(message):
146    image = None
147    if len(message.photo) > 0:
148        image = message.photo[-1].get_file()
149        image = Image.open(BytesIO(image.download_as_bytearray()))
150    
151    if message.text is not None:
152        content = message.text.strip()
153    elif message.caption is not None:
154        content = message.caption.strip()
155
156    logging.info(f"User {message.from_user.username} typed: {str(content)}")
157    
158    lines = content.split("\n")
159    r = lines[0].split(" ")
160    r.pop(0)
161    lines[0] = " ".join(r)
162    content = "\n".join(lines)
163    '''
164    lines_new = []
165    for line in lines:
166        words = line.split(" ")
167        lines_new.append(" ".join(words))
168    content = "\n".join(lines_new)
169    '''
170    
171    return image, content
172
173def _format_author(user):
174    if user.username is not None:
175        return user.full_name + f" ({user.username})"
176    return user.full_name
177
def _get_author(message):
    """Return the formatted author that a quote built from *message* refers to.

    Without a replied-to message the sender of *message* is used. Otherwise
    the original sender of a forwarded reply is preferred, falling back to
    the sender of the replied-to message.
    """
    replied = message.reply_to_message
    if replied is None:
        source = message.from_user
    elif replied.forward_from is not None:
        source = replied.forward_from
    else:
        source = replied.from_user
    return _format_author(source)
187
def tt(update: Update, context: CallbackContext):
    """Handle /tt: render the reply text plus the command text on an image.

    Reply text and command text are joined with a space and all newlines
    are replaced by spaces. An image attached to the command message takes
    priority over one in the replied-to message. If no photo is produced,
    the returned message is sent as text instead.
    """
    message = update.message
    reply_image, reply_text = _get_reply(message.reply_to_message, context)
    own_image, own_text = _get_content(message)

    caption = f"{reply_text} {own_text}".replace("\n", " ")
    source = own_image if own_image is not None else reply_image

    photo, markup = _ttbt_general(context, caption, source)
    if photo is not None:
        message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
    else:
        # On failure the second element carries the message to show the user.
        message.reply_text(markup)
205
def bt(update: Update, context: CallbackContext):
    """Handle /bt: like /tt, but the text is passed after a ``" \\n"`` prefix.

    Reply text and command text are joined with a space and all newlines
    are replaced by spaces before the prefix is added. An image attached to
    the command message takes priority over one in the replied-to message.
    If no photo is produced, the returned message is sent as text instead.
    """
    message = update.message
    reply_image, reply_text = _get_reply(message.reply_to_message, context)
    own_image, own_text = _get_content(message)

    caption = f"{reply_text} {own_text}".replace("\n", " ")
    source = own_image if own_image is not None else reply_image

    # The first line is a single space, so the real text lands on line two.
    photo, markup = _ttbt_general(context, " \n" + caption, source)
    if photo is not None:
        message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
    else:
        message.reply_text(markup)
223
def ttbt(update: Update, context: CallbackContext):
    """Handle /ttbt: render reply text and command text as separate lines.

    The replied-to text comes first and the command text follows after a
    newline. An image attached to the command message takes priority over
    one in the replied-to message. If no photo is produced, the returned
    message is sent as text instead.
    """
    message = update.message
    reply_image, reply_text = _get_reply(message.reply_to_message, context)
    own_image, own_text = _get_content(message)

    caption = f"{reply_text}\n{own_text}"
    source = own_image if own_image is not None else reply_image

    photo, markup = _ttbt_general(context, caption, source)
    if photo is not None:
        message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
    else:
        message.reply_text(markup)
243
def splash(update: Update, context: CallbackContext):
    """Handle /splash: render an attributed quote with ``splash_effect``.

    The text handed to the effect is ``author``, reply text and command
    text, one per line. If, after stripping, fewer than two lines remain
    (nothing besides the author), the localized "no_caption" message is
    sent instead. An image attached to the command message takes priority
    over one in the replied-to message; with neither, a random image is
    fetched via ``_get_image``.
    """
    author = _get_author(update.message)
    image_reply, reply = _get_reply(update.message.reply_to_message, context)
    image_content, content = _get_content(update.message)

    input_text = f"{author}\n{reply}\n{content}"

    if len(input_text.strip().split("\n")) < 2:
        update.message.reply_text(l("no_caption", lang))
        return

    image = image_content if image_content is not None else image_reply

    markup = ""
    if image is None:
        image, markup = _get_image(context, bio=False)

    # _img_to_bio always returns a buffer, so the former `is None` check on
    # its result could never fire and has been removed.
    photo = _img_to_bio(splash_effect(input_text, image))
    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
274
def caps(update: Update, context: CallbackContext):
    """Handle /caps: send back text in upper case.

    The text comes from the replied-to message; the joined command
    arguments are used when there is no reply or it carries no text.
    """
    typed = _get_args(context)
    _, text = _get_reply(update.message.reply_to_message, context, typed)
    shouted = text.upper()
    context.bot.send_message(chat_id=update.effective_chat.id, text=shouted)
278    
def unknown(update: Update, context: CallbackContext):
    """Log a command that no other handler claimed; nothing is sent back."""
    message = update.message
    sender = message.from_user.username
    logging.info(f"User {sender} sent {message.text_markdown_v2} and I don't know what that means.")
281    
def error_callback(update: Update, context: CallbackContext):
    """Report Telegram errors to the chat that triggered them.

    A ``TelegramError`` is logged and answered with the localized "error"
    message when the update identifies a chat. Any other exception is
    re-raised unchanged, as before.
    """
    error = context.error
    if not isinstance(error, TelegramError):
        raise error

    logging.error("TelegramError!!")

    # Error handlers may be invoked without an update (or with one that has
    # no chat). Dereferencing it unguarded raised AttributeError from inside
    # the error handler.
    chat = getattr(update, "effective_chat", None)
    if chat is None:
        return
    context.bot.send_message(chat_id=chat.id, text=l('error', lang))
290        
def _add_effect_handler(dispatcher, command: str, callback):
    """Register *callback* for ``/command`` sent as text or as a caption.

    Two handlers are added, in this order: a ``CommandHandler`` for the
    command and a ``MessageHandler`` filtered on the caption ``/command``.
    """
    # NOTE(review): Filters.caption with a list appears to match whole
    # captions only - confirm whether captions with trailing text
    # ("/command some text") are meant to reach the callback.
    handlers = (
        CommandHandler(command, callback),
        MessageHandler(Filters.caption(update=[f"/{command}"]), callback),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)
def main():
    """Build the updater, register all handlers and poll until stopped.

    The bot token is read from the ``token`` environment variable.
    Registration order is kept: plain commands, then effect commands, then
    the catch-all for unknown commands.
    """
    updater = Updater(token=os.getenv("token"))
    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error_callback)

    plain_commands = (
        ('start', start),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
        ('raw', raw),
        ('pilu', pilu),
    )
    for name, callback in plain_commands:
        dispatcher.add_handler(CommandHandler(name, callback))

    effect_commands = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
    )
    for name, callback in effect_commands:
        _add_effect_handler(dispatcher, name, callback)

    # Registered last so it only sees commands nothing above handled.
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))
    updater.start_polling()
    updater.idle()
316
# Start the bot only when this file is executed as a script.
if __name__ ==  "__main__":
    main()