diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index abf5c6c6a8..32d9514883 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -28,94 +28,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 500000
-BackfillStreamRow = namedtuple(
- "BackfillStreamRow",
- (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
- "relates_to", # str, optional
- ),
-)
-PresenceStreamRow = namedtuple(
- "PresenceStreamRow",
- (
- "user_id", # str
- "state", # str
- "last_active_ts", # int
- "last_federation_update_ts", # int
- "last_user_sync_ts", # int
- "status_msg", # str
- "currently_active", # bool
- ),
-)
-TypingStreamRow = namedtuple(
- "TypingStreamRow", ("room_id", "user_ids") # str # list(str)
-)
-ReceiptsStreamRow = namedtuple(
- "ReceiptsStreamRow",
- (
- "room_id", # str
- "receipt_type", # str
- "user_id", # str
- "event_id", # str
- "data", # dict
- ),
-)
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
-PushersStreamRow = namedtuple(
- "PushersStreamRow",
- ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
-)
-
-
-@attr.s
-class CachesStreamRow:
- """Stream to inform workers they should invalidate their cache.
-
- Attributes:
- cache_func: Name of the cached function.
- keys: The entry in the cache to invalidate. If None then will
- invalidate all.
- invalidation_ts: Timestamp of when the invalidation took place.
- """
-
- cache_func = attr.ib(type=str)
- keys = attr.ib(type=Optional[List[Any]])
- invalidation_ts = attr.ib(type=int)
-
-
-PublicRoomsStreamRow = namedtuple(
- "PublicRoomsStreamRow",
- (
- "room_id", # str
- "visibility", # str
- "appservice_id", # str, optional
- "network_id", # str, optional
- ),
-)
-
-
-@attr.s
-class DeviceListsStreamRow:
- entity = attr.ib(type=str)
-
-
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
-TagAccountDataStreamRow = namedtuple(
- "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
-)
-AccountDataStreamRow = namedtuple(
- "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
-)
-GroupsStreamRow = namedtuple(
- "GroupsStreamRow",
- ("group_id", "user_id", "type", "content"), # str # str # str # dict
-)
-UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
-
class Stream(object):
"""Base class for the streams.
@@ -234,6 +146,18 @@ class BackfillStream(Stream):
or it went from being an outlier to not.
"""
+ BackfillStreamRow = namedtuple(
+ "BackfillStreamRow",
+ (
+ "event_id", # str
+ "room_id", # str
+ "type", # str
+ "state_key", # str, optional
+ "redacts", # str, optional
+ "relates_to", # str, optional
+ ),
+ )
+
NAME = "backfill"
ROW_TYPE = BackfillStreamRow
@@ -246,6 +170,19 @@ class BackfillStream(Stream):
class PresenceStream(Stream):
+ PresenceStreamRow = namedtuple(
+ "PresenceStreamRow",
+ (
+ "user_id", # str
+ "state", # str
+ "last_active_ts", # int
+ "last_federation_update_ts", # int
+ "last_user_sync_ts", # int
+ "status_msg", # str
+ "currently_active", # bool
+ ),
+ )
+
NAME = "presence"
ROW_TYPE = PresenceStreamRow
@@ -260,6 +197,10 @@ class PresenceStream(Stream):
class TypingStream(Stream):
+ TypingStreamRow = namedtuple(
+ "TypingStreamRow", ("room_id", "user_ids") # str # list(str)
+ )
+
NAME = "typing"
ROW_TYPE = TypingStreamRow
@@ -273,6 +214,17 @@ class TypingStream(Stream):
class ReceiptsStream(Stream):
+ ReceiptsStreamRow = namedtuple(
+ "ReceiptsStreamRow",
+ (
+ "room_id", # str
+ "receipt_type", # str
+ "user_id", # str
+ "event_id", # str
+ "data", # dict
+ ),
+ )
+
NAME = "receipts"
ROW_TYPE = ReceiptsStreamRow
@@ -289,6 +241,8 @@ class PushRulesStream(Stream):
"""A user has changed their push rules
"""
+ PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
+
NAME = "push_rules"
ROW_TYPE = PushRulesStreamRow
@@ -309,6 +263,11 @@ class PushersStream(Stream):
"""A user has added/changed/removed a pusher
"""
+ PushersStreamRow = namedtuple(
+ "PushersStreamRow",
+ ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
+ )
+
NAME = "pushers"
ROW_TYPE = PushersStreamRow
@@ -326,6 +285,21 @@ class CachesStream(Stream):
the cache on the workers
"""
+ @attr.s
+ class CachesStreamRow:
+ """Stream to inform workers they should invalidate their cache.
+
+ Attributes:
+ cache_func: Name of the cached function.
+ keys: The entry in the cache to invalidate. If None then will
+ invalidate all.
+ invalidation_ts: Timestamp of when the invalidation took place.
+ """
+
+ cache_func = attr.ib(type=str)
+ keys = attr.ib(type=Optional[List[Any]])
+ invalidation_ts = attr.ib(type=int)
+
NAME = "caches"
ROW_TYPE = CachesStreamRow
@@ -342,6 +316,16 @@ class PublicRoomsStream(Stream):
"""The public rooms list changed
"""
+ PublicRoomsStreamRow = namedtuple(
+ "PublicRoomsStreamRow",
+ (
+ "room_id", # str
+ "visibility", # str
+ "appservice_id", # str, optional
+ "network_id", # str, optional
+ ),
+ )
+
NAME = "public_rooms"
ROW_TYPE = PublicRoomsStreamRow
@@ -359,6 +343,10 @@ class DeviceListsStream(Stream):
told about a device update.
"""
+ @attr.s
+ class DeviceListsStreamRow:
+ entity = attr.ib(type=str)
+
NAME = "device_lists"
ROW_TYPE = DeviceListsStreamRow
@@ -375,6 +363,8 @@ class ToDeviceStream(Stream):
"""New to_device messages for a client
"""
+ ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
+
NAME = "to_device"
ROW_TYPE = ToDeviceStreamRow
@@ -391,6 +381,10 @@ class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""
+ TagAccountDataStreamRow = namedtuple(
+ "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
+ )
+
NAME = "tag_account_data"
ROW_TYPE = TagAccountDataStreamRow
@@ -407,6 +401,10 @@ class AccountDataStream(Stream):
"""Global or per room account data was changed
"""
+ AccountDataStreamRow = namedtuple(
+ "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
+ )
+
NAME = "account_data"
ROW_TYPE = AccountDataStreamRow
@@ -432,6 +430,11 @@ class AccountDataStream(Stream):
class GroupServerStream(Stream):
+ GroupsStreamRow = namedtuple(
+ "GroupsStreamRow",
+ ("group_id", "user_id", "type", "content"), # str # str # str # dict
+ )
+
NAME = "groups"
ROW_TYPE = GroupsStreamRow
@@ -448,6 +451,8 @@ class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key
"""
+ UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
+
NAME = "user_signature"
ROW_TYPE = UserSignatureStreamRow
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 615f3dc9ac..f5f9336430 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -17,20 +17,20 @@ from collections import namedtuple
from ._base import Stream
-FederationStreamRow = namedtuple(
- "FederationStreamRow",
- (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
- ),
-)
-
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
+ FederationStreamRow = namedtuple(
+ "FederationStreamRow",
+ (
+ "type", # str, the type of data as defined in the BaseFederationRows
+ "data", # dict, serialization of a federation.send_queue.BaseFederationRow
+ ),
+ )
+
NAME = "federation"
ROW_TYPE = FederationStreamRow
|