main.py (view raw)
1from PIL import Image
2from Api import get_random_image, rating_normal, rating_lewd
3from Effects import tt_bt_effect, splash_effect
4import Constants as C
5from Constants import get_localized_string as l
6
7from dotenv import load_dotenv
8load_dotenv()
9import os, logging
10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
11from io import BytesIO
12
13from telegram.error import TelegramError, BadRequest
14from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters, PicklePersistence
15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
16
17lang = 'us'
18
19def _get_args(context):
20 logging.info(context.args)
21 return ' '.join(context.args)
22
23def _img_to_bio(image):
24 bio = BytesIO()
25
26 if image.mode in ("RGBA", "P"):
27 image = image.convert("RGB")
28
29 image.save(bio, 'JPEG')
30 bio.seek(0)
31 return bio
32
33def _ttbt_general(context, text, image=None):
34 if text.strip() == "":
35 return None, l("no_caption", lang)
36
37 markup = ""
38 if image is None:
39 image, markup = _get_image(context, bio=False)
40 image = _img_to_bio(tt_bt_effect(text, image))
41 return image, markup
42
43def _parse_message(text: str):
44 pass
45
46def _get_message_content(message):
47 image = None
48 if len(message.photo) > 0:
49 image = message.photo[-1].get_file()
50 image = Image.open(BytesIO(image.download_as_bytearray()))
51
52 content = ""
53 if message.text is not None:
54 content = message.text.strip()
55 elif message.caption is not None:
56 content = message.caption.strip()
57
58 lines = content.split("\n")
59 r = lines[0].split(" ")
60
61 try:
62 if r[0][0] == '/':
63 r.pop(0)
64 except IndexError:
65 pass
66
67 lines[0] = " ".join(r)
68 return image, "\n".join(lines)
69
70def _get_reply(message, fallback=""):
71
72 if message is None:
73 return None, fallback
74
75 image, content = _get_message_content(message)
76
77 return image, content
78
79def _get_lewd(context):
80 try:
81 return context.chat_data["lewd"]
82 except KeyError:
83 return False
84
85def _get_image(context, tag="", bio=True):
86 if context is not None:
87 if _get_lewd(context):
88 image, url = get_random_image(rating_lewd, tag)
89 else:
90 image, url = get_random_image(rating_normal, tag)
91
92 if image is None:
93 logging.warning("Getting Image failed")
94 raise TelegramError("bad image")
95
96 markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
97
98 if bio:
99 return _img_to_bio(image), markup
100 return image, markup
101
102def _get_all(update, check_fn, context):
103 author = _get_author(update.message)
104 image_reply, reply = _get_reply(update.message.reply_to_message)
105
106 image_content, content = _get_message_content(update.message)
107 logging.info(f"User {update.message.from_user.username} typed: {str(update.message.text)}")
108
109 content = check_fn(author, reply, content)
110
111 if content is None:
112 return None, None, None
113
114 markup = ""
115 image = None
116 if image_reply is not None:
117 image = image_reply
118
119 if image_content is not None:
120 image = image_content
121
122 if image is None:
123 image, markup = _get_image(context, bio=False)
124
125 return content, image, markup
126
127def start(update: Update, context: CallbackContext):
128 context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", lang))
129
130def set_lewd(update: Update, context: CallbackContext):
131 try:
132 output = False if context.chat_data["lewd"] else True
133 except KeyError:
134 output = True
135
136 context.chat_data['lewd'] = output
137 message = l("lewd_toggle", lang).format(l("enabled", lang) if output else l("disabled", lang))
138
139 context.bot.send_message(chat_id=update.effective_chat.id, text=message)
140
141def pic(update: Update, context: CallbackContext):
142 try:
143 tag = " " + context.args[0]
144 except IndexError:
145 tag = ""
146
147 image, markup = _get_image(context, tag)
148 update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
149
150def raw(update: Update, context: CallbackContext):
151 tag = ""
152 try:
153 tag += " " + context.args[0]
154 tag += " " + context.args[1]
155 except IndexError:
156 pass
157
158 image, url = get_random_image("", tag)
159
160 if image is None:
161 logging.warning("Getting Image failed")
162 raise TelegramError("bad image")
163
164 image = _img_to_bio(image)
165 markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
166
167 update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
168
169def pilu(update: Update, context: CallbackContext):
170 logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
171 try:
172 tag = " " + context.args[0]
173 except IndexError:
174 tag = ""
175 image, url = get_random_image("rating:e" + tag)
176 if image is None:
177 logging.warning("Getting Image failed")
178 raise TelegramError("bad image")
179 return
180
181 image = _img_to_bio(image)
182 markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
183
184 update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
185
186def _format_author(user):
187 if user.username is not None:
188 return user.full_name + f" ({user.username})"
189 return user.full_name
190
191def _get_author(message):
192
193 if message.reply_to_message is None:
194 return _format_author(message.from_user)
195
196 if message.reply_to_message.forward_from is not None:
197 return _format_author(message.reply_to_message.forward_from)
198
199 if message.reply_to_message.forward_sender_name is not None:
200 return message.reply_to_message.forward_sender_name
201
202 return _format_author(message.reply_to_message.from_user)
203
204def tt_check(author, content, reply):
205 input_text = f"{reply} {content}".replace("\n", " ")
206
207 if input_text.strip() == "":
208 return None
209
210 return input_text
211
212def ttbt_check(author, content, reply):
213 input_text = f"{reply}\n{content}"
214
215 if input_text.strip() == "":
216 return None
217
218 return input_text
219
220def splash_check(author, content, reply):
221 input_text = f"{author}\n{content}\n{reply}"
222
223 if len(input_text.strip().split("\n")) < 2:
224 return None
225
226 return input_text
227
228def ttbt(update: Update, context: CallbackContext):
229 content, image, markup = _get_all(update, ttbt_check, context)
230
231 if image is None:
232 update.message.reply_text(l("no_caption", lang))
233 return
234
235 image = _img_to_bio(tt_bt_effect(content, image))
236 update.message.reply_photo(photo=image, reply_markup=markup)
237
238
239def tt(update: Update, context: CallbackContext):
240 content, image, markup = _get_all(update, tt_check, context)
241
242 if image is None:
243 update.message.reply_text(l("no_caption", lang))
244 return
245
246 image = _img_to_bio(tt_bt_effect(content, image))
247 update.message.reply_photo(photo=image, reply_markup=markup)
248
249def bt(update: Update, context: CallbackContext):
250 content, image, markup = _get_all(update, tt_check, context)
251
252 if image is None:
253 update.message.reply_text(l("no_caption", lang))
254 return
255
256 image = _img_to_bio(tt_bt_effect("\n" + content, image))
257 update.message.reply_photo(photo=image, reply_markup=markup)
258
259def splash(update: Update, context: CallbackContext):
260 content, image, markup = _get_all(update, splash_check, context)
261
262 if image is None:
263 update.message.reply_text(l("no_caption", lang))
264 return
265
266 image = _img_to_bio(splash_effect(content, image))
267 update.message.reply_photo(photo=image, reply_markup=markup)
268
269def caps(update: Update, context: CallbackContext):
270 _, reply = _get_reply(update.message.reply_to_message, _get_args(context))
271 context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper())
272
273def unknown(update: Update, context: CallbackContext):
274 logging.info(f"User {update.message.from_user.username} sent {update.message.text_markdown_v2} and I don't know what that means.")
275
276def error_callback(update: Update, context: CallbackContext):
277 try:
278 raise context.error
279 except TelegramError:
280 logging.error("TelegramError!!")
281 context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', lang))
282
283def _add_effect_handler(dispatcher, command: str, callback):
284 dispatcher.add_handler(CommandHandler(command, callback))
285 dispatcher.add_handler(MessageHandler(Filters.caption(update=[f"/{command}"]), callback))
286
287def main():
288
289 updater = Updater(token=os.getenv("token"),
290 persistence=PicklePersistence(filename='bot-data.pkl',
291 store_bot_data=False,
292 store_callback_data=False,
293 store_user_data=False))
294
295 dispatcher = updater.dispatcher
296 dispatcher.add_error_handler(error_callback)
297
298 dispatcher.add_handler(CommandHandler('start', start))
299 dispatcher.add_handler(CommandHandler('lewd', set_lewd))
300 dispatcher.add_handler(CommandHandler('caps', caps))
301 dispatcher.add_handler(CommandHandler('pic', pic))
302 dispatcher.add_handler(CommandHandler('raw', raw))
303 dispatcher.add_handler(CommandHandler('pilu', pilu))
304
305 _add_effect_handler(dispatcher, 'ttbt', ttbt)
306 _add_effect_handler(dispatcher, 'tt', tt)
307 _add_effect_handler(dispatcher, 'bt', bt)
308 _add_effect_handler(dispatcher, 'splash', splash)
309
310 dispatcher.add_handler(MessageHandler(Filters.command, unknown))
311 updater.start_polling()
312 updater.idle()
313
314if __name__ == "__main__":
315 main()