Replacer.py (view raw)
1from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
2from telegram.constants import ParseMode
3from Config import INLINE_SEP, FEEDBACK_TIMEOUT
4import logging, re, json
5from asyncio import sleep
6logger = logging.getLogger(__name__)
7
8re_flags = re.I | re.M
9
10replacers = [
11 {
12 "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?youtu\.?be(?:\.com)?\/?.*(?:watch|embed)?(?:.*v=|v\/|\/)([\w\-_]+)\&?", re_flags),
13 "becomes": "https://y.outube.duckdns.org/{}",
14 },
15 {
16 "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?twitter\.com\/(?:#!\/)?(.*)\/status(?:es)?\/([^\/\?\s]+)", re_flags),
17 "becomes": "https://fxtwitter.com/{}/status/{}",
18 },
19 {
20 "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?x\.com\/(?:#!\/)?(.*)\/status(?:es)?\/([^\/\?\s]+)", re_flags),
21 "becomes": "https://fixupx.com/{}/status/{}",
22 },
23 {
24 "regex": re.compile(r"(?:https?:\/\/)?(?:www\.)?instagram\.com\/((?:reel)|p)\/([A-Za-z0-9_]{11})[\/\?\w=&]*", re_flags),
25 "becomes": "https://ddinstagram.com/{}/{}",
26 },
27]
28
29link_message = "Da {}[\.]({})"
30
31def get_callback_data(feedback: bool) -> str:
32 payload = { "feedback": feedback }
33 return "feedback" + INLINE_SEP + json.dumps(payload)
34
35def get_message_markup() -> InlineKeyboardMarkup:
36 buttons = [
37 [
38 InlineKeyboardButton(text="✅", callback_data=get_callback_data(True)),
39 InlineKeyboardButton(text="❌", callback_data=get_callback_data(False)),
40 ]
41 ]
42 return InlineKeyboardMarkup(buttons)
43
44def format_template(template: str, regex_result) -> str:
45 result_type = type(regex_result)
46 if result_type is str:
47 return template.format(regex_result)
48 elif result_type is tuple or result_type is list:
49 return template.format(*regex_result)
50 elif result_type is dict:
51 return template.format(**regex_result)
52 else:
53 return ""
54
55def parse_text(message: str) -> list:
56 output = []
57 for site in replacers:
58 regex = site["regex"]
59 res = regex.findall(message)
60 for r in res:
61 link = format_template(site["becomes"], r)
62 output.append(link)
63 return output
64
65async def replace(update: Update, _) -> None:
66 try:
67 links = parse_text(update.message.text)
68 except TypeError:
69 links = parse_text(update.message.caption)
70
71 for link in links:
72 logger.info(link)
73 text = link_message.format(update.effective_user.mention_markdown_v2(update.effective_user.name), link)
74 message = await update.effective_chat.send_message(text, parse_mode=ParseMode.MARKDOWN_V2)
75 await sleep(FEEDBACK_TIMEOUT)
76 await message.edit_reply_markup(reply_markup=get_message_markup())
77
78async def feedback(update: Update, _, data_json: str) -> None:
79 data = json.loads(data_json)
80
81 if data["feedback"]:
82 await update.callback_query.answer("Bene!")
83 await update.effective_message.edit_reply_markup()
84 return
85
86 await update.callback_query.answer("Ci ho provato...")
87 await update.effective_message.delete()
88 return