diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a3952506c1..ab5937e638 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -45,21 +45,20 @@ logger = logging.getLogger(__name__)
# the method descriptor on the DataStore and chuck them into our class.
-class SlavedEventStore(EventFederationWorkerStore,
- RoomMemberWorkerStore,
- EventPushActionsWorkerStore,
- StreamWorkerStore,
- StateGroupWorkerStore,
- EventsWorkerStore,
- SignatureWorkerStore,
- UserErasureWorkerStore,
- RelationsWorkerStore,
- BaseSlavedStore):
-
+class SlavedEventStore(
+ EventFederationWorkerStore,
+ RoomMemberWorkerStore,
+ EventPushActionsWorkerStore,
+ StreamWorkerStore,
+ StateGroupWorkerStore,
+ EventsWorkerStore,
+ SignatureWorkerStore,
+ UserErasureWorkerStore,
+ RelationsWorkerStore,
+ BaseSlavedStore,
+):
def __init__(self, db_conn, hs):
- self._stream_id_gen = SlavedIdTracker(
- db_conn, "events", "stream_ordering",
- )
+ self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)
@@ -90,8 +89,13 @@ class SlavedEventStore(EventFederationWorkerStore,
self._backfill_id_gen.advance(-token)
for row in rows:
self.invalidate_caches_for_event(
- -token, row.event_id, row.room_id, row.type, row.state_key,
- row.redacts, row.relates_to,
+ -token,
+ row.event_id,
+ row.room_id,
+ row.type,
+ row.state_key,
+ row.redacts,
+ row.relates_to,
backfilled=True,
)
return super(SlavedEventStore, self).process_replication_rows(
@@ -103,41 +107,48 @@ class SlavedEventStore(EventFederationWorkerStore,
if row.type == EventsStreamEventRow.TypeId:
self.invalidate_caches_for_event(
- token, data.event_id, data.room_id, data.type, data.state_key,
- data.redacts, data.relates_to,
+ token,
+ data.event_id,
+ data.room_id,
+ data.type,
+ data.state_key,
+ data.redacts,
+ data.relates_to,
backfilled=False,
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
- (data.state_key, ),
+ (data.state_key,)
)
else:
- raise Exception("Unknown events stream row type %s" % (row.type, ))
-
- def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
- etype, state_key, redacts, relates_to,
- backfilled):
+ raise Exception("Unknown events stream row type %s" % (row.type,))
+
+ def invalidate_caches_for_event(
+ self,
+ stream_ordering,
+ event_id,
+ room_id,
+ etype,
+ state_key,
+ redacts,
+ relates_to,
+ backfilled,
+ ):
self._invalidate_get_event_cache(event_id)
self.get_latest_event_ids_in_room.invalidate((room_id,))
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
- (room_id,)
- )
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
if not backfilled:
- self._events_stream_cache.entity_has_changed(
- room_id, stream_ordering
- )
+ self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
if redacts:
self._invalidate_get_event_cache(redacts)
if etype == EventTypes.Member:
- self._membership_stream_cache.entity_has_changed(
- state_key, stream_ordering
- )
+ self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self.get_invited_rooms_for_user.invalidate((state_key,))
if relates_to:
|