summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-01-18 19:35:29 +0000
committerGitHub <noreply@github.com>2023-01-18 19:35:29 +0000
commit9187fd940e2b2bbfd4df7204053cc26b2707aad4 (patch)
treef01de70e9daf00857dff25751e8dbc7c162e5fe5 /synapse/storage
parentChange default room version to 10. Implements MSC3904 (#14111) (diff)
downloadsynapse-9187fd940e2b2bbfd4df7204053cc26b2707aad4.tar.xz
Wait for streams to catch up when processing HTTP replication. (#14820)
This should hopefully mitigate a class of races where data gets out of
sync due a HTTP replication request racing with the replication streams.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/util/id_generators.py34
1 files changed, 19 insertions, 15 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 0d7108f01b..8670ffbfa3 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -378,6 +378,12 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
             self._current_positions.values(), default=1
         )
 
+        if not writers:
+            # If there have been no explicit writers given then any instance can
+            # write to the stream. In which case, let's pre-seed our own
+            # position with the current minimum.
+            self._current_positions[self._instance_name] = self._persisted_upto_position
+
     def _load_current_ids(
         self,
         db_conn: LoggingDatabaseConnection,
@@ -695,24 +701,22 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
 
         heapq.heappush(self._known_persisted_positions, new_id)
 
-        # If we're a writer and we don't have any active writes we update our
-        # current position to the latest position seen. This allows the instance
-        # to report a recent position when asked, rather than a potentially old
-        # one (if this instance hasn't written anything for a while).
-        our_current_position = self._current_positions.get(self._instance_name)
-        if (
-            our_current_position
-            and not self._unfinished_ids
-            and not self._in_flight_fetches
-        ):
-            self._current_positions[self._instance_name] = max(
-                our_current_position, new_id
-            )
-
         # We move the current min position up if the minimum current positions
         # of all instances is higher (since by definition all positions less
         # that that have been persisted).
-        min_curr = min(self._current_positions.values(), default=0)
+        our_current_position = self._current_positions.get(self._instance_name, 0)
+        min_curr = min(
+            (
+                token
+                for name, token in self._current_positions.items()
+                if name != self._instance_name
+            ),
+            default=our_current_position,
+        )
+
+        if our_current_position and (self._unfinished_ids or self._in_flight_fetches):
+            min_curr = min(min_curr, our_current_position)
+
         self._persisted_upto_position = max(min_curr, self._persisted_upto_position)
 
         # We now iterate through the seen positions, discarding those that are