summary refs log tree commit diff
path: root/synapse/storage/util/id_generators.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/util/id_generators.py')
-rw-r--r--synapse/storage/util/id_generators.py26
1 files changed, 25 insertions, 1 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 4fd7573e26..02fbb656e8 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -273,6 +273,19 @@ class MultiWriterIdGenerator:
 
         # Load the current positions of all writers for the stream.
         if self._writers:
+            # We delete any stale entries in the positions table. This is
+            # important if we add back a writer after a long time; we want to
+            # consider that a "new" writer, rather than using the old stale
+            # entry here.
+            sql = """
+                DELETE FROM stream_positions
+                WHERE
+                    stream_name = ?
+                    AND instance_name != ALL(?)
+            """
+            sql = self._db.engine.convert_param_style(sql)
+            cur.execute(sql, (self._stream_name, self._writers))
+
             sql = """
                 SELECT instance_name, stream_id FROM stream_positions
                 WHERE stream_name = ?
@@ -453,11 +466,22 @@ class MultiWriterIdGenerator:
         """Returns the position of the given writer.
         """
 
+        # If we don't have an entry for the given instance name, we assume it's a
+        # new writer.
+        #
+        # For new writers we assume their initial position to be the current
+        # persisted up to position. This stops Synapse from doing a full table
+        # scan when a new writer announces itself over replication.
         with self._lock:
-            return self._return_factor * self._current_positions.get(instance_name, 0)
+            return self._return_factor * self._current_positions.get(
+                instance_name, self._persisted_upto_position
+            )
 
     def get_positions(self) -> Dict[str, int]:
         """Get a copy of the current positon map.
+
+        Note that this won't necessarily include all configured writers if some
+        writers haven't written anything yet.
         """
 
         with self._lock: