summary refs log tree commit diff
path: root/synapse/replication/tcp/streams
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-22 10:37:00 +0000
committerGitHub <noreply@github.com>2020-01-22 10:37:00 +0000
commit5d7a6ad2238981646b2ae7b4071d8715281d181a (patch)
tree96eb244b6a5ed33ba95ce8df37d98313c88b80ef /synapse/replication/tcp/streams
parentRemove unused CI docker compose files (#6754) (diff)
downloadsynapse-5d7a6ad2238981646b2ae7b4071d8715281d181a.tar.xz
Allow streaming cache invalidate all to workers. (#6749)
Diffstat (limited to 'synapse/replication/tcp/streams')
-rw-r--r--synapse/replication/tcp/streams/_base.py26
1 files changed, 21 insertions, 5 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index e03e77199b..a8d568b14a 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -17,7 +17,9 @@
 import itertools
 import logging
 from collections import namedtuple
-from typing import Any
+from typing import Any, List, Optional
+
+import attr
 
 logger = logging.getLogger(__name__)
 
@@ -65,10 +67,24 @@ PushersStreamRow = namedtuple(
     "PushersStreamRow",
     ("user_id", "app_id", "pushkey", "deleted"),  # str  # str  # str  # bool
 )
-CachesStreamRow = namedtuple(
-    "CachesStreamRow",
-    ("cache_func", "keys", "invalidation_ts"),  # str  # list(str)  # int
-)
+
+
+@attr.s
+class CachesStreamRow:
+    """Stream to inform workers they should invalidate their cache.
+
+    Attributes:
+        cache_func: Name of the cached function.
+        keys: The entry in the cache to invalidate. If None then will
+            invalidate all.
+        invalidation_ts: Timestamp of when the invalidation took place.
+    """
+
+    cache_func = attr.ib(type=str)
+    keys = attr.ib(type=Optional[List[Any]])
+    invalidation_ts = attr.ib(type=int)
+
+
 PublicRoomsStreamRow = namedtuple(
     "PublicRoomsStreamRow",
     (