summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/events.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-06-24 10:00:13 +0100
committerRichard van der Hoff <richard@matrix.org>2019-06-24 10:00:13 +0100
commit5097aee740b542407e5bb13d19a3e3e6c2227316 (patch)
tree09a03650256e09cd0b5df59dbf2d7bb2ba14df6c /synapse/replication/tcp/streams/events.py
parentchangelog (diff)
parentImprove help and cmdline option names for --generate-config options (#5512) (diff)
downloadsynapse-5097aee740b542407e5bb13d19a3e3e6c2227316.tar.xz
Merge branch 'develop' into rav/cleanup_metrics
Diffstat (limited to 'synapse/replication/tcp/streams/events.py')
-rw-r--r--synapse/replication/tcp/streams/events.py32
1 files changed, 14 insertions, 18 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f1290d022a..3d0694bb11 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -52,6 +52,7 @@ data part are:
 @attr.s(slots=True, frozen=True)
 class EventsStreamRow(object):
     """A parsed row from the events replication stream"""
+
     type = attr.ib()  # str: the TypeId of one of the *EventsStreamRows
     data = attr.ib()  # BaseEventsStreamRow
 
@@ -80,11 +81,11 @@ class BaseEventsStreamRow(object):
 class EventsStreamEventRow(BaseEventsStreamRow):
     TypeId = "ev"
 
-    event_id = attr.ib()    # str
-    room_id = attr.ib()     # str
-    type = attr.ib()        # str
-    state_key = attr.ib()   # str, optional
-    redacts = attr.ib()     # str, optional
+    event_id = attr.ib()  # str
+    room_id = attr.ib()  # str
+    type = attr.ib()  # str
+    state_key = attr.ib()  # str, optional
+    redacts = attr.ib()  # str, optional
     relates_to = attr.ib()  # str, optional
 
 
@@ -92,24 +93,21 @@ class EventsStreamEventRow(BaseEventsStreamRow):
 class EventsStreamCurrentStateRow(BaseEventsStreamRow):
     TypeId = "state"
 
-    room_id = attr.ib()    # str
-    type = attr.ib()       # str
+    room_id = attr.ib()  # str
+    type = attr.ib()  # str
     state_key = attr.ib()  # str
-    event_id = attr.ib()   # str, optional
+    event_id = attr.ib()  # str, optional
 
 
 TypeToRow = {
-    Row.TypeId: Row
-    for Row in (
-        EventsStreamEventRow,
-        EventsStreamCurrentStateRow,
-    )
+    Row.TypeId: Row for Row in (EventsStreamEventRow, EventsStreamCurrentStateRow)
 }
 
 
 class EventsStream(Stream):
     """We received a new event, or an event went from being an outlier to not
     """
+
     NAME = "events"
 
     def __init__(self, hs):
@@ -121,19 +119,17 @@ class EventsStream(Stream):
     @defer.inlineCallbacks
     def update_function(self, from_token, current_token, limit=None):
         event_rows = yield self._store.get_all_new_forward_event_rows(
-            from_token, current_token, limit,
+            from_token, current_token, limit
         )
         event_updates = (
-            (row[0], EventsStreamEventRow.TypeId, row[1:])
-            for row in event_rows
+            (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
         )
 
         state_rows = yield self._store.get_all_updated_current_state_deltas(
             from_token, current_token, limit
         )
         state_updates = (
-            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
-            for row in state_rows
+            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
         )
 
         all_updates = heapq.merge(event_updates, state_updates)