summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams/_base.py')
-rw-r--r--synapse/replication/tcp/streams/_base.py160
1 files changed, 82 insertions, 78 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index b6ce7a7bee..7ef67a5a73 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -26,78 +26,75 @@ logger = logging.getLogger(__name__)
 
 MAX_EVENTS_BEHIND = 10000
 
-BackfillStreamRow = namedtuple("BackfillStreamRow", (
-    "event_id",  # str
-    "room_id",  # str
-    "type",  # str
-    "state_key",  # str, optional
-    "redacts",  # str, optional
-    "relates_to",  # str, optional
-))
-PresenceStreamRow = namedtuple("PresenceStreamRow", (
-    "user_id",  # str
-    "state",  # str
-    "last_active_ts",  # int
-    "last_federation_update_ts",  # int
-    "last_user_sync_ts",  # int
-    "status_msg",   # str
-    "currently_active",  # bool
-))
-TypingStreamRow = namedtuple("TypingStreamRow", (
-    "room_id",  # str
-    "user_ids",  # list(str)
-))
-ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
-    "room_id",  # str
-    "receipt_type",  # str
-    "user_id",  # str
-    "event_id",  # str
-    "data",  # dict
-))
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
-    "user_id",  # str
-))
-PushersStreamRow = namedtuple("PushersStreamRow", (
-    "user_id",  # str
-    "app_id",  # str
-    "pushkey",  # str
-    "deleted",  # bool
-))
-CachesStreamRow = namedtuple("CachesStreamRow", (
-    "cache_func",  # str
-    "keys",  # list(str)
-    "invalidation_ts",  # int
-))
-PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
-    "room_id",  # str
-    "visibility",  # str
-    "appservice_id",  # str, optional
-    "network_id",  # str, optional
-))
-DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
-    "user_id",  # str
-    "destination",  # str
-))
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
-    "entity",  # str
-))
-TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
-    "user_id",  # str
-    "room_id",  # str
-    "data",  # dict
-))
-AccountDataStreamRow = namedtuple("AccountDataStream", (
-    "user_id",  # str
-    "room_id",  # str
-    "data_type",  # str
-    "data",  # dict
-))
-GroupsStreamRow = namedtuple("GroupsStreamRow", (
-    "group_id",  # str
-    "user_id",  # str
-    "type",  # str
-    "content",  # dict
-))
+BackfillStreamRow = namedtuple(
+    "BackfillStreamRow",
+    (
+        "event_id",  # str
+        "room_id",  # str
+        "type",  # str
+        "state_key",  # str, optional
+        "redacts",  # str, optional
+        "relates_to",  # str, optional
+    ),
+)
+PresenceStreamRow = namedtuple(
+    "PresenceStreamRow",
+    (
+        "user_id",  # str
+        "state",  # str
+        "last_active_ts",  # int
+        "last_federation_update_ts",  # int
+        "last_user_sync_ts",  # int
+        "status_msg",  # str
+        "currently_active",  # bool
+    ),
+)
+TypingStreamRow = namedtuple(
+    "TypingStreamRow", ("room_id", "user_ids")  # str  # list(str)
+)
+ReceiptsStreamRow = namedtuple(
+    "ReceiptsStreamRow",
+    (
+        "room_id",  # str
+        "receipt_type",  # str
+        "user_id",  # str
+        "event_id",  # str
+        "data",  # dict
+    ),
+)
+PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))  # str
+PushersStreamRow = namedtuple(
+    "PushersStreamRow",
+    ("user_id", "app_id", "pushkey", "deleted"),  # str  # str  # str  # bool
+)
+CachesStreamRow = namedtuple(
+    "CachesStreamRow",
+    ("cache_func", "keys", "invalidation_ts"),  # str  # list(str)  # int
+)
+PublicRoomsStreamRow = namedtuple(
+    "PublicRoomsStreamRow",
+    (
+        "room_id",  # str
+        "visibility",  # str
+        "appservice_id",  # str, optional
+        "network_id",  # str, optional
+    ),
+)
+DeviceListsStreamRow = namedtuple(
+    "DeviceListsStreamRow", ("user_id", "destination")  # str  # str
+)
+ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))  # str
+TagAccountDataStreamRow = namedtuple(
+    "TagAccountDataStreamRow", ("user_id", "room_id", "data")  # str  # str  # dict
+)
+AccountDataStreamRow = namedtuple(
+    "AccountDataStream",
+    ("user_id", "room_id", "data_type", "data"),  # str  # str  # str  # dict
+)
+GroupsStreamRow = namedtuple(
+    "GroupsStreamRow",
+    ("group_id", "user_id", "type", "content"),  # str  # str  # str  # dict
+)
 
 
 class Stream(object):
