main.py (view raw)
1import os
2import logging
3from io import BytesIO
4
5from PIL import Image
6from dotenv import load_dotenv
7from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
8from telegram.error import TelegramError
9from telegram.ext import ApplicationBuilder, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, \
10 PicklePersistence, filters, PersistenceInput
11
12from Api import get_random_image
13from Constants import get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup
14from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
15from Slot import spin, autospin, bet, cash
16
17load_dotenv()
18logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
19
20
21async def _get_message_content(message):
22 image = None
23 if len(message.photo) > 0:
24 p = message.photo[-1]
25 i = await p.get_file()
26 d = await i.download_as_bytearray()
27 image = Image.open(BytesIO(d))
28
29 content = ""
30 if message.text is not None:
31 content = message.text.strip()
32 elif message.caption is not None:
33 content = message.caption.strip()
34
35 lines = content.split("\n")
36 r = lines[0].split(" ")
37
38 try:
39 if r[0][0] == '/':
40 r.pop(0)
41 except IndexError:
42 pass
43
44 lines[0] = " ".join(r)
45 content = "\n".join(lines)
46
47 return image, content, _get_author(message)
48
49
50async def _get_reply(message, fallback=""):
51 if message is None:
52 return None, fallback, None
53
54 image, content, author = await _get_message_content(message)
55
56 return image, content, author
57
58
59def _get_lewd(context):
60 try:
61 return context.chat_data["lewd"]
62 except KeyError:
63 return False
64
65
66def _get_image(context):
67 if context is not None:
68 image, url = get_random_image(_get_lewd(context))
69
70 if image is None:
71 logging.warning("Getting Image failed")
72 raise TelegramError("bad image")
73
74 markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])
75
76 return image, markup
77
78
79async def _get_all(update, check_fn, context):
80 image_reply, text_reply, author_reply = await _get_reply(update.message.reply_to_message)
81 image_content, text_content, author_content = await _get_message_content(update.message)
82
83 info_struct = {
84 "reply": {
85 "author": author_reply,
86 "text": text_reply,
87 "image": image_reply
88 },
89 "content": {
90 "author": author_content,
91 "text": text_content,
92 "image": image_content
93 }
94 }
95
96 logging.info(
97 f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
98
99 content = check_fn(info_struct)
100
101 if content is None:
102 return None, None, None
103
104 markup = ""
105 image = None
106
107 if image_reply is not None:
108 image = image_reply
109
110 if image_content is not None:
111 image = image_content
112
113 if image is None:
114 image, markup = _get_image(context)
115
116 return content, image, markup
117
118
119def start(update: Update, context: CallbackContext):
120 context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", context))
121
122
123async def set_lewd(update: Update, context: CallbackContext):
124 try:
125 output = False if context.chat_data["lewd"] else True
126 except KeyError:
127 output = True
128
129 context.chat_data['lewd'] = output
130 message = l("lewd_toggle", context).format(l("enabled", context) if output else l("disabled", context))
131
132 return await context.bot.send_message(chat_id=update.effective_chat.id, text=message)
133
134
135async def pic(update: Update, context: CallbackContext):
136 image, markup = _get_image(context)
137 return await update.message.reply_photo(photo=img_to_bio(image), parse_mode="markdown", reply_markup=markup)
138
139
140def _get_author(message):
141 if message.forward_from is not None:
142 return format_author(message.forward_from)
143
144 if message.forward_sender_name is not None:
145 return message.forward_sender_name
146
147 if message.forward_from_chat is not None:
148 return message.forward_from_chat.title + (
149 "" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
150
151 return format_author(message.from_user)
152
153
154def tt_check(info):
155 reply = info['reply']['text']
156 content = info['content']['text']
157
158 input_text = f"{reply} {content}".replace("\n", " ")
159
160 if input_text.strip() == "":
161 return None
162
163 return input_text
164
165
166def ttbt_check(info):
167 reply = info['reply']['text'].strip()
168 content = info['content']['text'].strip()
169
170 if len(content.split("\n")) > 1:
171 input_text = content
172 else:
173 input_text = f"{reply}\n{content}"
174
175 if input_text.strip() == "":
176 return None
177
178 return input_text
179
180
181def splash_check(info):
182 reply = info['reply']['text']
183 content = info['content']['text']
184
185 if content.strip() == "":
186 author = info['reply']['author']
187 input_text = f"{author}\n{reply}"
188 else:
189 author = info['content']['author']
190 input_text = f"{author}\n{content}"
191
192 if len(input_text.strip().split("\n")) < 2:
193 return None
194
195 return input_text
196
197
198def wot_check(info):
199 reply = info['reply']['text']
200 content = info['content']['text']
201
202 input_text = f"{reply}\n{content}"
203
204 if input_text.strip() == "":
205 return None
206
207 return input_text
208
209
210async def ttbt(update: Update, context: CallbackContext):
211 content, image, markup = await _get_all(update, ttbt_check, context)
212
213 if image is None:
214 return await update.message.reply_text(l("no_caption", context))
215
216 image = tt_bt_effect(content, image)
217
218 if image is None:
219 return await update.message.reply_text(l("failed_effect", context))
220
221 return await update.message.reply_photo(photo=image, reply_markup=markup)
222
223
224async def tt(update: Update, context: CallbackContext):
225 content, image, markup = await _get_all(update, tt_check, context)
226
227 if image is None:
228 return await update.message.reply_text(l("no_caption", context))
229
230 image = tt_bt_effect(content, image)
231
232 if image is None:
233 return await update.message.reply_text(l("failed_effect", context))
234
235 return await update.message.reply_photo(photo=image, reply_markup=markup)
236
237
238async def bt(update: Update, context: CallbackContext):
239 content, image, markup = await _get_all(update, tt_check, context)
240
241 if image is None:
242 return await update.message.reply_text(l("no_caption", context))
243
244 image = bt_effect(content, image)
245
246 if image is None:
247 return await update.message.reply_text(l("failed_effect", context))
248
249 return await update.message.reply_photo(photo=image, reply_markup=markup)
250
251
252async def splash(update: Update, context: CallbackContext):
253 content, image, markup = await _get_all(update, splash_check, context)
254
255 if image is None:
256 return await update.message.reply_text(l("no_caption", context))
257
258 image = splash_effect(content, image)
259
260 if image is None:
261 return await update.message.reply_text(l("failed_effect", context))
262
263 return await update.message.reply_photo(photo=image, reply_markup=markup)
264
265
266async def wot(update: Update, context: CallbackContext):
267 content, image, markup = await _get_all(update, wot_check, context)
268
269 if image is None:
270 return await update.message.reply_text(l("no_caption", context))
271
272 image = wot_effect(content, image)
273
274 if image is None:
275 await update.message.reply_text(l("failed_effect", context))
276
277 await update.message.reply_photo(photo=image, reply_markup=markup)
278
279
280async def text(update: Update, context: CallbackContext):
281 content, image, markup = await _get_all(update, wot_check, context)
282
283 if image is None:
284 await update.message.reply_text(l("no_caption", context))
285 return
286
287 image = text_effect(content, image)
288
289 if image is None:
290 await update.message.reply_text(l("failed_effect", context))
291
292 await update.message.reply_photo(photo=image, reply_markup=markup)
293
294
295async def caps(update: Update, context: CallbackContext):
296 _, reply, _ = await _get_reply(update.message.reply_to_message, ' '.join(context.args))
297 await context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper())
298
299
300async def _set_lang(update: Update, context: CallbackContext, lang: str):
301 context.chat_data["lang"] = lang
302 await context.bot.send_message(chat_id=update.effective_chat.id,
303 text=l("language_set", context).format(format_lang(lang)))
304
305
306async def lang(update: Update, context: CallbackContext):
307 try:
308 selected = str(context.args[0])
309 except IndexError:
310 selected = None
311
312 if selected is None:
313 lang = format_lang(get_lang(context))
314 choices = ", ".join(langs) + "."
315 return await update.message.reply_text(text=l("current_language", context).format(lang, choices),
316 reply_markup=lang_markup)
317
318 if selected not in langs:
319 return await update.message.reply_text(text=l("invalid_language", context))
320
321 return await _set_lang(update, context, selected)
322
323
324def unknown(update: Update, context: CallbackContext):
325 logging.info(
326 f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.")
327
328
329async def error_callback(update: Update, context: CallbackContext):
330 try:
331 raise context.error
332 except TelegramError as e:
333 logging.error("TelegramError! " + str(e))
334 await context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', context))
335
336
337def _add_effect_handler(application: ApplicationBuilder, command: str, callback):
338 application.add_handler(CommandHandler(command, callback))
339 application.add_handler(MessageHandler(filters.Caption([f"/{command}"]), callback))
340
341
342async def keyboard_handler(update: Update, context: CallbackContext):
343 query = update.callback_query
344 data = query.data
345
346 if data.startswith("reroll"):
347 amount = int(data.split(" ")[1])
348
349 if amount <= 1:
350 return await spin(update, context)
351 return await autospin(context, update.effective_chat.id, amount)
352
353 match data:
354 case "none":
355 return query.answer(l("none_callback", context))
356 case "set_lang_en":
357 lang = "en"
358 await _set_lang(update, context, lang)
359 return await query.answer(l("language_set", context).format(format_lang(lang)))
360 case "set_lang_it":
361 lang = "it"
362 await _set_lang(update, context, lang)
363 return await query.answer(l("language_set", context).format(format_lang(lang)))
364 case other:
365 logging.error(f"unknown callback: {data}")
366
367 return await query.answer()
368
369
370def main():
371 token = os.getenv("token")
372 pers = PersistenceInput(bot_data=False, callback_data=False)
373 persistence = PicklePersistence(filepath='bot-data.pkl', store_data=pers)
374
375 application = ApplicationBuilder().token(token).persistence(persistence).build()
376
377 application.add_error_handler(error_callback)
378 application.add_handler(CallbackQueryHandler(callback=keyboard_handler))
379
380 # commands
381 application.add_handler(CommandHandler('start', start))
382 application.add_handler(CommandHandler('lang', lang))
383 application.add_handler(CommandHandler('lewd', set_lewd))
384 application.add_handler(CommandHandler('caps', caps))
385 application.add_handler(CommandHandler('pic', pic))
386
387 # effects
388 _add_effect_handler(application, 'ttbt', ttbt)
389 _add_effect_handler(application, 'tt', tt)
390 _add_effect_handler(application, 'bt', bt)
391 _add_effect_handler(application, 'splash', splash)
392 _add_effect_handler(application, 'wot', wot)
393 _add_effect_handler(application, 'text', text)
394
395 # games
396 application.add_handler(CommandHandler('spin', spin))
397 application.add_handler(CommandHandler('bet', bet))
398 application.add_handler(CommandHandler('cash', cash))
399
400 # fallback
401 application.add_handler(MessageHandler(filters.Command(), unknown))
402 application.run_polling()
403
404
405if __name__ == "__main__":
406 main()