From 4d70d1f80ea688304abdcbbf3ee01f6ab932abc7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 10:21:25 +0100 Subject: Add some invalidations to a cache_stream --- synapse/storage/schema/delta/34/cache_stream.py | 44 +++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 synapse/storage/schema/delta/34/cache_stream.py (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py new file mode 100644 index 0000000000..4c350bfb11 --- /dev/null +++ b/synapse/storage/schema/delta/34/cache_stream.py @@ -0,0 +1,44 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +CREATE_TABLE = """ +CREATE TABLE cache_stream ( + stream_id BIGINT, + cache_func TEXT, + keys TEXT[], + invalidation_ts BIGINT +); + +CREATE INDEX cache_stream_id ON cache_stream(stream_id); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + if not isinstance(database_engine, PostgresEngine): + return + + for statement in get_statements(CREATE_TABLE.splitlines()): + cur.execute(statement) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass -- cgit 1.4.1 From d9664344ecf481a73b2189f5bc65f4f784222969 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 11:45:57 +0100 Subject: Rename table. Add docs. --- synapse/replication/slave/storage/_base.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 12 ++++++++++-- synapse/storage/schema/delta/34/cache_stream.py | 6 ++++-- 4 files changed, 16 insertions(+), 6 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 24c9946d6a..d839d169ab 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -29,7 +29,7 @@ class BaseSlavedStore(SQLBaseStore): super(BaseSlavedStore, self).__init__(hs) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = SlavedIdTracker( - db_conn, "cache_stream", "stream_id", + db_conn, "cache_invalidation_stream", "stream_id", ) else: self._cache_id_gen = None diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8af492b69f..7efc5bfeef 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -126,7 +126,7 @@ class DataStore(RoomMemberStore, RoomStore, if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( - db_conn, "cache_stream", "stream_id", + db_conn, "cache_invalidation_stream", "stream_id", ) else: self._cache_id_gen = None diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e3edc2cde6..c55776994d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -863,6 +863,13 @@ class SQLBaseStore(object): return cache, min_val def _invalidate_cache_and_stream(self, txn, cache_func, keys): + """Invalidates the cache and adds it to the cache stream so slaves + will know to invalidate their caches. + + This should only be used to invalidate caches where slaves won't + otherwise know from other replication streams that the cache should + be invalidated. + """ txn.call_after(cache_func.invalidate, keys) if isinstance(self.database_engine, PostgresEngine): @@ -872,7 +879,7 @@ class SQLBaseStore(object): self._simple_insert_txn( txn, - table="cache_stream", + table="cache_invalidation_stream", values={ "stream_id": stream_id, "cache_func": cache_func.__name__, @@ -887,7 +894,8 @@ class SQLBaseStore(object): # send across cache invalidations as quickly as possible. Cache # invalidations are idempotent, so duplicates are fine. sql = ( - "SELECT stream_id, cache_func, keys, invalidation_ts FROM cache_stream" + "SELECT stream_id, cache_func, keys, invalidation_ts" + " FROM cache_invalidation_stream" " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?" ) txn.execute(sql, (last_id, limit,)) diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py index 4c350bfb11..3b63a1562d 100644 --- a/synapse/storage/schema/delta/34/cache_stream.py +++ b/synapse/storage/schema/delta/34/cache_stream.py @@ -20,15 +20,17 @@ import logging logger = logging.getLogger(__name__) +# This stream is used to notify replication slaves that some caches have +# been invalidated that they cannot infer from the other streams. CREATE_TABLE = """ -CREATE TABLE cache_stream ( +CREATE TABLE cache_invalidation_stream ( stream_id BIGINT, cache_func TEXT, keys TEXT[], invalidation_ts BIGINT ); -CREATE INDEX cache_stream_id ON cache_stream(stream_id); +CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id); """ -- cgit 1.4.1