escarbot/replace.py (view raw)
1from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
2from telegram.constants import ParseMode
3from escarbot.config import INLINE_SEP, FEEDBACK_TIMEOUT
4import logging, re, json
5from asyncio import sleep
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Flags passed to the patterns compiled in this module: case-insensitive
# matching, with ``^``/``$`` anchoring at line boundaries.
re_flags = re.IGNORECASE | re.MULTILINE
9
def get_human_readable(input_str: str, indicator: str, offset: int = 0) -> "tuple[int, int]":
    """Parse the integer that precedes ``indicator`` in ``input_str``.

    Reads one component of a timestamp such as ``1h2m3s``.

    Args:
        input_str: The text to parse.
        indicator: The unit marker that terminates the number (e.g. ``"h"``).
        offset: Index in ``input_str`` at which parsing starts.

    Returns:
        ``(value, consumed)``: the parsed number and how many characters,
        counted from ``offset``, the number plus its indicator occupy.
        ``(0, 0)`` when the text before the indicator is not an integer.
        If the indicator is absent, the whole remainder is treated as the
        number.
    """
    try:
        token = input_str[offset:].split(indicator, maxsplit=1)[0]
        value = int(token)
    except ValueError:
        return 0, 0
    # Measure the text that was actually consumed, not ``str(value)``: the
    # two differ for inputs with leading zeros ("01h" is 3 characters, not
    # 2), and a short count would misalign the caller's next offset.
    return value, len(token) + len(indicator)
16
# Captures the value of a ``t=`` query parameter: either plain seconds
# (``t=90``) or the unit form (``t=1h2m3s``).
youtube_timestamp_regex = re.compile(r"(?:&|\?)t=(\d*h?\d*m?\d*s?)", re_flags)


def youtube_timestamp(input_str: str) -> str:
    """Render the ``t=`` parameter found in ``input_str`` as a clock string.

    Args:
        input_str: URL fragment that may contain ``?t=...`` or ``&t=...``.

    Returns:
        ``"M:SS"`` when the timestamp is under an hour, ``"H:MM:SS"``
        otherwise, or ``""`` when there is no ``t=`` parameter or its value
        is empty.
    """
    matches = youtube_timestamp_regex.findall(input_str)
    # Every part of the captured group is optional, so it can match an
    # empty string; treat that the same as "no timestamp".
    if not matches or not matches[0]:
        return ""

    # The pattern is case-insensitive, but the unit markers below are
    # lowercase, so normalise before splitting on them.
    raw = matches[0].lower()
    try:
        total_seconds = int(raw)
    except ValueError:
        # Unit form. ``get_human_readable`` reports how many characters each
        # component consumed *relative to the offset it was given*, so the
        # offsets must be accumulated to stay aligned with ``raw``.
        hours, consumed = get_human_readable(raw, "h")
        offset = consumed
        minutes, consumed = get_human_readable(raw, "m", offset)
        offset += consumed
        seconds, _ = get_human_readable(raw, "s", offset)
    else:
        # Plain number of seconds.
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

    if hours == 0:
        return '{}:{:02}'.format(minutes, seconds)
    return '{}:{:02}:{:02}'.format(hours, minutes, seconds)
35
# Rewrite rules, tried in order by ``parse_text``. Each entry has:
#   "regex":     pattern whose capture groups feed the "becomes" template;
#   "becomes":   ``str.format`` template for the replacement link;
#   "timestamp": (optional) callable applied to the last capture group to
#                produce a human-readable timestamp.
#
# Patterns whose scheme is optional start with ``(?<![\w\-])`` so that the
# host cannot match in the middle of another domain name (for example
# ``x.com`` inside ``fixupx.com`` or ``tiktok.com`` inside ``vxtiktok.com``).
# A preceding dot is still allowed, so subdomains keep matching.
replacers = [
    {
        # YouTube watch/embed/live/shorts links and youtu.be short links.
        # Group 1 is the video id, group 2 the rest of the URL (query string).
        "regex": re.compile(r"(?<![\w\-])(?:(?:https?:)?\/\/)?(?:(?:www|m)\.)?(?:(?:youtube(?:-nocookie)?\.com|youtu\.be))(?:\/(?:[\w\-]+\?v=|embed\/|live\/|v\/|shorts\/)?)([\w\-]+)(\S+)?", re_flags),
        "becomes": "https://y.outube.duckdns.org/{}",
        "timestamp": youtube_timestamp
    },
    {
        # twitter.com status links (scheme required). The user part is
        # matched lazily and without whitespace so that two links on the
        # same line are not merged into a single match.
        "regex": re.compile(r"(?:https?:\/\/)(?:www\.)?twitter\.com\/(?:#!\/)?(\S*?)\/status(?:es)?\/([^\/\?\s]+)", re_flags),
        "becomes": "https://fxtwitter.com/{}/status/{}",
    },
    {
        # x.com status links (scheme optional).
        "regex": re.compile(r"(?<![\w\-])(?:https?:\/\/)?(?:www\.)?x\.com\/(?:#!\/)?(\S*?)\/status(?:es)?\/([^\/\?\s]+)", re_flags),
        "becomes": "https://fixupx.com/{}/status/{}",
    },
    {
        # Instagram posts and reels. Group 1 is "reel" or "p", group 2 the
        # 11-character shortcode (letters, digits, "_" and "-").
        "regex": re.compile(r"(?<![\w\-])(?:https?:\/\/)?(?:www\.)?instagram\.com\/((?:reel)|p)\/([A-Za-z0-9_\-]{11})[\/\?\w=&]*", re_flags),
        "becomes": "https://ddinstagram.com/{}/{}",
    },
    {
        # TikTok full video links: group 1 is the user, group 2 the video id.
        "regex": re.compile(r"(?<![\w\-])(?:https?:\/\/)?(?:(?:www)|(?:vm))?\.?tiktok\.com\/@([\w\d_.]+)\/(?:video)\/(\d+)", re_flags),
        "becomes": "https://www.vxtiktok.com/@{}/video/{}",
    },
    {
        # TikTok short links: group 1 is the short code.
        "regex": re.compile(r"(?<![\w\-])(?:https?:\/\/)?(?:(?:www)|(?:vm))?\.?tiktok\.com\/([\w]+)\/?", re_flags),
        "becomes": "https://vm.vxtiktok.com/{}/",
    },
]
63
# Template for the reply sent by ``replace`` (parsed as MarkdownV2): a link
# icon pointing at the rewritten URL, "Da <user>." and the timestamp text.
# The backslash that escapes the dot for MarkdownV2 is itself written as
# ``\\`` so the literal contains no invalid Python escape sequence.
link_message = "[🔗]({}) Da {}\\. {}"
65
def get_callback_data(feedback: bool) -> str:
    """Build the callback payload attached to a feedback button.

    The result is the literal ``"feedback"``, then ``INLINE_SEP``, then a
    JSON object holding the ``feedback`` flag.
    """
    encoded = json.dumps({"feedback": feedback})
    return "".join(("feedback", INLINE_SEP, encoded))
