main.py (view raw)
1from PIL import Image
2from Api import get_random_image
3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
4from Constants import get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup
5from Slot import spin, autospin, bet, cash
6
7from dotenv import load_dotenv
8load_dotenv()
9import os, logging
10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
11from io import BytesIO
12
13from telegram.error import TelegramError
14from telegram.ext import ApplicationBuilder, Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, PicklePersistence, filters, PersistenceInput
15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
16
17def _get_message_content(message):
18
19 image = None
20 if len(message.photo) > 0:
21 image = message.photo[-1].get_file()
22 image = Image.open(BytesIO(image.download_as_bytearray()))
23
24 content = ""
25 if message.text is not None:
26 content = message.text.strip()
27 elif message.caption is not None:
28 content = message.caption.strip()
29
30 lines = content.split("\n")
31 r = lines[0].split(" ")
32
33 try:
34 if r[0][0] == '/':
35 r.pop(0)
36 except IndexError:
37 pass
38
39 lines[0] = " ".join(r)
40 content = "\n".join(lines)
41
42 return image, content, _get_author(message)
43
44def _get_reply(message, fallback=""):
45
46 if message is None:
47 return None, fallback, None
48
49 image, content, author = _get_message_content(message)
50
51 return image, content, author
52
53def _get_lewd(context):
54
55 try:
56 return context.chat_data["lewd"]
57 except KeyError:
58 return False
59
60def _get_image(context):
61
62 if context is not None:
63 image, url = get_random_image(_get_lewd(context))
64
65 if image is None:
66 logging.warning("Getting Image failed")
67 raise TelegramError("bad image")
68
69 markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])
70
71 return image, markup
72
73def _get_all(update, check_fn, context):
74
75 image_reply, text_reply, author_reply = _get_reply(update.message.reply_to_message)
76 image_content, text_content, author_content = _get_message_content(update.message)
77
78 info_struct = {
79 "reply": {
80 "author": author_reply,
81 "text": text_reply,
82 "image": image_reply
83 },
84 "content": {
85 "author": author_content,
86 "text": text_content,
87 "image": image_content
88 }
89 }
90
91 logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
92
93 content = check_fn(info_struct)
94
95 if content is None:
96 return None, None, None
97
98 markup = ""
99 image = None
100
101 if image_reply is not None:
102 image = image_reply
103
104 if image_content is not None:
105 image = image_content
106
107 if image is None:
108 image, markup = _get_image(context)
109
110 return content, image, markup
111
112def start(update: Update, context: CallbackContext):
113 context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", context))
114
115async def set_lewd(update: Update, context: CallbackContext):
116
117 try:
118 output = False if context.chat_data["lewd"] else True
119 except KeyError:
120 output = True
121
122 context.chat_data['lewd'] = output
123 message = l("lewd_toggle", context).format(l("enabled", context) if output else l("disabled", context))
124
125 return await context.bot.send_message(chat_id=update.effective_chat.id, text=message)
126
127async def pic(update: Update, context: CallbackContext):
128
129 image, markup = _get_image(context)
130 return await update.message.reply_photo(photo=img_to_bio(image), parse_mode="markdown", reply_markup=markup)
131
132def _get_author(message):
133
134 if message.forward_from is not None:
135 return format_author(message.forward_from)
136
137 if message.forward_sender_name is not None:
138 return message.forward_sender_name
139
140 if message.forward_from_chat is not None:
141 return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
142
143 return format_author(message.from_user)
144
145def tt_check(info):
146
147 reply = info['reply']['text']
148 content = info['content']['text']
149
150 input_text = f"{reply} {content}".replace("\n", " ")
151
152 if input_text.strip() == "":
153 return None
154
155 return input_text
156
157def ttbt_check(info):
158
159 reply = info['reply']['text'].strip()
160 content = info['content']['text'].strip()
161
162 if len(content.split("\n")) > 1:
163 input_text = content
164 else:
165 input_text = f"{reply}\n{content}"
166
167 if input_text.strip() == "":
168 return None
169
170 return input_text
171
172def splash_check(info):
173
174 reply = info['reply']['text']
175 content = info['content']['text']
176
177 if content.strip() == "":
178 author = info['reply']['author']
179 input_text = f"{author}\n{reply}"
180 else:
181 author = info['content']['author']
182 input_text = f"{author}\n{content}"
183
184 if len(input_text.strip().split("\n")) < 2:
185 return None
186
187 return input_text
188
189def wot_check(info):
190
191 reply = info['reply']['text']
192 content = info['content']['text']
193
194 input_text = f"{reply}\n{content}"
195
196 if input_text.strip() == "":
197 return None
198
199 return input_text
200
201async def ttbt(update: Update, context: CallbackContext):
202
203 content, image, markup = _get_all(update, ttbt_check, context)
204
205 if image is None:
206 return await update.message.reply_text(l("no_caption", context))
207
208 image = tt_bt_effect(content, image)
209
210 if image is None:
211 return await update.message.reply_text(l("failed_effect", context))
212
213 return await update.message.reply_photo(photo=image, reply_markup=markup)
214
215
216async def tt(update: Update, context: CallbackContext):
217
218 content, image, markup = _get_all(update, tt_check, context)
219
220 if image is None:
221 return await update.message.reply_text(l("no_caption", context))
222
223 image = tt_bt_effect(content, image)
224
225 if image is None:
226 return await update.message.reply_text(l("failed_effect", context))
227
228 return await update.message.reply_photo(photo=image, reply_markup=markup)
229
230async def bt(update: Update, context: CallbackContext):
231
232 content, image, markup = _get_all(update, tt_check, context)
233
234 if image is None:
235 return await update.message.reply_text(l("no_caption", context))
236
237 image = bt_effect(content, image)
238
239 if image is None:
240 return await update.message.reply_text(l("failed_effect", context))
241
242 return await update.message.reply_photo(photo=image, reply_markup=markup)
243
244async def splash(update: Update, context: CallbackContext):
245
246 content, image, markup = _get_all(update, splash_check, context)
247
248 if image is None:
249 return await update.message.reply_text(l("no_caption", context))
250
251 image = splash_effect(content, image)
252
253 if image is None:
254 return await update.message.reply_text(l("failed_effect", context))
255
256 return await update.message.reply_photo(photo=image, reply_markup=markup)
257
258async def wot(update: Update, context: CallbackContext):
259
260 content, image, markup = _get_all(update, wot_check, context)
261
262 if image is None:
263 return await update.message.reply_text(l("no_caption", context))
264
265 image = wot_effect(content, image)
266
267 if image is None:
268 await update.message.reply_text(l("failed_effect", context))
269
270 await update.message.reply_photo(photo=image, reply_markup=markup)
271
272async def text(update: Update, context: CallbackContext):
273
274 content, image, markup = _get_all(update, wot_check, context)
275
276 if image is None:
277 await update.message.reply_text(l("no_caption", context))
278 return
279
280 image = text_effect(content, image)
281
282 if image is None:
283 await update.message.reply_text(l("failed_effect", context))
284
285 await update.message.reply_photo(photo=image, reply_markup=markup)
286
287async def caps(update: Update, context: CallbackContext):
288
289 _, reply, _ = _get_reply(update.message.reply_to_message, ' '.join(context.args))
290 await context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper())
291
292async def _set_lang(update: Update, context: CallbackContext, lang: str):
293 context.chat_data["lang"] = lang
294 await context.bot.send_message(chat_id=update.effective_chat.id, text=l("language_set", context).format(format_lang(lang)))
295
296async def lang(update: Update, context: CallbackContext):
297 try:
298 selected = str(context.args[0])
299 except IndexError:
300 selected = None
301
302 if selected is None:
303 lang = format_lang(get_lang(context))
304 choices = ", ".join(langs) + "."
305 return await update.message.reply_text(text=l("current_language", context).format(lang, choices), reply_markup=lang_markup)
306
307 if selected not in langs:
308 return await update.message.reply_text(text=l("invalid_language", context))
309
310 return await _set_lang(update, context, selected)
311
312def unknown(update: Update, context: CallbackContext):
313 logging.info(f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.")
314
315async def error_callback(update: Update, context: CallbackContext):
316 try:
317 raise context.error
318 except TelegramError as e:
319 logging.error("TelegramError! " + str(e))
320 await context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', context))
321
322def _add_effect_handler(application: ApplicationBuilder, command: str, callback):
323 application.add_handler(CommandHandler(command, callback))
324 application.add_handler(MessageHandler(filters.Caption([f"/{command}"]), callback))
325
326async def keyboard_handler(update: Update, context: CallbackContext):
327 query = update.callback_query
328 data = query.data
329
330 if data.startswith("reroll"):
331 amount = int(data.split(" ")[1])
332
333 if amount <= 1:
334 return await spin(update, context)
335 return await autospin(context, update.effective_chat.id, amount)
336
337 match data:
338 case "none":
339 return query.answer(l("none_callback", context))
340 case "set_lang_en":
341 lang = "en"
342 await _set_lang(update, context, lang)
343 return await query.answer(l("language_set", context).format(format_lang(lang)))
344 case "set_lang_it":
345 lang = "it"
346 await _set_lang(update, context, lang)
347 return await query.answer(l("language_set", context).format(format_lang(lang)))
348 case other:
349 logging.error(f"unknown callback: {data}")
350
351 return await query.answer()
352
353def main():
354
355 pers = PersistenceInput(bot_data = False, callback_data = False)
356
357 application = ApplicationBuilder()
358 application.token(os.getenv("token"))
359 application.persistence(PicklePersistence(filepath='bot-data.pkl', store_data=pers))
360
361 application = application.build()
362
363
364 application.add_error_handler(error_callback)
365 application.add_handler(CallbackQueryHandler(callback=keyboard_handler))
366
367 # commands
368 application.add_handler(CommandHandler('start', start))
369 application.add_handler(CommandHandler('lang', lang))
370 application.add_handler(CommandHandler('lewd', set_lewd))
371 application.add_handler(CommandHandler('caps', caps))
372 application.add_handler(CommandHandler('pic', pic))
373
374 # effects
375 _add_effect_handler(application, 'ttbt', ttbt)
376 _add_effect_handler(application, 'tt', tt)
377 _add_effect_handler(application, 'bt', bt)
378 _add_effect_handler(application, 'splash', splash)
379 _add_effect_handler(application, 'wot', wot)
380 _add_effect_handler(application, 'text', text)
381
382 # games
383 application.add_handler(CommandHandler('spin', spin))
384 application.add_handler(CommandHandler('bet', bet))
385 application.add_handler(CommandHandler('cash', cash))
386
387 # fallback
388 application.add_handler(MessageHandler(filters.Command(), unknown))
389 application.run_polling()
390
391if __name__ == "__main__":
392 main()