Basic
This is an example that demonstrates the basic setup of a cache. It demonstrates:
how to create an instance of an existing Indexes implementation
how to create a CachedTable backed by a tablecache.postgres.PostgresAccess and a tablecache.local.LocalStorageTable
how to load records into it
how to fetch records
how to invalidate records that have changed in the underlying DB
how to manually trigger a refresh of invalid records
The source file can be found in the repository’s examples directory. To run
it, you need a running Postgres, which can be started by upping the
docker-compose.yml found in the same directory.
import asyncio
import logging
import operator as op
import asyncpg
import tablecache as tc
import tablecache.postgres as tcpg
import tablecache.local as tcl
# Connection string for the example Postgres instance. It is used both by the
# cache's PostgresAccess and by the asyncpg pool that sets up and modifies the
# example data.
POSTGRES_DSN = 'postgres://postgres:@localhost:5432/postgres'
# In this basic example, we have 2 Postgres tables that are joined together
# (see setup_example_db() below). We want to store the result of the join in a
# fast storage.
async def main():
    """Walk through the basic life cycle of a CachedTable.

    Resets the example DB, loads the join of the example tables into a
    local cache, reads from the cache, and then shows how invalidation and
    a manual refresh make changes in the DB visible in the cache.
    """
    # Two query strings drive the cache: one yields the complete join, the
    # other restricts it to a set of primary keys. Both are handed to
    # PrimaryKeyIndexes, a simple implementation of the abstract Indexes
    # that mediates between DB and storage. It is unlikely to be useful in
    # practice but works well for demonstration: it can load and query
    # either everything or specific primary keys. The other examples show
    # more useful Indexes implementations.
    query_all_string = '''
SELECT
uc.*, u.name AS user_name, u.nickname AS user_nickname,
c.name AS city_name
FROM
users u
JOIN users_cities uc USING (user_id)
JOIN cities c USING (city_id)'''
    query_some_string = f'{query_all_string} WHERE uc.user_id = ANY ($1)'
    primary_key_of = op.itemgetter('user_id')
    indexes = tc.PrimaryKeyIndexes(
        primary_key_of, query_all_string, query_some_string)

    def primary_key_spec(key):
        # Build the IndexSpec that addresses the record with the given
        # primary key.
        return indexes.IndexSpec('primary_key', key)

    # The cache is assembled from three parts: a PostgresAccess (an
    # implementation of the abstract DbAccess), a LocalStorageTable (an
    # implementation of the abstract StorageTable that keeps its data in
    # local Python data structures), and the indexes that tie them
    # together. Passing table_name to LocalStorageTable is optional.
    postgres_access = tcpg.PostgresAccess(dsn=POSTGRES_DSN)
    local_storage = tcl.LocalStorageTable(indexes, table_name='users')
    cached_table = tc.CachedTable(indexes, postgres_access, local_storage)

    async with postgres_access:
        async with asyncpg.create_pool(POSTGRES_DSN) as pool:
            await setup_example_db(pool)

            # Loading takes the name of an index plus arguments specific to
            # that index. PrimaryKeyIndexes only has the primary_key index,
            # and we ask it for everything. The arguments are converted
            # into a PrimaryKeyIndexes.IndexSpec, which could also be
            # passed directly; the same is true of some other CachedTable
            # methods.
            await cached_table.load('primary_key', all_primary_keys=True)

            # load() has been awaited, so the table is ready and queries
            # hit the cache rather than the DB. Had the load been started
            # in a background task, CachedTable.loaded() would be the way
            # to wait for readiness (e.g. in a readiness check). Here it
            # returns immediately because loading is already done.
            await cached_table.loaded()

            print('Users with primary key 1 and 2:')
            # Ask the cache for primary keys 1 and 2 (again, the arguments
            # become an IndexSpec that could have been passed directly).
            # The results are asyncpg.Record objects: the DB access returns
            # those and the local storage keeps them as-is. To get regular
            # dicts, use the record_parser argument when creating
            # DbRecordsSpecs in a custom Indexes implementation.
            async for record in cached_table.get_records(
                    'primary_key', 1, 2):
                print(record)

            # Besides the get_records() asynchronous generator there is the
            # get_first_record() convenience method. It accepts the same
            # arguments and returns just the first record, raising a
            # KeyError if there is none, as is the case here. Because
            # everything was loaded up front, PrimaryKeyIndexes know that a
            # key missing from storage is also missing from the DB, so the
            # DB isn't hit.
            print('Trying to find user with primary key 3:')
            try:
                await cached_table.get_first_record('primary_key', 3)
            except KeyError:
                print('No user with primary key 3 exists.')

            # A change in the DB is not reflected in the cache by itself;
            # the old record keeps being returned until it is invalidated
            # with invalidate_records(). That can get a bit complicated in
            # the general case, but here an IndexSpec selecting the updated
            # record is enough. It has to be passed twice, each time in a
            # one-element list. Once invalidated, the next query for that
            # primary key fetches it from the DB first.
            await pool.execute(
                'UPDATE users SET name = $1 WHERE user_id = $2',
                'New Name', 1)
            print('User 1 has changed in the DB, but not yet in cache:')
            stale_user = await cached_table.get_first_record(
                'primary_key', 1)
            print(stale_user)
            cached_table.invalidate_records(
                [primary_key_spec(1)], [primary_key_spec(1)])
            print(
                'After invalidating, a refresh is triggered to update user 1:')
            fresh_user = await cached_table.get_first_record(
                'primary_key', 1)
            print(fresh_user)

            # Records added to the DB after the cache was loaded can be
            # invalidated too, which gets them added to storage. That works
            # here, but loading new records with adjust() may be preferable
            # since it lets us observe every new record that was added. See
            # the other examples.
            await pool.execute('''
INSERT INTO users(user_id, name) VALUES(3, 'User 3');
INSERT INTO users_cities(user_id, city_id)
SELECT 3, city_id FROM cities LIMIT 1''')
            cached_table.invalidate_records(
                [primary_key_spec(3)], [primary_key_spec(3)])

            # A refresh normally only happens when a read request for
            # invalid data arrives. Triggering one manually lets requests
            # be served from storage immediately.
            print('Manually triggering a refresh.')
            await cached_table.refresh_invalid()
            print('Newly inserted user 3:')
            new_user = await cached_table.get_first_record('primary_key', 3)
            print(new_user)