69
# Inline keyboard with a single row of two feedback buttons: a check mark
# carrying ``feedback=True`` and a cross carrying ``feedback=False``.
buttons = InlineKeyboardMarkup([
    [
        InlineKeyboardButton(text=label, callback_data=get_callback_data(is_positive))
        for label, is_positive in (("✅", True), ("❌", False))
    ],
])
76
def format_template(template: str, regex_result) -> str:
    """Fill ``template`` with the capture groups of one regex match.

    Args:
        template: A ``str.format`` template.
        regex_result: One item as produced by ``findall``: a string (single
            group), a tuple or list (several groups, passed positionally)
            or a dict (passed as keyword arguments).

    Returns:
        The formatted string, or ``""`` for any other kind of value.
    """
    # ``isinstance`` rather than an exact ``type(...) is`` comparison, so
    # subclasses (e.g. a namedtuple) are formatted instead of silently
    # producing an empty string. ``str`` is tested first because a string
    # must be passed as one argument, not unpacked.
    if isinstance(regex_result, str):
        return template.format(regex_result)
    if isinstance(regex_result, (tuple, list)):
        return template.format(*regex_result)
    if isinstance(regex_result, dict):
        return template.format(**regex_result)
    return ""
87
def parse_text(message: str) -> list:
    """Find every supported link in ``message`` and build its replacement.

    Args:
        message: The text to scan.

    Returns:
        A list of ``[link, timestamp]`` pairs, one per match, grouped by
        the order of ``replacers``. ``timestamp`` is ``""`` for rules
        without a ``"timestamp"`` callback.

    Raises:
        TypeError: If ``message`` is not a string (raised by the regex
            engine). ``replace`` relies on this to fall back to the caption.
    """
    output = []
    for site in replacers:
        # Look the optional callback up with ``get`` instead of wrapping the
        # call in ``try/except KeyError``: that way a KeyError raised
        # *inside* the callback is not mistaken for "no timestamp support".
        timestamp_parser = site.get("timestamp")
        for groups in site["regex"].findall(message):
            link = format_template(site["becomes"], groups)
            if timestamp_parser is None:
                timestamp = ""
            else:
                # The callback receives the last capture group.
                timestamp = timestamp_parser(groups[-1])
            output.append([link, timestamp])
    return output
103
async def replace(update: Update, _) -> None:
    """Reply with a rewritten link for every supported link in a message.

    The message text is scanned, falling back to the caption when there is
    no text. For each link found, a MarkdownV2 message is sent to the same
    chat and thread; after ``FEEDBACK_TIMEOUT`` seconds the feedback
    buttons are attached to it.

    Args:
        update: The incoming Telegram update.
        _: Unused handler context.
    """
    incoming = update.message
    if incoming is None:
        # Nothing to scan (the update carries no message).
        return

    source_text = incoming.text or incoming.caption
    if not source_text:
        # Neither text nor caption: previously this raised TypeError.
        return

    links = parse_text(source_text)
    if not links:
        return

    # Loop-invariant values. The thread id is read from the *incoming*
    # message once, so it is never taken from a message the bot just sent.
    chat = update.effective_chat
    thread_id = incoming.message_thread_id
    user = update.effective_user.mention_markdown_v2(update.effective_user.name)

    for link in links:
        logger.info(link)
        text = link_message.format(link[0], user, link[1])
        sent = await chat.send_message(text, parse_mode=ParseMode.MARKDOWN_V2, message_thread_id=thread_id)
        # NOTE: the wait happens per link, so each further link in the same
        # message is sent only after the previous one received its buttons.
        await sleep(FEEDBACK_TIMEOUT)
        await sent.edit_reply_markup(reply_markup=buttons)
119
async def feedback(update: Update, _, data_json: str) -> None:
    """Handle a press on one of the feedback buttons.

    Args:
        update: The callback-query update.
        _: Unused handler context.
        data_json: JSON text with a ``"feedback"`` entry; a truthy value
            means the replacement was approved.

    On approval the callback is answered and the buttons are removed from
    the message; otherwise the callback is answered and the message is
    deleted.
    """
    approved = json.loads(data_json)["feedback"]

    if not approved:
        await update.callback_query.answer("Ci ho provato...")
        await update.effective_message.delete()
    else:
        await update.callback_query.answer("Bene!")
        # Calling without a markup removes the inline keyboard.
        await update.effective_message.edit_reply_markup()