diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index a30e647474..d8eb14592b 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -299,9 +299,6 @@ class ReplicationResource(Resource):
"backward_ex_outliers", res.backward_ex_outliers,
("position", "event_id", "state_group"),
)
- writer.write_header_and_rows(
- "state_resets", res.state_resets, ("position",),
- )
@defer.inlineCallbacks
def presence(self, writer, current_token, request_streams):
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b3f3bf7488..15a025a019 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -192,10 +192,6 @@ class SlavedEventStore(BaseSlavedStore):
return result
def process_replication(self, result):
- state_resets = set(
- r[0] for r in result.get("state_resets", {"rows": []})["rows"]
- )
-
stream = result.get("events")
if stream:
self._stream_id_gen.advance(int(stream["position"]))
@@ -205,7 +201,7 @@ class SlavedEventStore(BaseSlavedStore):
for row in stream["rows"]:
self._process_replication_row(
- row, backfilled=False, state_resets=state_resets
+ row, backfilled=False,
)
stream = result.get("backfill")
@@ -213,7 +209,7 @@ class SlavedEventStore(BaseSlavedStore):
self._backfill_id_gen.advance(-int(stream["position"]))
for row in stream["rows"]:
self._process_replication_row(
- row, backfilled=True, state_resets=state_resets
+ row, backfilled=True,
)
stream = result.get("forward_ex_outliers")
@@ -232,20 +228,15 @@ class SlavedEventStore(BaseSlavedStore):
return super(SlavedEventStore, self).process_replication(result)
- def _process_replication_row(self, row, backfilled, state_resets):
- position = row[0]
+ def _process_replication_row(self, row, backfilled):
internal = json.loads(row[1])
event_json = json.loads(row[2])
event = FrozenEvent(event_json, internal_metadata_dict=internal)
self.invalidate_caches_for_event(
- event, backfilled, reset_state=position in state_resets
+ event, backfilled,
)
- def invalidate_caches_for_event(self, event, backfilled, reset_state):
- if reset_state:
- self.get_rooms_for_user.invalidate_all()
- self.get_users_in_room.invalidate((event.room_id,))
-
+ def invalidate_caches_for_event(self, event, backfilled):
self._invalidate_get_event_cache(event.event_id)
self.get_latest_event_ids_in_room.invalidate((event.room_id,))
@@ -267,8 +258,6 @@ class SlavedEventStore(BaseSlavedStore):
self._invalidate_get_event_cache(event.redacts)
if event.type == EventTypes.Member:
- self.get_rooms_for_user.invalidate((event.state_key,))
- self.get_users_in_room.invalidate((event.room_id,))
self._membership_stream_cache.entity_has_changed(
event.state_key, event.internal_metadata.stream_ordering
)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6685b9da1c..f4352b326b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -572,14 +572,6 @@ class EventsStore(SQLBaseStore):
txn, self.get_users_in_room, (room_id,)
)
- # Add an entry to the current_state_resets table to record the point
- # where we clobbered the current state
- self._simple_insert_txn(
- txn,
- table="current_state_resets",
- values={"event_stream_ordering": max_stream_order}
- )
-
for room_id, new_extrem in new_forward_extremeties.items():
self._simple_delete_txn(
txn,
@@ -1611,15 +1603,6 @@ class EventsStore(SQLBaseStore):
upper_bound = current_forward_id
sql = (
- "SELECT event_stream_ordering FROM current_state_resets"
- " WHERE ? < event_stream_ordering"
- " AND event_stream_ordering <= ?"
- " ORDER BY event_stream_ordering ASC"
- )
- txn.execute(sql, (last_forward_id, upper_bound))
- state_resets = txn.fetchall()
-
- sql = (
"SELECT event_stream_ordering, event_id, state_group"
" FROM ex_outlier_stream"
" WHERE ? > event_stream_ordering"
@@ -1630,7 +1613,6 @@ class EventsStore(SQLBaseStore):
forward_ex_outliers = txn.fetchall()
else:
new_forward_events = []
- state_resets = []
forward_ex_outliers = []
sql = (
@@ -1670,7 +1652,6 @@ class EventsStore(SQLBaseStore):
return AllNewEventsResult(
new_forward_events, new_backfill_events,
forward_ex_outliers, backward_ex_outliers,
- state_resets,
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
@@ -1896,5 +1877,4 @@ class EventsStore(SQLBaseStore):
AllNewEventsResult = namedtuple("AllNewEventsResult", [
"new_forward_events", "new_backfill_events",
"forward_ex_outliers", "backward_ex_outliers",
- "state_resets"
])
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0fdcf29085..845def8467 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -66,8 +66,6 @@ class RoomMemberStore(SQLBaseStore):
)
for event in events:
- txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
- txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
|