summary refs log tree commit diff
path: root/synapse/replication/tcp/streams
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams')
-rw-r--r--synapse/replication/tcp/streams/_base.py160
-rw-r--r--synapse/replication/tcp/streams/events.py32
-rw-r--r--synapse/replication/tcp/streams/federation.py12
3 files changed, 104 insertions, 100 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index b6ce7a7bee..7ef67a5a73 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -26,78 +26,75 @@ logger = logging.getLogger(__name__)
 
 MAX_EVENTS_BEHIND = 10000
 
-BackfillStreamRow = namedtuple("BackfillStreamRow", (
-    "event_id",  # str
-    "room_id",  # str
-    "type",  # str
-    "state_key",  # str, optional
-    "redacts",  # str, optional
-    "relates_to",  # str, optional
-))
-PresenceStreamRow = namedtuple("PresenceStreamRow", (
-    "user_id",  # str
-    "state",  # str
-    "last_active_ts",  # int
-    "last_federation_update_ts",  # int
-    "last_user_sync_ts",  # int
-    "status_msg",   # str
-    "currently_active",  # bool
-))
-TypingStreamRow = namedtuple("TypingStreamRow", (
-    "room_id",  # str
-    "user_ids",  # list(str)
-))
-ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
-    "room_id",  # str
-    "receipt_type",  # str
-    "user_id",  # str
-    "event_id",  # str
-    "data",  # dict
-))
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
-    "user_id",  # str
-))
-PushersStreamRow = namedtuple("PushersStreamRow", (
-    "user_id",  # str
-    "app_id",  # str
-    "pushkey",  # str
-    "deleted",  # bool
-))
-CachesStreamRow = namedtuple("CachesStreamRow", (
-    "cache_func",  # str
-    "keys",  # list(str)
-    "invalidation_ts",  # int
-))
-PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
-    "room_id",  # str
-    "visibility",  # str
-    "appservice_id",  # str, optional
-    "network_id",  # str, optional
-))
-DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
-    "user_id",  # str
-    "destination",  # str
-))
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
-    "entity",  # str
-))
-TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
-    "user_id",  # str
-    "room_id",  # str
-    "data",  # dict
-))
-AccountDataStreamRow = namedtuple("AccountDataStream", (
-    "user_id",  # str
-    "room_id",  # str
-    "data_type",  # str
-    "data",  # dict
-))
-GroupsStreamRow = namedtuple("GroupsStreamRow", (
-    "group_id",  # str
-    "user_id",  # str
-    "type",  # str
-    "content",  # dict
-))
+BackfillStreamRow = namedtuple(
+    "BackfillStreamRow",
+    (
+        "event_id",  # str
+        "room_id",  # str
+        "type",  # str
+        "state_key",  # str, optional
+        "redacts",  # str, optional
+        "relates_to",  # str, optional
+    ),
+)
+PresenceStreamRow = namedtuple(
+    "PresenceStreamRow",
+    (
+        "user_id",  # str
+        "state",  # str
+        "last_active_ts",  # int
+        "last_federation_update_ts",  # int
+        "last_user_sync_ts",  # int
+        "status_msg",  # str
+        "currently_active",  # bool
+    ),
+)
+TypingStreamRow = namedtuple(
+    "TypingStreamRow", ("room_id", "user_ids")  # str  # list(str)
+)
+ReceiptsStreamRow = namedtuple(
+    "ReceiptsStreamRow",
+    (
+        "room_id",  # str
+        "receipt_type",  # str
+        "user_id",  # str
+        "event_id",  # str
+        "data",  # dict
+    ),
+)
+PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))  # str
+PushersStreamRow = namedtuple(
+    "PushersStreamRow",
+    ("user_id", "app_id", "pushkey", "deleted"),  # str  # str  # str  # bool
+)
+CachesStreamRow = namedtuple(
+    "CachesStreamRow",
+    ("cache_func", "keys", "invalidation_ts"),  # str  # list(str)  # int
+)
+PublicRoomsStreamRow = namedtuple(
+    "PublicRoomsStreamRow",
+    (
+        "room_id",  # str
+        "visibility",  # str
+        "appservice_id",  # str, optional
+        "network_id",  # str, optional
+    ),
+)
+DeviceListsStreamRow = namedtuple(
+    "DeviceListsStreamRow", ("user_id", "destination")  # str  # str
+)
+ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))  # str
+TagAccountDataStreamRow = namedtuple(
+    "TagAccountDataStreamRow", ("user_id", "room_id", "data")  # str  # str  # dict
+)
+AccountDataStreamRow = namedtuple(
+    "AccountDataStream",
+    ("user_id", "room_id", "data_type", "data"),  # str  # str  # str  # dict
+)
+GroupsStreamRow = namedtuple(
+    "GroupsStreamRow",
+    ("group_id", "user_id", "type", "content"),  # str  # str  # str  # dict
+)
 
 
 class Stream(object):
