all repos — gif-escarbot @ ba3f25225482fd01f8b05304c59c92fc6748585a

Earthbound Café's custom delivery bot with other cool utilities built-in.

escarbot/replace.py (view raw)

  1from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
  2from telegram.constants import ParseMode
  3from escarbot.config import INLINE_SEP, FEEDBACK_TIMEOUT
  4import logging, re, json
  5from asyncio import sleep
  6logger = logging.getLogger(__name__)
  7
  8re_flags = re.I | re.M
  9
 10def get_human_readable(input_str: str, indicator: str, offset: int = 0) -> (int, int):
 11    try:
 12        result = int(input_str[offset:].split(indicator, maxsplit=1)[0])
 13        return result, len(str(result)) + len(indicator)
 14    except ValueError:
 15        return 0, 0
 16
 17youtube_timestamp_regex = re.compile(r"(?:&|\?)t=(\d*h?\d*m?\d*s?)", re_flags)
 18def youtube_timestamp(input_str: str) -> str:
 19    result = youtube_timestamp_regex.findall(input_str)
 20    try:
 21        input_string = result[0]
 22        seconds = int(input_string)
 23        hours, remainder = divmod(seconds, 3600)
 24        minutes, seconds = divmod(remainder, 60)
 25    except IndexError: # nothing to parse
 26        return ""
 27    except ValueError: # human-readable number
 28        hours, offset = get_human_readable(input_string, "h")
 29        minutes, offset = get_human_readable(input_string, "m", offset)
 30        seconds, offset = get_human_readable(input_string, "s", offset)
 31
 32    if hours == 0:
 33        return '{}:{:02}'.format(minutes, seconds)
 34    return '{}:{:02}:{:02}'.format(hours, minutes, seconds)
 35
 36replacers = [
 37    {
 38        "regex": re.compile(r"(?:(?:https?:)?\/\/)?(?:(?:www|m)\.)?(?:(?:youtube(?:-nocookie)?\.com|youtu.be))(?:\/(?:[\w\-]+\?v=|embed\/|live\/|v\/)?)([\w\-]+)(\S+)?", re_flags),
 39        "becomes": "https://y.outube.duckdns.org/{}",
 40        "timestamp": youtube_timestamp
 41    },
 42    {
 43        "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?twitter\.com\/(?:#!\/)?(.*)\/status(?:es)?\/([^\/\?\s]+)", re_flags),
 44        "becomes": "https://fxtwitter.com/{}/status/{}",
 45    },
 46    {
 47        "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?x\.com\/(?:#!\/)?(.*)\/status(?:es)?\/([^\/\?\s]+)", re_flags),
 48        "becomes": "https://fixupx.com/{}/status/{}",
 49    },
 50    {
 51        "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?instagram\.com\/((?:reel)|p)\/([A-Za-z0-9_]{11})[\/\?\w=&]*", re_flags),
 52        "becomes": "https://ddinstagram.com/{}/{}",
 53    },
 54    {
 55        "regex": re.compile(r"(?:https?:\/\/)?(?:(?:www)|(?:vm))?\.?tiktok\.com\/(@[\w]+)\/?(?:video)?\/?(\d+)?", re_flags),
 56        "becomes": "https://tnktok.com/{}/{}",
 57    },
 58    {
 59        "regex": re.compile(r"(?:https?:\/\/)?(?:(?:www)|(?:vm))?\.?tiktok\.com\/([\w]+)\/?", re_flags),
 60        "becomes": "https://vm.tnktok.com/{}/",
 61    },
 62]
 63
 64link_message = "Da {}[\.]({}) {}"
 65
 66def get_callback_data(feedback: bool) -> str:
 67    payload = { "feedback": feedback }
 68    return "feedback" + INLINE_SEP + json.dumps(payload)
 69
 70def get_message_markup() -> InlineKeyboardMarkup:
 71    buttons = [
 72        [
 73            InlineKeyboardButton(text="", callback_data=get_callback_data(True)),
 74            InlineKeyboardButton(text="", callback_data=get_callback_data(False)),
 75        ]
 76    ]
 77    return InlineKeyboardMarkup(buttons)
 78
 79def format_template(template: str, regex_result) -> str:
 80    result_type = type(regex_result)
 81    if result_type is str:
 82        return template.format(regex_result)
 83    elif result_type is tuple or result_type is list:
 84        return template.format(*regex_result)
 85    elif result_type is dict:
 86        return template.format(**regex_result)
 87    else:
 88        return ""
 89
 90def parse_text(message: str) -> list:
 91    output = []
 92    for site in replacers:
 93        regex = site["regex"]
 94        res = regex.findall(message)
 95        for r in res:
 96            link = format_template(site["becomes"], r)
 97            
 98            try:
 99                timestamp = site["timestamp"](r[-1])
100            except KeyError:
101                timestamp = ""
102
103            output.append([link, timestamp])
104    return output
105
106async def replace(update: Update, _) -> None:
107    try:
108        links = parse_text(update.message.text)
109    except TypeError:
110        links = parse_text(update.message.caption)
111
112    for link in links:
113        logger.info(link)
114
115        user = update.effective_user.mention_markdown_v2(update.effective_user.name)
116        text = link_message.format(user, link[0], link[1])
117        message = await update.effective_chat.send_message(text, parse_mode=ParseMode.MARKDOWN_V2)
118        await sleep(FEEDBACK_TIMEOUT)
119        await message.edit_reply_markup(reply_markup=get_message_markup())
120
121async def feedback(update: Update, _, data_json: str) -> None:
122    data = json.loads(data_json)
123
124    if data["feedback"]:
125        await update.callback_query.answer("Bene!")
126        await update.effective_message.edit_reply_markup()
127        return
128
129    await update.callback_query.answer("Ci ho provato...")
130    await update.effective_message.delete()
131    return