all repos — gif-escarbot @ bd90adc6575ce56127ebd261c9c61e58f6d8c629

Earthbound Café's custom delivery bot with other cool utilities built-in.

Replacer.py (view raw)

 1from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 2from telegram.constants import ParseMode
 3from Config import INLINE_SEP, FEEDBACK_TIMEOUT
 4import logging, re, json
 5from asyncio import sleep
 6logger = logging.getLogger(__name__)
 7
 8re_flags = re.I | re.M
 9
10replacers = [
11    {
12        "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?youtu\.?be(?:\.com)?\/?.*(?:watch|embed)?(?:.*v=|v\/|\/)([\w\-_]+)\&?", re_flags),
13        "becomes": "https://y.outube.duckdns.org/{}",
14    },
15    {
16        "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?twitter\.com\/(?:#!\/)?(.*)\/status(?:es)?\/([^\/\?\s]+)", re_flags),
17        "becomes": "https://fxtwitter.com/{}/status/{}",
18    },
19    {
20        "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?x\.com\/(?:#!\/)?(.*)\/status(?:es)?\/([^\/\?\s]+)", re_flags),
21        "becomes": "https://fixupx.com/{}/status/{}",
22    },
23]
24
25link_message = "Da {}[\.]({})"
26
27def get_callback_data(feedback: bool) -> str:
28    payload = { "feedback": feedback }
29    return "feedback" + INLINE_SEP + json.dumps(payload)
30
31def get_message_markup() -> InlineKeyboardMarkup:
32    buttons = [
33        [
34            InlineKeyboardButton(text="", callback_data=get_callback_data(True)),
35            InlineKeyboardButton(text="", callback_data=get_callback_data(False)),
36        ]
37    ]
38    return InlineKeyboardMarkup(buttons)
39
40def format_template(template: str, regex_result) -> str:
41    result_type = type(regex_result)
42    if result_type is str:
43        return template.format(regex_result)
44    elif result_type is tuple or result_type is list:
45        return template.format(*regex_result)
46    elif result_type is dict:
47        return template.format(**regex_result)
48    else:
49        return ""
50
51def parse_text(message: str) -> list:
52    output = []
53    for site in replacers:
54        regex = site["regex"]
55        res = regex.findall(message)
56        for r in res:
57            link = format_template(site["becomes"], r)
58            output.append(link)
59    return output
60
61async def replace(update: Update, _) -> None:
62    try:
63        links = parse_text(update.message.text)
64    except TypeError:
65        links = parse_text(update.message.caption)
66
67    for link in links:
68        logger.info(link)
69        text = link_message.format(update.effective_user.mention_markdown_v2(update.effective_user.name), link)
70        message = await update.effective_chat.send_message(text, parse_mode=ParseMode.MARKDOWN_V2)
71        await sleep(FEEDBACK_TIMEOUT)
72        await message.edit_reply_markup(reply_markup=get_message_markup())
73
74async def feedback(update: Update, _, data_json: str) -> None:
75    data = json.loads(data_json)
76
77    if data["feedback"]:
78        await update.callback_query.answer("Bene!")
79        await update.effective_message.edit_reply_markup()
80        return
81
82    await update.callback_query.answer("Ci ho provato...")
83    await update.effective_message.delete()
84    return