all repos — gemini-redirect @ 292825669970b7c009ac202b096f9a6960f7d265

content/blog/mdad/developing-a-python-application-for-cassandra/index.md (view raw)

  1+++
  2title = "Developing a Python application for Cassandra"
  3date = 2020-03-23T00:00:00+00:00
  4updated = 2020-04-16T07:52:26+00:00
  5+++
  6
  7_**Warning**: this post is, in fact, a shameless self-plug to my own library. If you continue reading, you accept that you are okay with this. Otherwise, please close the tab, shut down your computer, and set it on fire.__(Also, that was a joke. Please don’t do that.)_
  8
  9Let’s do some programming! Today we will be making a tiny CLI application in [Python](http://python.org/) that queries [Telegram’s API](https://core.telegram.org/api) and stores the data in [Cassandra](http://cassandra.apache.org/).
 10
 11## Our goal
 12
 13Our goal is to make a Python console application. This application will connect to [Telegram](https://telegram.org/), and ask for your account credentials. Once you have logged in, the application will fetch all of your open conversations and we will store these in Cassandra.
 14
 15With the data saved in Cassandra, we can now very efficiently query information about your conversations given their identifier offline (no need to query Telegram anymore).
 16
 17**In short**, we are making an application that performs efficient offline queries to Cassandra to print out information about your Telegram conversations given the ID you want to query.
 18
 19## Data model
 20
 21The application itself is really simple, and we only need one table to store all the relevant information we will be needing. This table called `**users**` will contain the following columns:
 22
 23* `**id**`, of type `int`. This will also be the `primary key` and we’ll use it to query the database later on.
 24* `**first_name**`, of type `varchar`. This field contains the first name of the stored user.
 25* `**last_name**`, of type `varchar`. This field contains the last name of the stored user.
 26* `**username**`, of type `varchar`. This field contains the username of the stored user.
 27Because Cassandra uses a [wide column storage model](https://cassandra.apache.org/doc/latest/architecture/overview.html), direct access through a key is the most efficient way to query the database. In our case, the key is the primary key of the `users` table, using the `id` column. The index for the primary key is ready to be used as soon as we create the table, so we don’t need to create it on our own.
 28
 29## Dependencies
 30
 31Because we will program it in Python, you need Python installed. You can install it using a package manager of your choice or heading over to the [Python downloads section](https://www.python.org/downloads/), but if you’re on Linux, chances are you have it installed already.
 32
 33Once Python 3.5 or above is installed, get a copy of the Cassandra driver for Python and Telethon through `pip`:
 34
 35```
 36pip install cassandra-driver telethon
 37```
 38
 39For more details on that, see the [installation guide for `cassandra-driver`](https://docs.datastax.com/en/developer/python-driver/3.22/installation/), or the [installation guide for `telethon`](https://docs.telethon.dev/en/latest/basic/installation.html).
 40
 41As we did in our [previous post](/blog/mdad/cassandra-operaciones-basicas-y-arquitectura/), we will setup a new keyspace for this application with `cqlsh`. We will also create a table to store the users into. This could all be automated in the Python code, but because it’s a one-time thing, we prefer to use `cqlsh`.
 42
 43Make sure that Cassandra is running in the background. We can’t make queries to it if it’s not running.
 44
 45```
 46$ bin/cqlsh
 47Connected to Test Cluster at 127.0.0.1:9042.
 48[cqlsh 5.0.1 | Cassandra 3.11.6 | CQL spec 3.4.4 | Native protocol v4]
 49Use HELP for help.
 50cqlsh> create keyspace mdad with replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
 51cqlsh> use mdad;
 52cqlsh:mdad> create table users(id int primary key, first_name varchar, last_name varchar, username varchar);
 53```
 54
 55Python installed? Check. Python dependencies? Check. Cassandra ready? Check.
 56
 57## The code
 58
 59### Getting users
 60
 61The first step is connecting to [Telegram’s API](https://core.telegram.org/api), for which we’ll use [Telethon](https://telethon.dev/), a wonderful (wink, wink) Python library to interface with it.
 62
 63As with most APIs, we need to supply [our API key](https://my.telegram.org/) in order to use it (here `API_ID` and `API_HASH`). We will refer to them as constants. At the end, you may download the entire code and use my own key for this example. But please don’t use those values for your other applications!
 64
 65It’s pretty simple: we create a client, and for every dialog (that is, open conversation) we have, do some checks:
 66
 67* If it’s an user, we just store that in a dictionary mapping `ID → User`.
 68* Else if it’s a group, we iterate over the participants and store those users instead.
 69
 70```
 71async def load_users():
 72    from telethon import TelegramClient
 73
 74    users = {}
 75
 76    async with TelegramClient(SESSION, API_ID, API_HASH) as client:
 77        async for dialog in client.iter_dialogs():
 78            if dialog.is_user:
 79                user = dialog.entity
 80                users[user.id] = user
 81                print('found user:', user.id, file=sys.stderr)
 82
 83            elif dialog.is_group:
 84                async for user in client.iter_participants(dialog):
 85                    users[user.id] = user
 86                    print('found member:', user.id, file=sys.stderr)
 87
 88    return list(users.values())
 89```
 90
 91With this we have a mapping ID to user, so we know we won’t have duplicates. We simply return the list of user values, because that’s all we care about.
 92
 93### Saving users
 94
 95Inserting users into Cassandra is pretty straightforward. We take the list of `User` objects as input, and prepare a new `INSERT` statement that we can reuse (because we will be using it in a loop, this is the best way to do it).
 96
 97For each user, execute the statement with the user data as input parameters. Simple as that.
 98
 99```
100def save_users(session, users):
101    insert_stmt = session.prepare(
102        'INSERT INTO users (id, first_name, last_name, username) '
103        'VALUES (?, ?, ?, ?)')
104
105    for user in users:
106        row = (user.id, user.first_name, user.last_name, user.username)
107        session.execute(insert_stmt, row)
108```
109
110### Fetching users
111
112Given a list of users, yield all of them from the database. Similar to before, we prepare a `SELECT` statement and just execute it repeatedly over the input user IDs.
113
114```
115def fetch_users(session, users):
116    select_stmt = session.prepare('SELECT * FROM users WHERE id = ?')
117
118    for user_id in users:
119        yield session.execute(select_stmt, (user_id,)).one()
120```
121
122### Parsing arguments
123
124We’ll be making a little CLI application, so we need to parse console arguments. It won’t be anything fancy, though. For that we’ll be using [Python’s `argparse` module](https://docs.python.org/3/library/argparse.html):
125
126```
127def parse_args():
128    import argparse
129
130    parser = argparse.ArgumentParser(
131        description='Dump and query Telegram users')
132
133    parser.add_argument('users', type=int, nargs='*',
134        help='one or more user IDs to query for')
135
136    parser.add_argument('--load-users', action='store_true',
137        help='load users from Telegram (do this first run)')
138
139    return parser.parse_args()
140```
141
142### All together
143
144Last, the entry point. We import a Cassandra Cluster, and connect to some default keyspace (we called it `mdad` earlier).
145
146If the user wants to load the users into the database, we’ll do just that first.
147
148Then, for each user we fetch from the database, we print it. Last names and usernames are optional, so don’t print those if they’re missing (`None`).
149
150```
151async def main(args):
152    from cassandra.cluster import Cluster
153
154    cluster = Cluster(CLUSTER_NODES)
155    session = cluster.connect(KEYSPACE)
156
157    if args.load_users:
158        users = await load_users()
159        save_users(session, users)
160
161    for user in fetch_users(session, args.users):
162        print('User', user.id, ':')
163        print('  First name:', user.first_name)
164        if user.last_name:
165            print('  Last name:', user.last_name)
166        if user.username:
167            print('  Username:', user.username)
168
169        print()
170
171if __name__ == '__main__':
172    asyncio.run(main(parse_args()))
173```
174
175Because Telethon is an `[asyncio](https://docs.python.org/3/library/asyncio.html)` library, we define it as `async def main(...)` and run it with `asyncio.run(main(...))`.
176
177Here’s what it looks like in action:
178
179```
180$ python data.py --help
181usage: data.py [-h] [--load-users] [users [users ...]]
182
183Dump and query Telegram users
184
185positional arguments:
186  users         one or more user IDs to query for
187
188optional arguments:
189  -h, --help    show this help message and exit
190  --load-users  load users from Telegram (do this first run)
191
192$ python data.py --load-users
193found user: 487158
194found member: 59794114
195found member: 487158
196found member: 191045991
197(...a lot more output)
198
199$ python data.py 487158 59794114
200User 487158 :
201  First name: Rick
202  Last name: Pickle
203
204User 59794114 :
205  Firt name: Peter
206  Username: pete
207```
208
209Telegram’s data now persists in Cassandra, and we can efficiently query it whenever we need to! I would’ve shown a video presenting its usage, but I’m afraid that would leak some of the data I want to keep private :-).
210
211Feel free to download the code and try it yourself:
212
213*download removed*
214
215## References
216
217* [DataStax Python Driver for Apache Cassandra – Getting Started](https://docs.datastax.com/en/developer/python-driver/3.22/getting_started/)
218* [Telethon’s Documentation](https://docs.telethon.dev/en/latest/)