diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 4de4ebe84d..967b459e0e 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -36,34 +36,82 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
-EventStreamRow = namedtuple("EventStreamRow",
- ("event_id", "room_id", "type", "state_key", "redacts"))
-BackfillStreamRow = namedtuple("BackfillStreamRow",
- ("event_id", "room_id", "type", "state_key", "redacts"))
-PresenceStreamRow = namedtuple("PresenceStreamRow",
- ("user_id", "state", "last_active_ts",
- "last_federation_update_ts", "last_user_sync_ts",
- "status_msg", "currently_active"))
-TypingStreamRow = namedtuple("TypingStreamRow",
- ("room_id", "user_ids"))
-ReceiptsStreamRow = namedtuple("ReceiptsStreamRow",
- ("room_id", "receipt_type", "user_id", "event_id",
- "data"))
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))
-PushersStreamRow = namedtuple("PushersStreamRow",
- ("user_id", "app_id", "pushkey", "deleted",))
-CachesStreamRow = namedtuple("CachesStreamRow",
- ("cache_func", "keys", "invalidation_ts",))
-PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow",
- ("room_id", "visibility", "appservice_id",
- "network_id",))
-DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",))
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))
-FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",))
-TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow",
- ("user_id", "room_id", "data"))
-AccountDataStreamRow = namedtuple("AccountDataStream",
- ("user_id", "room_id", "data_type", "data"))
+EventStreamRow = namedtuple("EventStreamRow", (
+ "event_id", # str
+ "room_id", # str
+ "type", # str
+ "state_key", # str, optional
+ "redacts", # str, optional
+))
+BackfillStreamRow = namedtuple("BackfillStreamRow", (
+ "event_id", # str
+ "room_id", # str
+ "type", # str
+ "state_key", # str, optional
+ "redacts", # str, optional
+))
+PresenceStreamRow = namedtuple("PresenceStreamRow", (
+ "user_id", # str
+ "state", # str
+ "last_active_ts", # int
+ "last_federation_update_ts", # int
+ "last_user_sync_ts", # int
+ "status_msg", # str
+ "currently_active", # bool
+))
+TypingStreamRow = namedtuple("TypingStreamRow", (
+ "room_id", # str
+ "user_ids", # list(str)
+))
+ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
+ "room_id", # str
+ "receipt_type", # str
+ "user_id", # str
+ "event_id", # str
+ "data", # dict
+))
+PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
+ "user_id", # str
+))
+PushersStreamRow = namedtuple("PushersStreamRow", (
+ "user_id", # str
+ "app_id", # str
+ "pushkey", # str
+ "deleted", # bool
+))
+CachesStreamRow = namedtuple("CachesStreamRow", (
+ "cache_func", # str
+ "keys", # list(str)
+ "invalidation_ts", # int
+))
+PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
+ "room_id", # str
+ "visibility", # str
+ "appservice_id", # str, optional
+ "network_id", # str, optional
+))
+DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
+ "user_id", # str
+ "destination", # str
+))
+ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
+ "entity", # str
+))
+FederationStreamRow = namedtuple("FederationStreamRow", (
+ "type", # str
+ "data", # dict
+))
+TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
+ "user_id", # str
+ "room_id", # str
+ "data", # dict
+))
+AccountDataStreamRow = namedtuple("AccountDataStream", (
+ "user_id", # str
+ "room_id", # str
+ "data_type", # str
+ "data", # dict
+))
class Stream(object):
|