summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-10-10 11:29:01 +0100
committerGitHub <noreply@github.com>2019-10-10 11:29:01 +0100
commita139420a3cfda6a4a4ee4750611b31dd71fc33f3 (patch)
tree9129aff6a5e73673054a1c9125fa977099a2b7db /synapse/handlers/presence.py
parentRewrite the user_filter migration again (#6184) (diff)
downloadsynapse-a139420a3cfda6a4a4ee4750611b31dd71fc33f3.tar.xz
Fix races in room stats (and other) updates. (#6187)
Hopefully this will fix the occasional failures we were seeing in the room directory.

The problem was that events are not necessarily persisted (and `current_state_delta_stream` updated) in the same order as their stream_id. So for instance current_state_delta 9 might be persisted *before* current_state_delta 8. Then, when the room stats saw stream_id 9, it assumed it had done everything up to 9, and never came back to do stream_id 8.

We can solve this easily by only processing up to the stream_id where we know all events have been persisted.
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py16
1 files changed, 12 insertions, 4 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 053cf66b28..2a5f1a007d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -803,17 +803,25 @@ class PresenceHandler(object):
         # Loop round handling deltas until we're up to date
         while True:
             with Measure(self.clock, "presence_delta"):
-                deltas = yield self.store.get_current_state_deltas(self._event_pos)
-                if not deltas:
+                room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+                if self._event_pos == room_max_stream_ordering:
                     return
 
+                logger.debug(
+                    "Processing presence stats %s->%s",
+                    self._event_pos,
+                    room_max_stream_ordering,
+                )
+                max_pos, deltas = yield self.store.get_current_state_deltas(
+                    self._event_pos, room_max_stream_ordering
+                )
                 yield self._handle_state_delta(deltas)
 
-                self._event_pos = deltas[-1]["stream_id"]
+                self._event_pos = max_pos
 
                 # Expose current event processing position to prometheus
                 synapse.metrics.event_processing_positions.labels("presence").set(
-                    self._event_pos
+                    max_pos
                 )
 
     @defer.inlineCallbacks