async def setup_example_db(postgres_pool):
    """Reset the public schema and populate it with the example data.

    Everything in the public schema is dropped before the users, cities
    and users_cities tables are created. Two users are inserted (user 2
    without a nickname), each linked to a newly inserted city of its own.

    :param postgres_pool: Object whose awaitable execute() runs the SQL
        script; main() passes an asyncpg pool.
    """
    # The whole setup is sent as one multi-statement script.
    setup_script = '''
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;
CREATE TABLE users (
user_id integer PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
name text NOT NULL,
nickname text
);
CREATE TABLE cities (
city_id integer PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
name text NOT NULL
);
CREATE TABLE users_cities (
user_id integer PRIMARY KEY REFERENCES users(user_id),
city_id integer NOT NULL REFERENCES cities(city_id)
);
INSERT INTO users(user_id, name, nickname) VALUES(1, 'User 1', 'u1');
WITH c AS (INSERT INTO cities(name) VALUES('City 1') RETURNING city_id)
INSERT INTO users_cities(user_id, city_id)
SELECT 1, c.city_id FROM c;
INSERT INTO users(user_id, name) VALUES(2, 'User 2');
WITH c AS (INSERT INTO cities(name) VALUES('City 2') RETURNING city_id)
INSERT INTO users_cities(user_id, city_id)
SELECT 2, c.city_id FROM c;'''
    await postgres_pool.execute(setup_script)
# Only run the example when this file is executed as a script.
if __name__ == '__main__':
    # Set the root logger to INFO so that INFO-level log messages emitted
    # while the example runs are shown.
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())