all repos — myprecious @ d7583eb4604a185c188647b4cc192555e1579119

A lightweight web service to back up precious game saves.

myprecious/db.py (view raw)

 1
 2import os, uuid, sqlite3
 3from base64 import b64encode
 4from contextlib import suppress
 5from argon2 import PasswordHasher
 6from argon2.exceptions import VerifyMismatchError
 7import myprecious.constants as c
 8
# Module-level argon2 hasher, constructed with the library's default parameters.
ph = PasswordHasher()
10
def get_hashable(password: str, salt: str):
    """Build the string that is handed to the password hasher.

    Args:
        password: The password to protect.
        salt: The salt to mix into the password.

    Returns:
        ``password``, ``salt`` and ``c.SECRET_KEY`` concatenated in that order.
    """
    salted = password + salt
    return salted + c.SECRET_KEY
13
def verify_user(username, hash, password, salt):
    """Check a password against a stored argon2 hash.

    When the hash verifies but the hasher reports that it needs rehashing,
    a fresh hash is written to the ``login`` table for ``username``.

    Args:
        username: Name of the user whose ``login`` row is updated on rehash.
        hash: The stored argon2 hash to verify against.
        password: The password supplied for verification.
        salt: The salt that is combined with the password.

    Returns:
        False if the hasher raises ``VerifyMismatchError``, True otherwise.
        Any other exception raised by the hasher propagates to the caller.
    """
    candidate = get_hashable(password, salt)
    try:
        ph.verify(hash, candidate)
    except VerifyMismatchError:
        return False
    else:
        # Only reached after a successful verify; errors raised in this
        # branch are not caught by the handler above.
        if ph.check_needs_rehash(hash):
            db_query(
                "UPDATE login SET password = (?) WHERE username = (?);",
                [ph.hash(candidate), username],
            )
        return True
26
def db_query(query, parameters):
    """Execute one parameterized SQL statement on the database at ``c.DB_PATH``.

    A new connection is opened on every call. The ``with`` block commits the
    transaction on success and rolls it back if the statement raises; it does
    not close the connection, so the returned cursor can still be fetched from.

    Args:
        query: SQL text using ``?`` placeholders.
        parameters: Sequence of values bound to the placeholders.

    Returns:
        The ``sqlite3.Cursor`` the statement was executed on.
    """
    connection = sqlite3.connect(c.DB_PATH)
    with connection:
        cursor = connection.cursor()
        cursor.execute(query, parameters)
    return cursor
32
def db_query_one(query, parameters):
    """Run a parameterized query and return its first result row.

    Args:
        query: SQL text using ``?`` placeholders.
        parameters: Sequence of values bound to the placeholders.

    Returns:
        The first row as a list, or None when the statement yields no row
        (which includes statements that produce no result set at all).
    """
    row = db_query(query, parameters).fetchone()
    # fetchone() reports "no row" by returning None. Test for that directly
    # instead of catching the TypeError raised by list(None), which could
    # also mask an unrelated TypeError.
    if row is None:
        return None
    return list(row)
39        
def run_sql(sql_path: str):
    """Execute the SQL script stored at ``sql_path`` on the database at ``c.DB_PATH``.

    Args:
        sql_path: Path of a UTF-8 encoded file containing SQL statements.

    Raises:
        OSError: If the script file cannot be opened or read.
        sqlite3.Error: If connecting or running the script fails.
    """
    # Read the script first so the file handle is released before the
    # database is touched.
    with open(sql_path, 'r', encoding="utf-8") as f:
        sql = f.read()

    con = sqlite3.connect(c.DB_PATH)
    try:
        # "with con" only commits or rolls back the transaction; it does not
        # close the connection, hence the explicit close() in finally.
        with con:
            con.cursor().executescript(sql)
    finally:
        con.close()
46
def add_user_to_queue(username, password, email, salt=None):
    """Put a registration request into the ``queue`` table.

    Args:
        username: Requested user name.
        password: Password for the new account; forwarded to ``add_user``.
        email: E-mail address of the requester.
        salt: Optional salt forwarded to ``add_user``.

    Returns:
        ``"registered"`` if the default user table already has the name,
        ``"queued"`` if the name is already in the queue, otherwise ``"done"``
        after the request has been handed to ``add_user``.
    """
    already_registered = get_user_from_username(username) is not None
    if already_registered:
        return "registered"

    already_queued = get_user_from_username(username, "queue") is not None
    if already_queued:
        return "queued"

    add_user(username, password, email, salt, "queue")
    return "done"
