all repos — FixYouTube-legacy @ 5849992f3d4a49ae36ba6074cd0235a6897dd966

A better way to embed YouTube videos everywhere (inspired by FixTweet).

fixyoutube/db.py (view raw)

 1from peewee import Model, CharField, TextField, IntegerField, DateTimeField, DoesNotExist
 2from playhouse.sqliteq import SqliteQueueDatabase
 3from requests import get
 4from requests.exceptions import JSONDecodeError
 5from datetime import datetime, timedelta
 6import fixyoutube.constants as c
 7
# Database handle shared by every model in this module; its location is read
# from fixyoutube.constants.DB_URL.
db = SqliteQueueDatabase(c.DB_URL)
 9
class BaseModel(Model):
    """Base model that binds every subclass to the module-level ``db``."""

    class Meta:
        # Models inheriting from BaseModel use this database handle.
        database = db
13
class Video(BaseModel):
    """Cached metadata for a single video.

    Rows are written by ``cache_video`` and read by ``get_video_from_cache``,
    which compares ``timestamp`` against ``c.YT_TTL_MINUTES`` to decide
    whether a row is still usable.
    """

    # Video identifier; declared unique, so at most one row per video.
    videoId = CharField(unique=True)
    title = CharField()
    description = TextField()
    # Filled from the "author" field of the API response in get_info.
    uploader = CharField()
    # Filled from "lengthSeconds" in get_info.
    duration = IntegerField()
    # height/width come from the selected stream's "size" string, split on "x".
    height = IntegerField()
    width = IntegerField()
    # URL of the selected mp4 stream.
    url = TextField()
    # Set to datetime.now() (naive) when the row is created, unless a value
    # is supplied explicitly.
    timestamp = DateTimeField(default=datetime.now)
24
def cache_video(info):
    """Store ``info`` as the cached entry for its video and return the row.

    Any existing row with the same ``videoId`` is removed first, so the
    cache holds at most one (the newest) entry per video.

    Args:
        info: Mapping of ``Video`` field names to values; must contain
            ``"videoId"``.

    Returns:
        The ``Video`` instance returned by ``Video.create``.

    Raises:
        KeyError: If ``info`` has no ``"videoId"`` entry.
    """
    # videoId is unique, so a leftover (possibly expired) row would make the
    # insert below fail. A DELETE that matches no rows is simply a no-op; it
    # never raises DoesNotExist, so no exception handling is needed here.
    Video.delete().where(Video.videoId == info["videoId"]).execute()
    return Video.create(**info)
31
def get_video_from_cache(video):
    """Look up a cached ``Video`` row by id, ignoring expired entries.

    Args:
        video: The video identifier to match against ``Video.videoId``.

    Returns:
        The matching ``Video`` row, or ``None`` when there is no row or the
        row is older than ``c.YT_TTL_MINUTES`` minutes.
    """
    try:
        cached = Video.get(Video.videoId == video)
    except DoesNotExist:
        return None
    else:
        # Both operands are naive datetimes produced by datetime.now().
        age = datetime.now() - cached.timestamp
        max_age = timedelta(minutes=c.YT_TTL_MINUTES)
        return None if age > max_age else cached
43
def get_info(video):
    """Return metadata for ``video``, using the cache when it is still fresh.

    On a cache miss the configured endpoint (``c.INVIDIOUS_ENDPOINT``) is
    queried, the last mp4 entry of ``formatStreams`` is selected, and the
    result is stored through ``cache_video``.

    Args:
        video: The video identifier, substituted into
            ``c.INVIDIOUS_ENDPOINT``.

    Returns:
        A ``Video`` row, or ``None`` when the response is not valid JSON,
        contains no mp4 stream, or lacks/garbles an expected field.
    """
    cached = get_video_from_cache(video)
    if cached is not None:
        return cached

    try:
        res = get(c.INVIDIOUS_ENDPOINT.format(video)).json()
    except JSONDecodeError:
        print("JSON decode error. Bad instance or video does not exist.")
        return None

    try:
        mp4_streams = [s for s in res["formatStreams"] if s["container"] == "mp4"]
        # IndexError here means the response listed no mp4 stream at all.
        stream = mp4_streams[-1]
        # Convert to int up front so a freshly fetched row exposes the same
        # types as one loaded from the database; a size that is not "WxH"
        # raises ValueError and is treated as a malformed response.
        width, height = (int(side) for side in stream["size"].split("x"))
        info = {
            "videoId": res["videoId"],
            "title": res["title"],
            "description": res["description"],
            "uploader": res["author"],
            "duration": res["lengthSeconds"],
            "height": height,
            "width": width,
            "url": stream["url"],
        }
    except (KeyError, IndexError, ValueError):
        # Error payloads and incomplete responses end up here.
        return None

    return cache_video(info)
74
def clear_cache():
    """Delete every row from the ``Video`` table."""
    wipe_all = Video.delete()  # no WHERE clause: matches all rows
    wipe_all.execute()
77
# Runs at import time: open the database connection, then create the table
# backing Video. NOTE(review): presumably a no-op when the table already
# exists (peewee's default) — confirm.
db.connect()
db.create_tables([Video])