main.py (view raw)
1from PIL import Image
2from Api import get_random_image
3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
4from Constants import get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup
5from Slot import spin, autospin, bet, cash
6
7from dotenv import load_dotenv
8load_dotenv()
9import os, logging
10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
11from io import BytesIO
12
13from telegram.error import TelegramError
14from telegram.ext import ApplicationBuilder, Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, PicklePersistence, filters, PersistenceInput
15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
16
17async def _get_message_content(message):
18
19 image = None
20 if len(message.photo) > 0:
21 p = message.photo[-1]
22 i = await p.get_file()
23 d = await i.download_as_bytearray()
24 image = Image.open(BytesIO(d))
25
26 content = ""
27 if message.text is not None:
28 content = message.text.strip()
29 elif message.caption is not None:
30 content = message.caption.strip()
31
32 lines = content.split("\n")
33 r = lines[0].split(" ")
34
35 try:
36 if r[0][0] == '/':
37 r.pop(0)
38 except IndexError:
39 pass
40
41 lines[0] = " ".join(r)
42 content = "\n".join(lines)
43
44 return image, content, _get_author(message)
45
46async def _get_reply(message, fallback=""):
47
48 if message is None:
49 return None, fallback, None
50
51 image, content, author = await _get_message_content(message)
52
53 return image, content, author
54
55def _get_lewd(context):
56
57 try:
58 return context.chat_data["lewd"]
59 except KeyError:
60 return False
61
62def _get_image(context):
63
64 if context is not None:
65 image, url = get_random_image(_get_lewd(context))
66
67 if image is None:
68 logging.warning("Getting Image failed")
69 raise TelegramError("bad image")
70
71 markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])
72
73 return image, markup
74
75async def _get_all(update, check_fn, context):
76
77 image_reply, text_reply, author_reply = await _get_reply(update.message.reply_to_message)
78 image_content, text_content, author_content = await _get_message_content(update.message)
79
80 info_struct = {
81 "reply": {
82 "author": author_reply,
83 "text": text_reply,
84 "image": image_reply
85 },
86 "content": {
87 "author": author_content,
88 "text": text_content,
89 "image": image_content
90 }
91 }
92
93 logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
94
95 content = check_fn(info_struct)
96
97 if content is None:
98 return None, None, None
99
100 markup = ""
101 image = None
102
103 if image_reply is not None:
104 image = image_reply
105
106 if image_content is not None:
107 image = image_content
108
109 if image is None:
110 image, markup = _get_image(context)
111
112 return content, image, markup
113
114def start(update: Update, context: CallbackContext):
115 context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", context))
116
117async def set_lewd(update: Update, context: CallbackContext):
118
119 try:
120 output = False if context.chat_data["lewd"] else True
121 except KeyError:
122 output = True
123
124 context.chat_data['lewd'] = output
125 message = l("lewd_toggle", context).format(l("enabled", context) if output else l("disabled", context))
126
127 return await context.bot.send_message(chat_id=update.effective_chat.id, text=message)
128
129async def pic(update: Update, context: CallbackContext):
130
131 image, markup = _get_image(context)
132 return await update.message.reply_photo(photo=img_to_bio(image), parse_mode="markdown", reply_markup=markup)
133
134def _get_author(message):
135
136 if message.forward_from is not None:
137 return format_author(message.forward_from)
138
139 if message.forward_sender_name is not None:
140 return message.forward_sender_name
141
142 if message.forward_from_chat is not None:
143 return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
144
145 return format_author(message.from_user)
146
147def tt_check(info):
148
149 reply = info['reply']['text']
150 content = info['content']['text']
151
152 input_text = f"{reply} {content}".replace("\n", " ")
153
154 if input_text.strip() == "":
155 return None
156
157 return input_text
158
159def ttbt_check(info):
160
161 reply = info['reply']['text'].strip()
162 content = info['content']['text'].strip()
163
164 if len(content.split("\n")) > 1:
165 input_text = content
166 else:
167 input_text = f"{reply}\n{content}"
168
169 if input_text.strip() == "":
170 return None
171
172 return input_text
173
174def splash_check(info):
175
176 reply = info['reply']['text']
177 content = info['content']['text']
178
179 if content.strip() == "":
180 author = info['reply']['author']
181 input_text = f"{author}\n{reply}"
182 else:
183 author = info['content']['author']
184 input_text = f"{author}\n{content}"
185
186 if len(input_text.strip().split("\n")) < 2:
187 return None
188
189 return input_text
190
191def wot_check(info):
192
193 reply = info['reply']['text']
194 content = info['content']['text']
195
196 input_text = f"{reply}\n{content}"
197
198 if input_text.strip() == "":
199 return None
200
201 return input_text
202
203async def ttbt(update: Update, context: CallbackContext):
204
205 content, image, markup = await _get_all(update, ttbt_check, context)
206
207 if image is None:
208 return await update.message.reply_text(l("no_caption", context))
209
210 image = tt_bt_effect(content, image)
211
212 if image is None:
213 return await update.message.reply_text(l("failed_effect", context))
214
215 return await update.message.reply_photo(photo=image, reply_markup=markup)
216
217
218async def tt(update: Update, context: CallbackContext):
219
220 content, image, markup = await _get_all(update, tt_check, context)
221
222 if image is None:
223 return await update.message.reply_text(l("no_caption", context))
224
225 image = tt_bt_effect(content, image)
226
227 if image is None:
228 return await update.message.reply_text(l("failed_effect", context))
229
230 return await update.message.reply_photo(photo=image, reply_markup=markup)
231
232async def bt(update: Update, context: CallbackContext):
233
234 content, image, markup = await _get_all(update, tt_check, context)
235
236 if image is None:
237 return await update.message.reply_text(l("no_caption", context))
238
239 image = bt_effect(content, image)
240
241 if image is None:
242 return await update.message.reply_text(l("failed_effect", context))
243
244 return await update.message.reply_photo(photo=image, reply_markup=markup)
245
246async def splash(update: Update, context: CallbackContext):
247
248 content, image, markup = await _get_all(update, splash_check, context)
249
250 if image is None:
251 return await update.message.reply_text(l("no_caption", context))
252
253 image = splash_effect(content, image)
254
255 if image is None:
256 return await update.message.reply_text(l("failed_effect", context))
257
258 return await update.message.reply_photo(photo=image, reply_markup=markup)
259
260async def wot(update: Update, context: CallbackContext):
261
262 content, image, markup = await _get_all(update, wot_check, context)
263
264 if image is None:
265 return await update.message.reply_text(l("no_caption", context))
266
267 image = wot_effect(content, image)
268
269 if image is None:
270 await update.message.reply_text(l("failed_effect", context))
271
272 await update.message.reply_photo(photo=image, reply_markup=markup)
273
274async def text(update: Update, context: CallbackContext):
275
276 content, image, markup = await _get_all(update, wot_check, context)
277
278 if image is None:
279 await update.message.reply_text(l("no_caption", context))
280 return
281
282 image = text_effect(content, image)
283
284 if image is None:
285 await update.message.reply_text(l("failed_effect", context))
286
287 await update.message.reply_photo(photo=image, reply_markup=markup)
288
289async def caps(update: Update, context: CallbackContext):
290
291 _, reply, _ = await _get_reply(update.message.reply_to_message, ' '.join(context.args))
292 await context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper())
293
294async def _set_lang(update: Update, context: CallbackContext, lang: str):
295 context.chat_data["lang"] = lang
296 await context.bot.send_message(chat_id=update.effective_chat.id, text=l("language_set", context).format(format_lang(lang)))
297
298async def lang(update: Update, context: CallbackContext):
299 try:
300 selected = str(context.args[0])
301 except IndexError:
302 selected = None
303
304 if selected is None:
305 lang = format_lang(get_lang(context))
306 choices = ", ".join(langs) + "."
307 return await update.message.reply_text(text=l("current_language", context).format(lang, choices), reply_markup=lang_markup)
308
309 if selected not in langs:
310 return await update.message.reply_text(text=l("invalid_language", context))
311
312 return await _set_lang(update, context, selected)
313
314def unknown(update: Update, context: CallbackContext):
315 logging.info(f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.")
316
317async def error_callback(update: Update, context: CallbackContext):
318 try:
319 raise context.error
320 except TelegramError as e:
321 logging.error("TelegramError! " + str(e))
322 await context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', context))
323
324def _add_effect_handler(application: ApplicationBuilder, command: str, callback):
325 application.add_handler(CommandHandler(command, callback))
326 application.add_handler(MessageHandler(filters.Caption([f"/{command}"]), callback))
327
328async def keyboard_handler(update: Update, context: CallbackContext):
329 query = update.callback_query
330 data = query.data
331
332 if data.startswith("reroll"):
333 amount = int(data.split(" ")[1])
334
335 if amount <= 1:
336 return await spin(update, context)
337 return await autospin(context, update.effective_chat.id, amount)
338
339 match data:
340 case "none":
341 return query.answer(l("none_callback", context))
342 case "set_lang_en":
343 lang = "en"
344 await _set_lang(update, context, lang)
345 return await query.answer(l("language_set", context).format(format_lang(lang)))
346 case "set_lang_it":
347 lang = "it"
348 await _set_lang(update, context, lang)
349 return await query.answer(l("language_set", context).format(format_lang(lang)))
350 case other:
351 logging.error(f"unknown callback: {data}")
352
353 return await query.answer()
354
355def main():
356
357 pers = PersistenceInput(bot_data = False, callback_data = False)
358
359 application = ApplicationBuilder()
360 application.token(os.getenv("token"))
361 application.persistence(PicklePersistence(filepath='bot-data.pkl', store_data=pers))
362
363 application = application.build()
364
365
366 application.add_error_handler(error_callback)
367 application.add_handler(CallbackQueryHandler(callback=keyboard_handler))
368
369 # commands
370 application.add_handler(CommandHandler('start', start))
371 application.add_handler(CommandHandler('lang', lang))
372 application.add_handler(CommandHandler('lewd', set_lewd))
373 application.add_handler(CommandHandler('caps', caps))
374 application.add_handler(CommandHandler('pic', pic))
375
376 # effects
377 _add_effect_handler(application, 'ttbt', ttbt)
378 _add_effect_handler(application, 'tt', tt)
379 _add_effect_handler(application, 'bt', bt)
380 _add_effect_handler(application, 'splash', splash)
381 _add_effect_handler(application, 'wot', wot)
382 _add_effect_handler(application, 'text', text)
383
384 # games
385 application.add_handler(CommandHandler('spin', spin))
386 application.add_handler(CommandHandler('bet', bet))
387 application.add_handler(CommandHandler('cash', cash))
388
389 # fallback
390 application.add_handler(MessageHandler(filters.Command(), unknown))
391 application.run_polling()
392
393if __name__ == "__main__":
394 main()