summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/resource.py3
-rw-r--r--synapse/replication/slave/storage/events.py24
2 files changed, 8 insertions, 19 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index a30e647474..d8eb14592b 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -299,9 +299,6 @@ class ReplicationResource(Resource):
                 "backward_ex_outliers", res.backward_ex_outliers,
                 ("position", "event_id", "state_group"),
             )
-            writer.write_header_and_rows(
-                "state_resets", res.state_resets, ("position",),
-            )
 
     @defer.inlineCallbacks
     def presence(self, writer, current_token, request_streams):
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b3f3bf7488..d72ff6055c 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -73,6 +73,9 @@ class SlavedEventStore(BaseSlavedStore):
     # to reach inside the __dict__ to extract them.
     get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
     get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
+    get_users_who_share_room_with_user = (
+        RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
+    )
     get_latest_event_ids_in_room = EventFederationStore.__dict__[
         "get_latest_event_ids_in_room"
     ]
@@ -192,10 +195,6 @@ class SlavedEventStore(BaseSlavedStore):
         return result
 
     def process_replication(self, result):
-        state_resets = set(
-            r[0] for r in result.get("state_resets", {"rows": []})["rows"]
-        )
-
         stream = result.get("events")
         if stream:
             self._stream_id_gen.advance(int(stream["position"]))
@@ -205,7 +204,7 @@ class SlavedEventStore(BaseSlavedStore):
 
             for row in stream["rows"]:
                 self._process_replication_row(
-                    row, backfilled=False, state_resets=state_resets
+                    row, backfilled=False,
                 )
 
         stream = result.get("backfill")
@@ -213,7 +212,7 @@ class SlavedEventStore(BaseSlavedStore):
             self._backfill_id_gen.advance(-int(stream["position"]))
             for row in stream["rows"]:
                 self._process_replication_row(
-                    row, backfilled=True, state_resets=state_resets
+                    row, backfilled=True,
                 )
 
         stream = result.get("forward_ex_outliers")
@@ -232,20 +231,15 @@ class SlavedEventStore(BaseSlavedStore):
 
         return super(SlavedEventStore, self).process_replication(result)
 
-    def _process_replication_row(self, row, backfilled, state_resets):
-        position = row[0]
+    def _process_replication_row(self, row, backfilled):
         internal = json.loads(row[1])
         event_json = json.loads(row[2])
         event = FrozenEvent(event_json, internal_metadata_dict=internal)
         self.invalidate_caches_for_event(
-            event, backfilled, reset_state=position in state_resets
+            event, backfilled,
         )
 
-    def invalidate_caches_for_event(self, event, backfilled, reset_state):
-        if reset_state:
-            self.get_rooms_for_user.invalidate_all()
-            self.get_users_in_room.invalidate((event.room_id,))
-
+    def invalidate_caches_for_event(self, event, backfilled):
         self._invalidate_get_event_cache(event.event_id)
 
         self.get_latest_event_ids_in_room.invalidate((event.room_id,))
@@ -267,8 +261,6 @@ class SlavedEventStore(BaseSlavedStore):
             self._invalidate_get_event_cache(event.redacts)
 
         if event.type == EventTypes.Member:
-            self.get_rooms_for_user.invalidate((event.state_key,))
-            self.get_users_in_room.invalidate((event.room_id,))
             self._membership_stream_cache.entity_has_changed(
                 event.state_key, event.internal_metadata.stream_ordering
             )