From 5d7a6ad2238981646b2ae7b4071d8715281d181a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Jan 2020 10:37:00 +0000 Subject: Allow streaming cache invalidate all to workers. (#6749) --- synapse/replication/tcp/streams/_base.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'synapse/replication/tcp') diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index e03e77199b..a8d568b14a 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -17,7 +17,9 @@ import itertools import logging from collections import namedtuple -from typing import Any +from typing import Any, List, Optional + +import attr logger = logging.getLogger(__name__) @@ -65,10 +67,24 @@ PushersStreamRow = namedtuple( "PushersStreamRow", ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool ) -CachesStreamRow = namedtuple( - "CachesStreamRow", - ("cache_func", "keys", "invalidation_ts"), # str # list(str) # int -) + + +@attr.s +class CachesStreamRow: + """Stream to inform workers they should invalidate their cache. + + Attributes: + cache_func: Name of the cached function. + keys: The entry in the cache to invalidate. If None then will + invalidate all. + invalidation_ts: Timestamp of when the invalidation took place. + """ + + cache_func = attr.ib(type=str) + keys = attr.ib(type=Optional[List[Any]]) + invalidation_ts = attr.ib(type=int) + + PublicRoomsStreamRow = namedtuple( "PublicRoomsStreamRow", ( -- cgit 1.4.1