diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/stats.py | 11 | ||||
-rw-r--r-- | synapse/handlers/user_directory.py | 18 |
2 files changed, 26 insertions, 3 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index bd3e6f2ec7..29e41a4c79 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -80,6 +80,17 @@ class StatsHandler: # If self.pos is None then means we haven't fetched it from DB if self.pos is None: self.pos = await self.store.get_stats_positions() + room_max_stream_ordering = self.store.get_room_max_stream_ordering() + if self.pos > room_max_stream_ordering: + # apparently, we've processed more events than exist in the database! + # this can happen if events are removed with history purge or similar. + logger.warning( + "Event stream ordering appears to have gone backwards (%i -> %i): " + "rewinding stats processor", + self.pos, + room_max_stream_ordering, + ) + self.pos = room_max_stream_ordering # Loop round handling deltas until we're up to date diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a0eb45446f..1565e034cb 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -148,9 +148,21 @@ class UserDirectoryHandler(StateDeltasHandler): if self.pos is None: self.pos = await self.store.get_user_directory_stream_pos() - # If still None then the initial background update hasn't happened yet. - if self.pos is None: - return None + # If still None then the initial background update hasn't happened yet. + if self.pos is None: + return None + + room_max_stream_ordering = self.store.get_room_max_stream_ordering() + if self.pos > room_max_stream_ordering: + # apparently, we've processed more events than exist in the database! + # this can happen if events are removed with history purge or similar. + logger.warning( + "Event stream ordering appears to have gone backwards (%i -> %i): " + "rewinding user directory processor", + self.pos, + room_max_stream_ordering, + ) + self.pos = room_max_stream_ordering # Loop round handling deltas until we're up to date while True: |