summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams/_base.py')
-rw-r--r--synapse/replication/tcp/streams/_base.py129
1 files changed, 67 insertions, 62 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 743a01da08..5a2d90c530 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -15,7 +15,6 @@
 
 import heapq
 import logging
-from collections import namedtuple
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -30,6 +29,7 @@ from typing import (
 import attr
 
 from synapse.replication.http.streams import ReplicationGetStreamUpdates
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -226,17 +226,14 @@ class BackfillStream(Stream):
     or it went from being an outlier to not.
     """
 
-    BackfillStreamRow = namedtuple(
-        "BackfillStreamRow",
-        (
-            "event_id",  # str
-            "room_id",  # str
-            "type",  # str
-            "state_key",  # str, optional
-            "redacts",  # str, optional
-            "relates_to",  # str, optional
-        ),
-    )
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class BackfillStreamRow:
+        event_id: str
+        room_id: str
+        type: str
+        state_key: Optional[str]
+        redacts: Optional[str]
+        relates_to: Optional[str]
 
     NAME = "backfill"
     ROW_TYPE = BackfillStreamRow
@@ -256,18 +253,15 @@ class BackfillStream(Stream):
 
 
 class PresenceStream(Stream):
-    PresenceStreamRow = namedtuple(
-        "PresenceStreamRow",
-        (
-            "user_id",  # str
-            "state",  # str
-            "last_active_ts",  # int
-            "last_federation_update_ts",  # int
-            "last_user_sync_ts",  # int
-            "status_msg",  # str
-            "currently_active",  # bool
-        ),
-    )
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class PresenceStreamRow:
+        user_id: str
+        state: str
+        last_active_ts: int
+        last_federation_update_ts: int
+        last_user_sync_ts: int
+        status_msg: str
+        currently_active: bool
 
     NAME = "presence"
     ROW_TYPE = PresenceStreamRow
@@ -302,7 +296,7 @@ class PresenceFederationStream(Stream):
     send.
     """
 
-    @attr.s(slots=True, auto_attribs=True)
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
     class PresenceFederationStreamRow:
         destination: str
         user_id: str
@@ -320,9 +314,10 @@ class PresenceFederationStream(Stream):
 
 
 class TypingStream(Stream):
-    TypingStreamRow = namedtuple(
-        "TypingStreamRow", ("room_id", "user_ids")  # str  # list(str)
-    )
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class TypingStreamRow:
+        room_id: str
+        user_ids: List[str]
 
     NAME = "typing"
     ROW_TYPE = TypingStreamRow
@@ -348,16 +343,13 @@ class TypingStream(Stream):
 
 
 class ReceiptsStream(Stream):
-    ReceiptsStreamRow = namedtuple(
-        "ReceiptsStreamRow",
-        (
-            "room_id",  # str
-            "receipt_type",  # str
-            "user_id",  # str
-            "event_id",  # str
-            "data",  # dict
-        ),
-    )
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class ReceiptsStreamRow:
+        room_id: str
+        receipt_type: str
+        user_id: str
+        event_id: str
+        data: dict
 
     NAME = "receipts"
     ROW_TYPE = ReceiptsStreamRow
@@ -374,7 +366,9 @@ class ReceiptsStream(Stream):
 class PushRulesStream(Stream):
     """A user has changed their push rules"""
 
-    PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))  # str
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class PushRulesStreamRow:
+        user_id: str
 
     NAME = "push_rules"
     ROW_TYPE = PushRulesStreamRow
@@ -396,10 +390,12 @@ class PushRulesStream(Stream):
 class PushersStream(Stream):
     """A user has added/changed/removed a pusher"""
 
-    PushersStreamRow = namedtuple(
-        "PushersStreamRow",
-        ("user_id", "app_id", "pushkey", "deleted"),  # str  # str  # str  # bool
-    )
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class PushersStreamRow:
+        user_id: str
+        app_id: str
+        pushkey: str
+        deleted: bool
 
     NAME = "pushers"
     ROW_TYPE = PushersStreamRow
@@ -419,7 +415,7 @@ class CachesStream(Stream):
     the cache on the workers
     """
 
-    @attr.s(slots=True)
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
     class CachesStreamRow:
         """Stream to inform workers they should invalidate their cache.
 
@@ -430,9 +426,9 @@ class CachesStream(Stream):
             invalidation_ts: Timestamp of when the invalidation took place.
         """
 
-        cache_func = attr.ib(type=str)
-        keys = attr.ib(type=Optional[List[Any]])
-        invalidation_ts = attr.ib(type=int)
+        cache_func: str
+        keys: Optional[List[Any]]
+        invalidation_ts: int
 
     NAME = "caches"
     ROW_TYPE = CachesStreamRow
@@ -451,9 +447,9 @@ class DeviceListsStream(Stream):
     told about a device update.
     """
 
-    @attr.s(slots=True)
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
     class DeviceListsStreamRow:
-        entity = attr.ib(type=str)
+        entity: str
 
     NAME = "device_lists"
     ROW_TYPE = DeviceListsStreamRow
@@ -470,7 +466,9 @@ class DeviceListsStream(Stream):
 class ToDeviceStream(Stream):
     """New to_device messages for a client"""
 
-    ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))  # str
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class ToDeviceStreamRow:
+        entity: str
 
     NAME = "to_device"
     ROW_TYPE = ToDeviceStreamRow
@@ -487,9 +485,11 @@ class ToDeviceStream(Stream):
 class TagAccountDataStream(Stream):
     """Someone added/removed a tag for a room"""
 
-    TagAccountDataStreamRow = namedtuple(
-        "TagAccountDataStreamRow", ("user_id", "room_id", "data")  # str  # str  # dict
-    )
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class TagAccountDataStreamRow:
+        user_id: str
+        room_id: str
+        data: JsonDict
 
     NAME = "tag_account_data"
     ROW_TYPE = TagAccountDataStreamRow
@@ -506,10 +506,11 @@ class TagAccountDataStream(Stream):
 class AccountDataStream(Stream):
     """Global or per room account data was changed"""
 
-    AccountDataStreamRow = namedtuple(
-        "AccountDataStreamRow",
-        ("user_id", "room_id", "data_type"),  # str  # Optional[str]  # str
-    )
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class AccountDataStreamRow:
+        user_id: str
+        room_id: Optional[str]
+        data_type: str
 
     NAME = "account_data"
     ROW_TYPE = AccountDataStreamRow
@@ -573,10 +574,12 @@ class AccountDataStream(Stream):
 
 
 class GroupServerStream(Stream):
-    GroupsStreamRow = namedtuple(
-        "GroupsStreamRow",
-        ("group_id", "user_id", "type", "content"),  # str  # str  # str  # dict
-    )
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class GroupsStreamRow:
+        group_id: str
+        user_id: str
+        type: str
+        content: JsonDict
 
     NAME = "groups"
     ROW_TYPE = GroupsStreamRow
@@ -593,7 +596,9 @@ class GroupServerStream(Stream):
 class UserSignatureStream(Stream):
     """A user has signed their own device with their user-signing key"""
 
-    UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id"))  # str
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class UserSignatureStreamRow:
+        user_id: str
 
     NAME = "user_signature"
     ROW_TYPE = UserSignatureStreamRow