all repos — python-meme-bot @ d6840de4407d0edc3d794313bb3a1bde777e745d

Telegram Bot that uses PIL to compute light image processing.

code refactoring
Marco Andronaco andronacomarco@gmail.com
Tue, 06 Sep 2022 23:52:19 +0200
commit

d6840de4407d0edc3d794313bb3a1bde777e745d

parent

527645384b084fc8e88633dd3057b8a05140a4d6

6 files changed, 125 insertions(+), 118 deletions(-)

jump to
M .gitignore.gitignore

@@ -5,4 +5,5 @@ __pycache__

.ipynb_checkpoints output.jpg test -test_output+test_output +bot-data.pkl
M Api.pyApi.py

@@ -13,7 +13,7 @@ 'q', # questionable

'e', # explicit ] -rating_normal = "rating:g" +rating_normal = "rating:g,s" rating_lewd = "rating:s,q" supported_file_types = [

@@ -39,7 +39,6 @@ max_tries = 5

params = { "limit": limit, - #"tags": "order:change_desc rating:" + rating, "tags": rating + tags, }
M Constants.pyConstants.py

@@ -8,7 +8,17 @@ 'enabled' : "enabled",

'disabled' : "disabled", 'unknown' : "Sorry, I didn't understand that command.", 'error': "An error has occurred. Please retry.", - } + }, + 'it': { + 'welcome' : "Benvenuto da PILuAnimeBot!", + 'sauce' : "Salsa 🍝", + 'no_caption' : "Scrivi un testo per favore.", + 'lewd_toggle' : "La roba lewd è stata {} per questa chat.", + 'enabled' : "abilitata", + 'disabled' : "disabilitata", + 'unknown' : "Non ho capito.", + 'error': "Qualcosa è andato storto, riprova.", + }, }
M main.pymain.py

@@ -11,7 +11,7 @@ logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)

from io import BytesIO from telegram.error import TelegramError, BadRequest -from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters +from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters, PicklePersistence from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton lang = 'us'

@@ -22,7 +22,6 @@ return ' '.join(context.args)

def _img_to_bio(image): bio = BytesIO() - #bio.name = 'image.jpeg' if image.mode in ("RGBA", "P"): image = image.convert("RGB")

@@ -38,26 +37,44 @@

markup = "" if image is None: image, markup = _get_image(context, bio=False) - image = tt_bt_effect(text, image) - return _img_to_bio(image), markup + image = _img_to_bio(tt_bt_effect(text, image)) + return image, markup -def _get_reply(input, context, fallback=""): +def _parse_message(text: str): + pass - if input is None: - return None, fallback - +def _get_message_content(message): image = None - if len(input.photo) > 0: - image = input.photo[-1].get_file() + if len(message.photo) > 0: + image = message.photo[-1].get_file() image = Image.open(BytesIO(image.download_as_bytearray())) - if input.caption is not None: - return image, input.caption + content = "" + if message.text is not None: + content = message.text.strip() + elif message.caption is not None: + content = message.caption.strip() + + lines = content.split("\n") + r = lines[0].split(" ") + + try: + if r[0][0] == '/': + r.pop(0) + except IndexError: + pass + + lines[0] = " ".join(r) + return image, "\n".join(lines) + +def _get_reply(message, fallback=""): - if input.text is not None: - return image, input.text + if message is None: + return None, fallback - return image, fallback + image, content = _get_message_content(message) + + return image, content def _get_lewd(context): try:

@@ -81,7 +98,31 @@

if bio: return _img_to_bio(image), markup return image, markup + +def _get_all(update, check_fn, context): + author = _get_author(update.message) + image_reply, reply = _get_reply(update.message.reply_to_message) + image_content, content = _get_message_content(update.message) + logging.info(f"User {update.message.from_user.username} typed: {str(update.message.text)}") + + content = check_fn(author, reply, content) + + if content is None: + return None, None, None + + markup = "" + image = None + if image_reply is not None: + image = image_reply + + if image_content is not None: + image = image_content + + if image is None: + image, markup = _get_image(context, bio=False) + + return content, image, markup def start(update: Update, context: CallbackContext): context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", lang))

@@ -140,35 +181,7 @@

image = _img_to_bio(image) markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]]) - update.message.reply_photo(photo=image, caption=url, parse_mode="markdown", reply_markup=markup) - -def _get_content(message): - image = None - if len(message.photo) > 0: - image = message.photo[-1].get_file() - image = Image.open(BytesIO(image.download_as_bytearray())) - - if message.text is not None: - content = message.text.strip() - elif message.caption is not None: - content = message.caption.strip() - - logging.info(f"User {message.from_user.username} typed: {str(content)}") - - lines = content.split("\n") - r = lines[0].split(" ") - r.pop(0) - lines[0] = " ".join(r) - content = "\n".join(lines) - ''' - lines_new = [] - for line in lines: - words = line.split(" ") - lines_new.append(" ".join(words)) - content = "\n".join(lines_new) - ''' - - return image, content + update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup) def _format_author(user): if user.username is not None:

