summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/events.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-06-26 22:34:41 +0100
committerRichard van der Hoff <richard@matrix.org>2019-06-26 22:34:41 +0100
commita4daa899ec4cd195fc10936f68df5c78314b366c (patch)
tree35e88ff388b0f7652773a79930b732aa04f16bde /synapse/replication/tcp/streams/events.py
parentchangelog (diff)
parentImprove docs on choosing server_name (#5558) (diff)
downloadsynapse-a4daa899ec4cd195fc10936f68df5c78314b366c.tar.xz
Merge branch 'develop' into rav/saml2_client
Diffstat (limited to 'synapse/replication/tcp/streams/events.py')
-rw-r--r--synapse/replication/tcp/streams/events.py32
1 files changed, 14 insertions, 18 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f1290d022a..3d0694bb11 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -52,6 +52,7 @@ data part are:
 @attr.s(slots=True, frozen=True)
 class EventsStreamRow(object):
     """A parsed row from the events replication stream"""
+
     type = attr.ib()  # str: the TypeId of one of the *EventsStreamRows
     data = attr.ib()  # BaseEventsStreamRow
 
@@ -80,11 +81,11 @@ class BaseEventsStreamRow(object):
 class EventsStreamEventRow(BaseEventsStreamRow):
     TypeId = "ev"
 
-    event_id = attr.ib()    # str
-    room_id = attr.ib()     # str
-    type = attr.ib()        # str
-    state_key = attr.ib()   # str, optional
-    redacts = attr.ib()     # str, optional
+    event_id = attr.ib()  # str
+    room_id = attr.ib()  # str
+    type = attr.ib()  # str
+    state_key = attr.ib()  # str, optional
+    redacts = attr.ib()  # str, optional
     relates_to = attr.ib()  # str, optional
 
 
@@ -92,24 +93,21 @@ class EventsStreamEventRow(BaseEventsStreamRow):
 class EventsStreamCurrentStateRow(BaseEventsStreamRow):
     TypeId = "state"
 
-    room_id = attr.ib()    # str
-    type = attr.ib()       # str
+    room_id = attr.ib()  # str
+    type = attr.ib()  # str
     state_key = attr.ib()  # str
-    event_id = attr.ib()   # str, optional
+    event_id = attr.ib()  # str, optional
 
 
 TypeToRow = {
-    Row.TypeId: Row
-    for Row in (
-        EventsStreamEventRow,
-        EventsStreamCurrentStateRow,
-    )
+    Row.TypeId: Row for Row in (EventsStreamEventRow, EventsStreamCurrentStateRow)
 }
 
 
 class EventsStream(Stream):
     """We received a new event, or an event went from being an outlier to not
     """
+
     NAME = "events"
 
     def __init__(self, hs):
@@ -121,19 +119,17 @@ class EventsStream(Stream):
     @defer.inlineCallbacks
     def update_function(self, from_token, current_token, limit=None):
         event_rows = yield self._store.get_all_new_forward_event_rows(
-            from_token, current_token, limit,
+            from_token, current_token, limit
         )
         event_updates = (
-            (row[0], EventsStreamEventRow.TypeId, row[1:])
-            for row in event_rows
+            (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
         )
 
         state_rows = yield self._store.get_all_updated_current_state_deltas(
             from_token, current_token, limit
         )
         state_updates = (
-            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
-            for row in state_rows
+            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
         )
 
         all_updates = heapq.merge(event_updates, state_updates)