summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-04-07 09:39:52 +0100
committerGitHub <noreply@github.com>2017-04-07 09:39:52 +0100
commit98ce212093bbe87c8cd2b5c92ee1587aba5cc1fa (patch)
treea6d9b00000e962c0a7e87bfe3d4a498f3f825641 /synapse/replication
parentUse iteritems (diff)
parentDocument types of the replication streams (diff)
downloadsynapse-98ce212093bbe87c8cd2b5c92ee1587aba5cc1fa.tar.xz
Merge pull request #2103 from matrix-org/erikj/no-double-encode
Don't double encode replication data
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/streams.py104
1 files changed, 76 insertions, 28 deletions
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 4de4ebe84d..967b459e0e 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -36,34 +36,82 @@ logger = logging.getLogger(__name__)
 MAX_EVENTS_BEHIND = 10000
 
 
-EventStreamRow = namedtuple("EventStreamRow",
-                            ("event_id", "room_id", "type", "state_key", "redacts"))
-BackfillStreamRow = namedtuple("BackfillStreamRow",
-                               ("event_id", "room_id", "type", "state_key", "redacts"))
-PresenceStreamRow = namedtuple("PresenceStreamRow",
-                               ("user_id", "state", "last_active_ts",
-                                "last_federation_update_ts", "last_user_sync_ts",
-                                "status_msg", "currently_active"))
-TypingStreamRow = namedtuple("TypingStreamRow",
-                             ("room_id", "user_ids"))
-ReceiptsStreamRow = namedtuple("ReceiptsStreamRow",
-                               ("room_id", "receipt_type", "user_id", "event_id",
-                                "data"))
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))
-PushersStreamRow = namedtuple("PushersStreamRow",
-                              ("user_id", "app_id", "pushkey", "deleted",))
-CachesStreamRow = namedtuple("CachesStreamRow",
-                             ("cache_func", "keys", "invalidation_ts",))
-PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow",
-                                  ("room_id", "visibility", "appservice_id",
-                                   "network_id",))
-DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",))
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))
-FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",))
-TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow",
-                                     ("user_id", "room_id", "data"))
-AccountDataStreamRow = namedtuple("AccountDataStream",
-                                  ("user_id", "room_id", "data_type", "data"))
+EventStreamRow = namedtuple("EventStreamRow", (
+    "event_id",  # str
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str, optional
+    "redacts",  # str, optional
+))
+BackfillStreamRow = namedtuple("BackfillStreamRow", (
+    "event_id",  # str
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str, optional
+    "redacts",  # str, optional
+))
+PresenceStreamRow = namedtuple("PresenceStreamRow", (
+    "user_id",  # str
+    "state",  # str
+    "last_active_ts",  # int
+    "last_federation_update_ts",  # int
+    "last_user_sync_ts",  # int
+    "status_msg",   # str
+    "currently_active",  # bool
+))
+TypingStreamRow = namedtuple("TypingStreamRow", (
+    "room_id",  # str
+    "user_ids",  # list(str)
+))
+ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
+    "room_id",  # str
+    "receipt_type",  # str
+    "user_id",  # str
+    "event_id",  # str
+    "data",  # dict
+))
+PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
+    "user_id",  # str
+))
+PushersStreamRow = namedtuple("PushersStreamRow", (
+    "user_id",  # str
+    "app_id",  # str
+    "pushkey",  # str
+    "deleted",  # bool
+))
+CachesStreamRow = namedtuple("CachesStreamRow", (
+    "cache_func",  # str
+    "keys",  # list(str)
+    "invalidation_ts",  # int
+))
+PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
+    "room_id",  # str
+    "visibility",  # str
+    "appservice_id",  # str, optional
+    "network_id",  # str, optional
+))
+DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
+    "user_id",  # str
+    "destination",  # str
+))
+ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
+    "entity",  # str
+))
+FederationStreamRow = namedtuple("FederationStreamRow", (
+    "type",  # str
+    "data",  # dict
+))
+TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
+    "user_id",  # str
+    "room_id",  # str
+    "data",  # dict
+))
+AccountDataStreamRow = namedtuple("AccountDataStream", (
+    "user_id",  # str
+    "room_id",  # str
+    "data_type",  # str
+    "data",  # dict
+))
 
 
 class Stream(object):