@@ -183,97 +196,78 @@

if message.reply_to_message.forward_from is not None: return _format_author(message.reply_to_message.forward_from) + if message.reply_to_message.forward_sender_name is not None: + return message.reply_to_message.forward_sender_name + return _format_author(message.reply_to_message.from_user) -def tt(update: Update, context: CallbackContext): - image_reply, reply = _get_reply(update.message.reply_to_message, context) - image_content, content = _get_content(update.message) +def tt_check(author, content, reply): input_text = f"{reply} {content}".replace("\n", " ") - image = None - if image_reply is not None: - image = image_reply - - if image_content is not None: - image = image_content - image, markup = _ttbt_general(context, input_text, image) + if input_text.strip() == "": + return None + + return input_text + +def ttbt_check(author, content, reply): + input_text = f"{reply}\n{content}" - if image is None: - update.message.reply_text(markup) - return - update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup) + if input_text.strip() == "": + return None -def bt(update: Update, context: CallbackContext): - image_reply, reply = _get_reply(update.message.reply_to_message, context) - image_content, content = _get_content(update.message) - input_text = f"{reply} {content}".replace("\n", " ") - - image = None - if image_reply is not None: - image = image_reply - - if image_content is not None: - image = image_content - image, markup =_ttbt_general(context, " \n" + input_text, image) - - if image is None: - update.message.reply_text(markup) - return - update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup) + return input_text + +def splash_check(author, content, reply): + input_text = f"{author}\n{content}\n{reply}" + + if len(input_text.strip().split("\n")) < 2: + return None + + return input_text def ttbt(update: Update, context: CallbackContext): - image_reply, reply = _get_reply(update.message.reply_to_message, context) - image_content, content = _get_content(update.message) + content, image, markup = _get_all(update, ttbt_check, context) - input_text = f"{reply}\n{content}" + if image is None: + update.message.reply_text(l("no_caption", lang)) + return - image = None - if image_reply is not None: - image = image_reply - - if image_content is not None: - image = image_content + image = _img_to_bio(tt_bt_effect(content, image)) + update.message.reply_photo(photo=image, reply_markup=markup) - image, markup =_ttbt_general(context, input_text, image) + +def tt(update: Update, context: CallbackContext): + content, image, markup = _get_all(update, tt_check, context) if image is None: - update.message.reply_text(markup) + update.message.reply_text(l("no_caption", lang)) return - update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup) - -def splash(update: Update, context: CallbackContext): - author = _get_author(update.message) - image_reply, reply = _get_reply(update.message.reply_to_message, context) - image_content, content = _get_content(update.message) - input_text = f"{author}\n{reply}\n{content}" + image = _img_to_bio(tt_bt_effect(content, image)) + update.message.reply_photo(photo=image, reply_markup=markup) - markup = "" +def bt(update: Update, context: CallbackContext): + content, image, markup = _get_all(update, tt_check, context) - if len(input_text.strip().split("\n")) < 2: - markup = l("no_caption", lang) - update.message.reply_text(markup) + if image is None: + update.message.reply_text(l("no_caption", lang)) return - image = None - if image_reply is not None: - image = image_reply - - if image_content is not None: - image = image_content - - if image is None: - image, markup = _get_image(context, bio=False) - image = _img_to_bio(splash_effect(input_text, image)) + image = _img_to_bio(tt_bt_effect("‎\n" + content, image)) + update.message.reply_photo(photo=image, reply_markup=markup) + +def splash(update: Update, context: CallbackContext): + content, image, markup = _get_all(update, splash_check, context) if image is None: - update.message.reply_text(markup) + update.message.reply_text(l("no_caption", lang)) return - update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup) + image = _img_to_bio(splash_effect(content, image)) + update.message.reply_photo(photo=image, reply_markup=markup) def caps(update: Update, context: CallbackContext): - _, reply = _get_reply(update.message.reply_to_message, context, _get_args(context)) + _, reply = _get_reply(update.message.reply_to_message, _get_args(context)) context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper()) def unknown(update: Update, context: CallbackContext):

@@ -282,19 +276,22 @@

def error_callback(update: Update, context: CallbackContext): try: raise context.error - #except BadRequest: - # logging.error("BadRequest!!") except TelegramError: logging.error("TelegramError!!") context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', lang)) def _add_effect_handler(dispatcher, command: str, callback): dispatcher.add_handler(CommandHandler(command, callback)) - dispatcher.add_handler(MessageHandler(Filters.caption(update=[f"/{command}"]), callback)) + def main(): - updater = Updater(token=os.getenv("token")) + updater = Updater(token=os.getenv("token"), + persistence=PicklePersistence(filename='bot-data.pkl', + store_bot_data=False, + store_callback_data=False, + store_user_data=False)) + dispatcher = updater.dispatcher dispatcher.add_error_handler(error_callback)