summary refs log tree commit diff
path: root/synapse/replication/slave/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/slave/storage/events.py')
-rw-r--r--synapse/replication/slave/storage/events.py77
1 files changed, 44 insertions, 33 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a3952506c1..ab5937e638 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -45,21 +45,20 @@ logger = logging.getLogger(__name__)
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedEventStore(EventFederationWorkerStore,
-                       RoomMemberWorkerStore,
-                       EventPushActionsWorkerStore,
-                       StreamWorkerStore,
-                       StateGroupWorkerStore,
-                       EventsWorkerStore,
-                       SignatureWorkerStore,
-                       UserErasureWorkerStore,
-                       RelationsWorkerStore,
-                       BaseSlavedStore):
-
+class SlavedEventStore(
+    EventFederationWorkerStore,
+    RoomMemberWorkerStore,
+    EventPushActionsWorkerStore,
+    StreamWorkerStore,
+    StateGroupWorkerStore,
+    EventsWorkerStore,
+    SignatureWorkerStore,
+    UserErasureWorkerStore,
+    RelationsWorkerStore,
+    BaseSlavedStore,
+):
     def __init__(self, db_conn, hs):
-        self._stream_id_gen = SlavedIdTracker(
-            db_conn, "events", "stream_ordering",
-        )
+        self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
         self._backfill_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering", step=-1
         )
@@ -90,8 +89,13 @@ class SlavedEventStore(EventFederationWorkerStore,
             self._backfill_id_gen.advance(-token)
             for row in rows:
                 self.invalidate_caches_for_event(
-                    -token, row.event_id, row.room_id, row.type, row.state_key,
-                    row.redacts, row.relates_to,
+                    -token,
+                    row.event_id,
+                    row.room_id,
+                    row.type,
+                    row.state_key,
+                    row.redacts,
+                    row.relates_to,
                     backfilled=True,
                 )
         return super(SlavedEventStore, self).process_replication_rows(
@@ -103,41 +107,48 @@ class SlavedEventStore(EventFederationWorkerStore,
 
         if row.type == EventsStreamEventRow.TypeId:
             self.invalidate_caches_for_event(
-                token, data.event_id, data.room_id, data.type, data.state_key,
-                data.redacts, data.relates_to,
+                token,
+                data.event_id,
+                data.room_id,
+                data.type,
+                data.state_key,
+                data.redacts,
+                data.relates_to,
                 backfilled=False,
             )
         elif row.type == EventsStreamCurrentStateRow.TypeId:
             if data.type == EventTypes.Member:
                 self.get_rooms_for_user_with_stream_ordering.invalidate(
-                    (data.state_key, ),
+                    (data.state_key,)
                 )
         else:
-            raise Exception("Unknown events stream row type %s" % (row.type, ))
-
-    def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
-                                    etype, state_key, redacts, relates_to,
-                                    backfilled):
+            raise Exception("Unknown events stream row type %s" % (row.type,))
+
+    def invalidate_caches_for_event(
+        self,
+        stream_ordering,
+        event_id,
+        room_id,
+        etype,
+        state_key,
+        redacts,
+        relates_to,
+        backfilled,
+    ):
         self._invalidate_get_event_cache(event_id)
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
 
-        self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
-            (room_id,)
-        )
+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
 
         if not backfilled:
-            self._events_stream_cache.entity_has_changed(
-                room_id, stream_ordering
-            )
+            self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
 
         if redacts:
             self._invalidate_get_event_cache(redacts)
 
         if etype == EventTypes.Member:
-            self._membership_stream_cache.entity_has_changed(
-                state_key, stream_ordering
-            )
+            self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
             self.get_invited_rooms_for_user.invalidate((state_key,))
 
         if relates_to: