summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/stats.py11
-rw-r--r--synapse/handlers/user_directory.py18
2 files changed, 26 insertions, 3 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index bd3e6f2ec7..29e41a4c79 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -80,6 +80,17 @@ class StatsHandler:
         # If self.pos is None then means we haven't fetched it from DB
         if self.pos is None:
             self.pos = await self.store.get_stats_positions()
+            room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+            if self.pos > room_max_stream_ordering:
+                # apparently, we've processed more events than exist in the database!
+                # this can happen if events are removed with history purge or similar.
+                logger.warning(
+                    "Event stream ordering appears to have gone backwards (%i -> %i): "
+                    "rewinding stats processor",
+                    self.pos,
+                    room_max_stream_ordering,
+                )
+                self.pos = room_max_stream_ordering
 
         # Loop round handling deltas until we're up to date
 
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a0eb45446f..1565e034cb 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -148,9 +148,21 @@ class UserDirectoryHandler(StateDeltasHandler):
         if self.pos is None:
             self.pos = await self.store.get_user_directory_stream_pos()
 
-        # If still None then the initial background update hasn't happened yet.
-        if self.pos is None:
-            return None
+            # If still None then the initial background update hasn't happened yet.
+            if self.pos is None:
+                return None
+
+            room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+            if self.pos > room_max_stream_ordering:
+                # apparently, we've processed more events than exist in the database!
+                # this can happen if events are removed with history purge or similar.
+                logger.warning(
+                    "Event stream ordering appears to have gone backwards (%i -> %i): "
+                    "rewinding user directory processor",
+                    self.pos,
+                    room_max_stream_ordering,
+                )
+                self.pos = room_max_stream_ordering
 
         # Loop round handling deltas until we're up to date
         while True: