all repos — python-meme-bot @ 3fd391b693d6859ad5f0edfd1190cc400364261f

Telegram Bot that uses PIL to compute light image processing.

new features
Marco Andronaco andronacomarco@gmail.com
Mon, 05 Sep 2022 17:42:08 +0200
commit

3fd391b693d6859ad5f0edfd1190cc400364261f

parent

9c164c36cd723d573c0fc185ee2db9771701d8dc

4 files changed, 179 insertions(+), 62 deletions(-)

jump to
M Api.pyApi.py

@@ -1,10 +1,13 @@

-from PIL import Image +from PIL import Image, UnidentifiedImageError from io import BytesIO import requests, random, time -url = "https://danbooru.donmai.us/post/index.json" -test_url = "https://testbooru.donmai.us/post/index.json" +base_url = "https://danbooru.donmai.us/" +base_url_test = "https://testbooru.donmai.us/" + +page_suffix = "post/index.json" +post_suffix = "posts/" ratings = [ 'g', # general

@@ -13,39 +16,53 @@ 'q', # questionable

'e', # explicit ] -params = { - "limit": 100, - "page": random.randint(1,560), - #"page": 560, - "tags": "order:change_desc rating:" -} +rating_normal = "g,s" +rating_lewd = "q" + +limit = 100 +max_pages = 700 +sleep_seconds = 3 +max_tries = 5 -def get_random_image(rating="g,s"): - params["tags"] += rating - - switch = True - while switch: - r = requests.get(url, params) +def get_random_image(rating=rating_normal): + params = { + "limit": limit, + #"tags": "order:change_desc rating:" + rating, + "tags": "rating:" + rating, + } + #url = "https://danbooru.donmai.us/post/index.json" + count = 0 + while count < max_tries: + params['page'] = random.randint(1,max_pages) + r = requests.get(base_url + page_suffix, params) page = r.json() - print("Page: " + str(params['page'])) + #print("Page: " + str(params['page'])) if 'success' in page: if page['success'] == False: print("Error: " + page['error']) print("Message: " + page['message']) - print("Retrying in 3 seconds...\n") else: print(page) - time.sleep(3) else: - n = random.randint(0, 99) - print("File: " + str(n)) - file_url = page[n]['file_url'] - r = requests.get(file_url) + n = random.randint(0, limit - 1) + #print("File: " + str(n)) try: - img = Image.open(BytesIO(r.content)) - except UnidentifiedImageError: - print("Unidentified image.") - - print("Done.\n") - return img, f"https://danbooru.donmai.us/posts/{page[n]['id']}" + file_url = page[n]['file_url'] + r = requests.get(file_url) + try: + img = Image.open(BytesIO(r.content)) + return img, base_url + post_suffix + str(page[n]['id']) + except UnidentifiedImageError: + print("Unidentified image: " + file_url) + except KeyError: + print("Image has no file_url. post: " + base_url + post_suffix + str(page[n]['id'])) + print(str(page[n])) + except IndexError: + print("Page does not exist. " + str(page)) + + count += 1 + print(f"Try #{count} failed. Retrying in {sleep_seconds} seconds...\n") + #time.sleep(sleep_seconds) + print(f"Reached {count} tries. Giving up.") + return None, None
A Constants.py

@@ -0,0 +1,20 @@

+localization = { + 'us': { + 'welcome' : "Welcome to PILuAnimeBot!", + 'sauce' : "[Sauce 🍝]({})", + 'no_caption' : "No caption detected.", + 'lewd_toggle' : "Lewd content was {} for this chat.", + 'enabled' : "enabled", + 'disabled' : "disabled", + 'unknown' : "Sorry, I didn't understand that command.", + 'error': "Something bad happened. Please retry.", + } +} + + +def get_localized_string(text, lang='us'): + try: + return localization[lang][text] + except KeyError: + logging.error("No text was found.") + return "localization error {}{}{}{}{}{}"
M Effects.pyEffects.py

@@ -43,7 +43,7 @@ x += width + LETTER_SPACING

y = y + (txt_height + LINE_SPACING) * factor -def tt_bt(text, img): +def tt_bt_effect(text, img): lines = [x for x in text.split("\n") if x] tt = lines[0] if len(lines) > 0 else None

@@ -71,7 +71,7 @@ def image_test():

image = Image.open("image.jpg") #image, url = get_random_image() - res = tt_bt("top text\nbottom text", image) + res = tt_bt_effect("top text\nbottom text", image) res.save('./output.jpg', optimize=True, quality=80) print("Image test successful")
M main.pymain.py

@@ -1,6 +1,8 @@

from PIL import Image -from Api import get_random_image -from Effects import tt_bt +from Api import get_random_image, rating_normal, rating_lewd +from Effects import tt_bt_effect +import Constants as C +from Constants import get_localized_string as l from dotenv import load_dotenv load_dotenv()

@@ -8,86 +10,164 @@ import os, logging

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) from io import BytesIO +from telegram.error import TelegramError, BadRequest from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters from telegram import Update -sauce_str = "[Sauce 🍝]({})" +lang = 'us' -def _ttbt_general(text): - image, url = get_random_image() - image = tt_bt(text, image) +def _get_args(context): + logging.info(context.args) + return ' '.join(context.args) + +def _img_to_bio(image): bio = BytesIO() - bio.name = 'image.jpeg' + #bio.name = 'image.jpeg' + + if image.mode in ("RGBA", "P"): + image = image.convert("RGB") + image.save(bio, 'JPEG') bio.seek(0) - return bio, sauce_str.format(url) + return bio + +def _ttbt_general(context, text): + if text.strip() == "": + return None, l("no_caption", lang) + + image, url = _get_image(context, bio=False) + image = tt_bt_effect(text, image) + return _img_to_bio(image), url def _get_reply(input, fallback=""): if input is None: return fallback return input.text -def _get_image(): - image, url = get_random_image() - bio = BytesIO() - bio.name = 'image.jpeg' - image.save(bio, 'JPEG') - bio.seek(0) - return bio, sauce_str.format(url) +def _get_lewd(context): + try: + return context.chat_data["lewd"] + except KeyError: + return False + +def _get_image(context, bio=True): + if context is not None: + if _get_lewd(context): + image, url = get_random_image(rating_lewd) + else: + image, url = get_random_image(rating_normal) + + if image is None: + logging.warning("Getting Image failed") + raise TelegramError("bad image") + + url = l("sauce", lang).format(url) + if bio: + return _img_to_bio(image), url + return image, url + def start(update: Update, context: CallbackContext): - context.bot.send_message(chat_id=update.effective_chat.id, text="Welcome to PILuAnimeBot!") + context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", lang)) -def pic(update: Update, context: CallbackContext): - image, url = _get_image() +def set_lewd(update: Update, context: CallbackContext): + try: + output = False if context.chat_data["lewd"] else True + except KeyError: + output = True + + context.chat_data['lewd'] = output + message = l("lewd_toggle", lang).format(l("enabled", lang) if output else l("disabled", lang)) - context.bot.send_photo(chat_id=update.effective_chat.id, photo=image, caption=url, parse_mode="markdown") + context.bot.send_message(chat_id=update.effective_chat.id, text=message) + +def pic(update: Update, context: CallbackContext): + image, url = _get_image(context) + update.message.reply_photo(photo=image, caption=url, parse_mode="markdown") + +def pilu(update: Update, context: CallbackContext): + logging.warning(f"User {update.message.from_user.username} requested an explicit pic.") + try: + tag = " " + context.args[0] + except IndexError: + tag = "" + image, url = get_random_image("e" + tag) + if image is None: + logging.warning("Getting Image failed") + raise TelegramError("bad image") + return + url = l("sauce", lang).format(url) + image = _img_to_bio(image) + update.message.reply_photo(photo=image, caption=url, parse_mode="markdown") def tt(update: Update, context: CallbackContext): reply = _get_reply(update.message.reply_to_message) - content = ' '.join(context.args) + content = _get_args(context) input_text = f"{reply} {content}".replace("\n", " ") - image, url = _ttbt_general(input_text) - context.bot.send_photo(update.effective_chat.id, photo=image, caption=url, parse_mode="markdown") + image, url = _ttbt_general(context, input_text) + + if image is None: + update.message.reply_text(url) + return + update.message.reply_photo(photo=image, caption=url, parse_mode="markdown") def bt(update: Update, context: CallbackContext): reply = _get_reply(update.message.reply_to_message) - content = ' '.join(context.args) + content = _get_args(context) input_text = f"{reply} {content}".replace("\n", " ") - image, url =_ttbt_general(" \n" + input_text) - context.bot.send_photo(update.effective_chat.id, photo=image, caption=url, parse_mode="markdown") + image, url =_ttbt_general(context, " \n" + input_text) + + if image is None: + update.message.reply_text(url) + return + update.message.reply_photo(photo=image, caption=url, parse_mode="markdown") def ttbt(update: Update, context: CallbackContext): reply = _get_reply(update.message.reply_to_message) - content = ' '.join(context.args) + content = _get_args(context) input_text = f"{reply}\n{content}" - image, url =_ttbt_general(input_text) - context.bot.send_photo(update.effective_chat.id, photo=image, caption=url, parse_mode="markdown") + image, url =_ttbt_general(context, input_text) + + if image is None: + update.message.reply_text(url) + return + update.message.reply_photo(photo=image, caption=url, parse_mode="markdown") def caps(update: Update, context: CallbackContext): - reply = _get_reply(update.message.reply_to_message, ' '.join(context.args)) + reply = _get_reply(update.message.reply_to_message, _get_args(context)) context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper()) def unknown(update: Update, context: CallbackContext): - context.bot.send_message(chat_id=update.effective_chat.id, text="Sorry, I didn't understand that command.") - + logging.info(f"User {update.message.from_user.username} sent {_get_args(context)}") + context.bot.reply_text(text=l("unknown", lang)) + +def error_callback(update: Update, context: CallbackContext): + try: + raise context.error + #except BadRequest: + # logging.error("BadRequest!!") + except TelegramError: + logging.error("TelegramError!!") + context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', lang)) def main(): updater = Updater(token=os.getenv("token")) dispatcher = updater.dispatcher + dispatcher.add_error_handler(error_callback) dispatcher.add_handler(CommandHandler('start', start)) + dispatcher.add_handler(CommandHandler('lewd', set_lewd)) dispatcher.add_handler(CommandHandler('caps', caps)) dispatcher.add_handler(CommandHandler('pic', pic)) + dispatcher.add_handler(CommandHandler('pilu', pilu)) dispatcher.add_handler(CommandHandler('ttbt', ttbt)) dispatcher.add_handler(CommandHandler('tt', tt)) dispatcher.add_handler(CommandHandler('bt', bt)) dispatcher.add_handler(MessageHandler(Filters.command, unknown)) - updater.start_polling() updater.idle()