@@ -106,6 +103,7 @@ class Stream(object):
     Provides a `get_updates()` function that returns new updates since the last
     time it was called up until the point `advance_current_token` was called.
     """
+
     NAME = None  # The name of the stream
     ROW_TYPE = None  # The type of the row. Used by the default impl of parse_row.
     _LIMITED = True  # Whether the update function takes a limit
@@ -185,16 +183,13 @@ class Stream(object):
 
         if self._LIMITED:
             rows = yield self.update_function(
-                from_token, current_token,
-                limit=MAX_EVENTS_BEHIND + 1,
+                from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
             )
 
             # never turn more than MAX_EVENTS_BEHIND + 1 into updates.
             rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
         else:
-            rows = yield self.update_function(
-                from_token, current_token,
-            )
+            rows = yield self.update_function(from_token, current_token)
 
         updates = [(row[0], row[1:]) for row in rows]
 
@@ -230,6 +225,7 @@ class BackfillStream(Stream):
     """We fetched some old events and either we had never seen that event before
     or it went from being an outlier to not.
     """
+
     NAME = "backfill"
     ROW_TYPE = BackfillStreamRow
 
@@ -286,6 +282,7 @@ class ReceiptsStream(Stream):
 class PushRulesStream(Stream):
     """A user has changed their push rules
     """
+
     NAME = "push_rules"
     ROW_TYPE = PushRulesStreamRow
 
@@ -306,6 +303,7 @@ class PushRulesStream(Stream):
 class PushersStream(Stream):
     """A user has added/changed/removed a pusher
     """
+
     NAME = "pushers"
     ROW_TYPE = PushersStreamRow
 
@@ -322,6 +320,7 @@ class CachesStream(Stream):
     """A cache was invalidated on the master and no other stream would invalidate
     the cache on the workers
     """
+
     NAME = "caches"
     ROW_TYPE = CachesStreamRow
 
@@ -337,6 +336,7 @@ class CachesStream(Stream):
 class PublicRoomsStream(Stream):
     """The public rooms list changed
     """
+
     NAME = "public_rooms"
     ROW_TYPE = PublicRoomsStreamRow
 
@@ -352,6 +352,7 @@ class PublicRoomsStream(Stream):
 class DeviceListsStream(Stream):
     """Someone added/changed/removed a device
     """
+
     NAME = "device_lists"
     _LIMITED = False
     ROW_TYPE = DeviceListsStreamRow
@@ -368,6 +369,7 @@ class DeviceListsStream(Stream):
 class ToDeviceStream(Stream):
     """New to_device messages for a client
     """
+
     NAME = "to_device"
     ROW_TYPE = ToDeviceStreamRow
 
@@ -383,6 +385,7 @@ class ToDeviceStream(Stream):
 class TagAccountDataStream(Stream):
     """Someone added/removed a tag for a room
     """
+
     NAME = "tag_account_data"
     ROW_TYPE = TagAccountDataStreamRow
 
@@ -398,6 +401,7 @@ class TagAccountDataStream(Stream):
 class AccountDataStream(Stream):
     """Global or per room account data was changed
     """
+
     NAME = "account_data"
     ROW_TYPE = AccountDataStreamRow
 
@@ -416,7 +420,7 @@ class AccountDataStream(Stream):
 
         results = list(room_results)
         results.extend(
-            (stream_id, user_id, None, account_data_type, content,)
+            (stream_id, user_id, None, account_data_type, content)
             for stream_id, user_id, account_data_type, content in global_results
         )