@@ -106,6 +103,7 @@ class Stream(object):
     Provides a `get_updates()` function that returns new updates since the last
     time it was called up until the point `advance_current_token` was called.
     """
+
     NAME = None  # The name of the stream
     ROW_TYPE = None  # The type of the row. Used by the default impl of parse_row.
     _LIMITED = True  # Whether the update function takes a limit
@@ -185,16 +183,13 @@ class Stream(object):
 
         if self._LIMITED:
             rows = yield self.update_function(
-                from_token, current_token,
-                limit=MAX_EVENTS_BEHIND + 1,
+                from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
             )
 
             # never turn more than MAX_EVENTS_BEHIND + 1 into updates.
             rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
         else:
-            rows = yield self.update_function(
-                from_token, current_token,
-            )
+            rows = yield self.update_function(from_token, current_token)
 
         updates = [(row[0], row[1:]) for row in rows]
 
@@ -230,6 +225,7 @@ class BackfillStream(Stream):
     """We fetched some old events and either we had never seen that event before
     or it went from being an outlier to not.
     """
+
     NAME = "backfill"
     ROW_TYPE = BackfillStreamRow
 
@@ -286,6 +282,7 @@ class ReceiptsStream(Stream):
 class PushRulesStream(Stream):
     """A user has changed their push rules
     """
+
     NAME = "push_rules"
     ROW_TYPE = PushRulesStreamRow
 
@@ -306,6 +303,7 @@ class PushRulesStream(Stream):
 class PushersStream(Stream):
     """A user has added/changed/removed a pusher
     """
+
     NAME = "pushers"
     ROW_TYPE = PushersStreamRow
 
@@ -322,6 +320,7 @@ class CachesStream(Stream):
     """A cache was invalidated on the master and no other stream would invalidate
     the cache on the workers
     """
+
     NAME = "caches"
     ROW_TYPE = CachesStreamRow
 
@@ -337,6 +336,7 @@ class CachesStream(Stream):
 class PublicRoomsStream(Stream):
     """The public rooms list changed
     """
+
     NAME = "public_rooms"
     ROW_TYPE = PublicRoomsStreamRow
 
@@ -352,6 +352,7 @@ class PublicRoomsStream(Stream):
 class DeviceListsStream(Stream):
     """Someone added/changed/removed a device
     """
+
     NAME = "device_lists"
     _LIMITED = False
     ROW_TYPE = DeviceListsStreamRow
@@ -368,6 +369,7 @@ class DeviceListsStream(Stream):
 class ToDeviceStream(Stream):
     """New to_device messages for a client
     """
+
     NAME = "to_device"
     ROW_TYPE = ToDeviceStreamRow
 
@@ -383,6 +385,7 @@ class ToDeviceStream(Stream):
 class TagAccountDataStream(Stream):
     """Someone added/removed a tag for a room
     """
+
     NAME = "tag_account_data"
     ROW_TYPE = TagAccountDataStreamRow
 
@@ -398,6 +401,7 @@ class TagAccountDataStream(Stream):
 class AccountDataStream(Stream):
     """Global or per room account data was changed
     """
+
     NAME = "account_data"
     ROW_TYPE = AccountDataStreamRow
 
@@ -416,7 +420,7 @@ class AccountDataStream(Stream):
 
         results = list(room_results)
         results.extend(
-            (stream_id, user_id, None, account_data_type, content,)
+            (stream_id, user_id, None, account_data_type, content)
             for stream_id, user_id, account_data_type, content in global_results
         )
 
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f1290d022a..3d0694bb11 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -52,6 +52,7 @@ data part are:
 @attr.s(slots=True, frozen=True)
 class EventsStreamRow(object):
     """A parsed row from the events replication stream"""
+
     type = attr.ib()  # str: the TypeId of one of the *EventsStreamRows
     data = attr.ib()  # BaseEventsStreamRow
 
@@ -80,11 +81,11 @@ class BaseEventsStreamRow(object):
 class EventsStreamEventRow(BaseEventsStreamRow):
     TypeId = "ev"
 
-    event_id = attr.ib()    # str
-    room_id = attr.ib()     # str
-    type = attr.ib()        # str
-    state_key = attr.ib()   # str, optional
-    redacts = attr.ib()     # str, optional
+    event_id = attr.ib()  # str
+    room_id = attr.ib()  # str
+    type = attr.ib()  # str
+    state_key = attr.ib()  # str, optional
+    redacts = attr.ib()  # str, optional
     relates_to = attr.ib()  # str, optional
 
 
@@ -92,24 +93,21 @@ class EventsStreamEventRow(BaseEventsStreamRow):
 class EventsStreamCurrentStateRow(BaseEventsStreamRow):
     TypeId = "state"
 
-    room_id = attr.ib()    # str
-    type = attr.ib()       # str
+    room_id = attr.ib()  # str
+    type = attr.ib()  # str
     state_key = attr.ib()  # str
-    event_id = attr.ib()   # str, optional
+    event_id = attr.ib()  # str, optional
 
 
 TypeToRow = {
-    Row.TypeId: Row
-    for Row in (
-        EventsStreamEventRow,
-        EventsStreamCurrentStateRow,
-    )
+    Row.TypeId: Row for Row in (EventsStreamEventRow, EventsStreamCurrentStateRow)
 }
 
 
 class EventsStream(Stream):
     """We received a new event, or an event went from being an outlier to not
     """
+
     NAME = "events"
 
     def __init__(self, hs):
@@ -121,19 +119,17 @@ class EventsStream(Stream):
     @defer.inlineCallbacks
     def update_function(self, from_token, current_token, limit=None):
         event_rows = yield self._store.get_all_new_forward_event_rows(
-            from_token, current_token, limit,
+            from_token, current_token, limit
         )
         event_updates = (
-            (row[0], EventsStreamEventRow.TypeId, row[1:])
-            for row in event_rows
+            (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
         )
 
         state_rows = yield self._store.get_all_updated_current_state_deltas(
             from_token, current_token, limit
         )
         state_updates = (
-            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
-            for row in state_rows
+            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
         )
 
         all_updates = heapq.merge(event_updates, state_updates)
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 9aa43aa8d2..dc2484109d 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -17,16 +17,20 @@ from collections import namedtuple
 
 from ._base import Stream
 
-FederationStreamRow = namedtuple("FederationStreamRow", (
-    "type",  # str, the type of data as defined in the BaseFederationRows
-    "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
-))
+FederationStreamRow = namedtuple(
+    "FederationStreamRow",
+    (
+        "type",  # str, the type of data as defined in the BaseFederationRows
+        "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
+    ),
+)
 
 
 class FederationStream(Stream):
     """Data to be sent over federation. Only available when master has federation
     sending disabled.
     """
+
     NAME = "federation"
     ROW_TYPE = FederationStreamRow