54
def add_user(username, password, email, salt=None, table="login", hashed=False):
    """Insert a user row into ``table``.

    The statement is ``insert or ignore``, so a row that conflicts with a
    table constraint is skipped without raising.

    Args:
        username: User name to store.
        password: Password to store; hashed first unless ``hashed`` is true.
        email: E-mail address to store.
        salt: Salt to store and hash with. When None, 12 random bytes are
            generated and base64 encoded.
        table: Target table name. It is interpolated into the SQL text, so it
            must never come from untrusted input.
        hashed: True if ``password`` is already a hash and must be stored
            unchanged.

    Returns:
        The cursor returned by ``db_query``.
    """
    salt = b64encode(os.urandom(12)).decode('utf-8') if salt is None else salt
    statement = f"insert or ignore into { table } (username, password, salt, email) values (?,?,?,?);"
    stored_password = password if hashed else ph.hash(get_hashable(password, salt))
    return db_query(statement, [username, stored_password, salt, email])
63
def get_user_from_username(username: str, table="login"):
    """Look up a user row by name.

    Args:
        username: User name to search for (bound as a query parameter).
        table: Table to search. It is interpolated into the SQL text, so it
            must never come from untrusted input.

    Returns:
        Whatever ``db_query_one`` returns for the lookup.
    """
    statement = f"SELECT * FROM { table } where username = (?);"
    return db_query_one(statement, [username])
66
def get_user_from_id(id: int):
    """Look up a row of the ``login`` table by its ``user_id``.

    Args:
        id: Value matched against the ``user_id`` column.

    Returns:
        Whatever ``db_query_one`` returns for the lookup.
    """
    statement = "SELECT * from login where user_id = (?);"
    return db_query_one(statement, [id])
69
def get_queued_users():
    """Return every row of the ``queue`` table, as produced by ``fetchall()``."""
    return db_query("SELECT * from queue;", []).fetchall()
73
def deny_user(nick):
    """Delete the ``queue`` row(s) whose username equals ``nick``.

    Args:
        nick: User name to remove from the queue.

    Returns:
        Whatever ``db_query_one`` returns for the DELETE statement.
    """
    statement = "DELETE FROM queue WHERE username = (?)"
    outcome = db_query_one(statement, [nick])
    return outcome
76
def allow_user(nick):
    """Approve a queued registration.

    The queued row is copied into the default user table through ``add_user``
    (its stored password is passed on as already hashed) and is then removed
    from the queue through ``deny_user``.

    Args:
        nick: User name of the queued registration.

    Returns:
        The result of ``deny_user(nick)``, or None without touching the
        database further when ``nick`` is not in the queue.
    """
    queued = get_user_from_username(nick, "queue")
    if queued is None:
        # Nothing to approve. Without this guard the indexing below fails
        # with "TypeError: 'NoneType' object is not subscriptable".
        return None

    # NOTE(review): presumably the queue columns are (username, password hash,
    # salt, email), matching the insert order used by add_user - confirm
    # against the schema.
    username = queued[0]
    stored_hash = queued[1]
    salt = queued[2]
    email = queued[3]

    add_user(username, stored_hash, email, salt, hashed=True)
    return deny_user(nick)
81
def init_db():
    """Prepare the data directory and database, then add the default admin.

    Creates ``c.BASE_DIRECTORY`` (ignoring ``FileExistsError``), runs the SQL
    script at ``c.MIGRATIONS_INIT_PATH`` and passes the default admin
    credentials from the constants module to ``add_user``.
    """
    try:
        os.makedirs(c.BASE_DIRECTORY)
    except FileExistsError:
        # The path is already present; nothing to create.
        pass

    run_sql(c.MIGRATIONS_INIT_PATH)
    add_user(c.DEFAULT_ADMIN_USER, c.DEFAULT_ADMIN_PW, c.DEFAULT_ADMIN_EMAIL)