diff options
author | Erik Johnston <erik@matrix.org> | 2020-01-22 10:37:00 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-22 10:37:00 +0000 |
commit | 5d7a6ad2238981646b2ae7b4071d8715281d181a (patch) | |
tree | 96eb244b6a5ed33ba95ce8df37d98313c88b80ef /synapse/replication | |
parent | Remove unused CI docker compose files (#6754) (diff) | |
download | synapse-5d7a6ad2238981646b2ae7b4071d8715281d181a.tar.xz |
Allow streaming cache invalidate all to workers. (#6749)
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/slave/storage/_base.py | 7 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 26 |
2 files changed, 27 insertions, 6 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 704282c800..f45cbd37a0 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -66,11 +66,16 @@ class BaseSlavedStore(SQLBaseStore): self._cache_id_gen.advance(token) for row in rows: if row.cache_func == CURRENT_STATE_CACHE_NAME: + if row.keys is None: + raise Exception( + "Can't send an 'invalidate all' for current state cache" + ) + room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) else: - self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys)) + self._attempt_to_invalidate_cache(row.cache_func, row.keys) def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index e03e77199b..a8d568b14a 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -17,7 +17,9 @@ import itertools import logging from collections import namedtuple -from typing import Any +from typing import Any, List, Optional + +import attr logger = logging.getLogger(__name__) @@ -65,10 +67,24 @@ PushersStreamRow = namedtuple( "PushersStreamRow", ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool ) -CachesStreamRow = namedtuple( - "CachesStreamRow", - ("cache_func", "keys", "invalidation_ts"), # str # list(str) # int -) + + +@attr.s +class CachesStreamRow: + """Stream to inform workers they should invalidate their cache. + + Attributes: + cache_func: Name of the cached function. + keys: The entry in the cache to invalidate. If None then will + invalidate all. + invalidation_ts: Timestamp of when the invalidation took place. + """ + + cache_func = attr.ib(type=str) + keys = attr.ib(type=Optional[List[Any]]) + invalidation_ts = attr.ib(type=int) + + PublicRoomsStreamRow = namedtuple( "PublicRoomsStreamRow", ( |