Fix AssertionErrors after purging events (#11642)
* Fix AssertionErrors after purging events
If you purged a bunch of events from your database, and then restarted synapse
without receiving more events, then you would get a bunch of AssertionErrors on
restart.
This fixes the situation by rewinding the stream processors.
* `check-newsfragment`: ignore deleted newsfiles
2 files changed, 26 insertions, 3 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index bd3e6f2ec7..29e41a4c79 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -80,6 +80,17 @@ class StatsHandler:
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = await self.store.get_stats_positions()
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+ if self.pos > room_max_stream_ordering:
+ # apparently, we've processed more events than exist in the database!
+ # this can happen if events are removed with history purge or similar.
+ logger.warning(
+ "Event stream ordering appears to have gone backwards (%i -> %i): "
+ "rewinding stats processor",
+ self.pos,
+ room_max_stream_ordering,
+ )
+ self.pos = room_max_stream_ordering
# Loop round handling deltas until we're up to date
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a0eb45446f..1565e034cb 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -148,9 +148,21 @@ class UserDirectoryHandler(StateDeltasHandler):
if self.pos is None:
self.pos = await self.store.get_user_directory_stream_pos()
- # If still None then the initial background update hasn't happened yet.
- if self.pos is None:
- return None
+ # If still None then the initial background update hasn't happened yet.
+ if self.pos is None:
+ return None
+
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+ if self.pos > room_max_stream_ordering:
+ # apparently, we've processed more events than exist in the database!
+ # this can happen if events are removed with history purge or similar.
+ logger.warning(
+ "Event stream ordering appears to have gone backwards (%i -> %i): "
+ "rewinding user directory processor",
+ self.pos,
+ room_max_stream_ordering,
+ )
+ self.pos = room_max_stream_ordering
# Loop round handling deltas until we're up to date
while True:
|