Basic
This is an example that demonstrates the basic setup of a cache. It shows:

- how to create an instance of an existing Indexes implementation
- how to create a CachedTable backed by a tablecache.postgres.PostgresAccess
  and a tablecache.local.LocalStorageTable
- how to load records into it
- how to fetch records
- how to invalidate records that have changed in the underlying DB
- how to manually trigger a refresh of invalid records

The source file can be found in the repository's examples directory. To run
it, you need a running Postgres, which can be started by running docker
compose up with the docker-compose.yml found in the same directory.

import asyncio
import logging
import operator as op

import asyncpg
import tablecache as tc
import tablecache.postgres as tcpg
import tablecache.local as tcl

POSTGRES_DSN = 'postgres://postgres:@localhost:5432/postgres'

# In this basic example, we have 3 Postgres tables that are joined together
# (see setup_example_db() below). We want to store the result of the join in a
# fast storage.
async def main():
# We define 2 query strings: one that returns the entire join, and one that
# filters for rows with specific primary keys. We give these query strings
# to a PrimaryKeyIndexes instance, which abstracts access to DB and
# storage. PrimaryKeyIndexes is a simple implementation of the abstract
# Indexes which is likely not useful in practice but good for
# demonstration. It allows us to load and query either everything, or
# specific primary keys. See the other examples for more useful indexes
# implementations.
query_all_string = '''
SELECT
uc.*, u.name AS user_name, u.nickname AS user_nickname,
c.name AS city_name
FROM
users u
JOIN users_cities uc USING (user_id)
JOIN cities c USING (city_id)'''
query_some_string = f'{query_all_string} WHERE uc.user_id = ANY ($1)'
indexes = tc.PrimaryKeyIndexes(
op.itemgetter('user_id'), query_all_string, query_some_string)
    # We create a PostgresAccess (an implementation of the abstract DbAccess)
    # and a LocalStorageTable (an implementation of the abstract StorageTable
    # which stores its data locally in Python data structures). Then we
    # create a CachedTable with DB access, storage, and the indexes to tie
    # them together. The table_name argument to LocalStorageTable is
    # optional.
db_access = tcpg.PostgresAccess(dsn=POSTGRES_DSN)
storage_table = tcl.LocalStorageTable(indexes, table_name='users')
table = tc.CachedTable(indexes, db_access, storage_table)
async with db_access, asyncpg.create_pool(POSTGRES_DSN) as postgres_pool:
await setup_example_db(postgres_pool)
# We load data into the cache using the name of an index and arguments
# specific to it. The PrimaryKeyIndexes have only the primary_key
# index, and here we want to load everything. The arguments we pass
# here are actually directly converted into a
# PrimaryKeyIndexes.IndexSpec object, which we could also pass
# directly. The same holds for some other methods in the CachedTable.
await table.load('primary_key', all_primary_keys=True)
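        # The same call with an explicit IndexSpec would presumably look like
        # the following (a sketch based on the conversion described above,
        # commented out so we don't load everything twice):
        # await table.load(
        #     indexes.IndexSpec('primary_key', all_primary_keys=True))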
# Since we've awaited load(), the table is now ready to use and we can
# query it, hitting the cache rather than the DB. If we had started the
# load in a background task, we could have used CachedTable.loaded() to
# wait until it's ready (e.g. in a readiness check). Since we're
# already loaded, this will return immediately.
await table.loaded()
print('Users with primary key 1 and 2:')
async for user in table.get_records('primary_key', 1, 2):
# We're querying the cache for records with primary keys 1 and 2
# (the arguments are again turned into an IndexSpec which we could
# have passed directly). Note that these are asyncpg.Record
# objects. That's because the DB access returns those, and the
# local storage just adds them to its data structures as-is. If we
# wanted regular dicts, we could use the record_parser argument
# when creating DbRecordsSpecs in a custom Indexes implementation.
print(user)
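        # The explicit-IndexSpec form of the same query would presumably be
        # table.get_records(indexes.IndexSpec('primary_key', 1, 2)). For a
        # quick one-off conversion, dict(user) also turns an asyncpg.Record
        # into a plain dict.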
try:
# In addition to the get_records() asynchronous generator, there is
# also a get_first_record() convenience method that takes the same
# arguments as get_records() would, and simply returns the first
# record. If there isn't one, as is the case here, it raises a
# KeyError. Since we loaded all records in the beginning, our
# PrimaryKeyIndexes know that if this key isn't found in storage,
# it's also not in the DB, so this doesn't cause the DB to be hit.
print('Trying to find user with primary key 3:')
await table.get_first_record('primary_key', 3)
except KeyError:
print('No user with primary key 3 exists.')
        # When data in the DB changes, the cache will still return the old
        # record. We need to invalidate it using invalidate_records(), which
        # can get a bit complicated in the general case. But here we can
        # simply pass an IndexSpec that gets the record we've updated. We
        # need to pass it twice, each time in a single-element list. After
        # we've invalidated the record, the cache makes sure that the next
        # time we query that primary key, it is fetched from the DB first.
await postgres_pool.execute(
'UPDATE users SET name = $1 WHERE user_id = $2', 'New Name', 1)
print('User 1 has changed in the DB, but not yet in cache:')
print(await table.get_first_record('primary_key', 1))
table.invalidate_records(
[indexes.IndexSpec('primary_key', 1)],
[indexes.IndexSpec('primary_key', 1)])
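        # (Presumably, the first list describes the invalid records as they
        # can currently be found in storage and the second as they can be
        # found in the DB; since the primary key didn't change here, the same
        # spec serves for both.)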
print('After invalidating, a refresh is triggered to update user 1:')
print(await table.get_first_record('primary_key', 1))
# We can also invalidate records that were added to the DB after the
# cache was loaded, which will cause them to be added to storage. This
# works in our case, but there is another way of loading new records
# (using adjust()) that may be preferable since it allows us to observe
# every new record that was added. See the other examples.
await postgres_pool.execute('''
INSERT INTO users(user_id, name) VALUES(3, 'User 3');
INSERT INTO users_cities(user_id, city_id)
SELECT 3, city_id FROM cities LIMIT 1''')
table.invalidate_records(
[indexes.IndexSpec('primary_key', 3)],
[indexes.IndexSpec('primary_key', 3)])
# Normally, a refresh is triggered only when a read request for invalid
# data comes in. But we can also trigger one manually to serve requests
# from storage immediately.
print('Manually triggering a refresh.')
await table.refresh_invalid()
print('Newly inserted user 3:')
print(await table.get_first_record('primary_key', 3))


async def setup_example_db(postgres_pool):
await postgres_pool.execute('''
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;
CREATE TABLE users (
user_id integer PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
name text NOT NULL,
nickname text
);
CREATE TABLE cities (
city_id integer PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
name text NOT NULL
);
CREATE TABLE users_cities (
user_id integer PRIMARY KEY REFERENCES users(user_id),
city_id integer NOT NULL REFERENCES cities(city_id)
);
INSERT INTO users(user_id, name, nickname) VALUES(1, 'User 1', 'u1');
WITH c AS (INSERT INTO cities(name) VALUES('City 1') RETURNING city_id)
INSERT INTO users_cities(user_id, city_id)
SELECT 1, c.city_id FROM c;
INSERT INTO users(user_id, name) VALUES(2, 'User 2');
WITH c AS (INSERT INTO cities(name) VALUES('City 2') RETURNING city_id)
INSERT INTO users_cities(user_id, city_id)
SELECT 2, c.city_id FROM c;''')


if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.run(main())