summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-29 16:42:19 +0100
committerGitHub <noreply@github.com>2020-09-29 16:42:19 +0100
commitb1433bf231370636b817ffa01e6cda5a567cfafe (patch)
tree7357924dea4233ae76dba772c9a6ec1fb55ba2dd /synapse/storage/util
parentMypy fixes for `synapse.handlers.federation` (#8422) (diff)
downloadsynapse-b1433bf231370636b817ffa01e6cda5a567cfafe.tar.xz
Don't table scan events on worker startup (#8419)
* Fix table scan of events on worker startup.

This happened because we assumed "new" writers had an initial stream
position of 0, so the replication code tried to fetch all events written
by the instance between 0 and the current position.

Instead, set the initial position of new writers to the current
persisted up to position, on the assumption that new writers won't have
written anything before that point.

* Consider old writers coming back as "new".

Otherwise we'd try and fetch entries between the old stale token and the
current position, even though it won't have written any rows.

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/id_generators.py26
1 files changed, 25 insertions, 1 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 4fd7573e26..02fbb656e8 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -273,6 +273,19 @@ class MultiWriterIdGenerator:
 
         # Load the current positions of all writers for the stream.
         if self._writers:
+            # We delete any stale entries in the positions table. This is
+            # important if we add back a writer after a long time; we want to
+            # consider that a "new" writer, rather than using the old stale
+            # entry here.
+            sql = """
+                DELETE FROM stream_positions
+                WHERE
+                    stream_name = ?
+                    AND instance_name != ALL(?)
+            """
+            sql = self._db.engine.convert_param_style(sql)
+            cur.execute(sql, (self._stream_name, self._writers))
+
             sql = """
                 SELECT instance_name, stream_id FROM stream_positions
                 WHERE stream_name = ?
@@ -453,11 +466,22 @@ class MultiWriterIdGenerator:
         """Returns the position of the given writer.
         """
 
+        # If we don't have an entry for the given instance name, we assume it's a
+        # new writer.
+        #
+        # For new writers we assume their initial position to be the current
+        # persisted up to position. This stops Synapse from doing a full table
+        # scan when a new writer announces itself over replication.
         with self._lock:
-            return self._return_factor * self._current_positions.get(instance_name, 0)
+            return self._return_factor * self._current_positions.get(
+                instance_name, self._persisted_upto_position
+            )
 
     def get_positions(self) -> Dict[str, int]:
         """Get a copy of the current positon map.
+
+        Note that this won't necessarily include all configured writers if some
+        writers haven't written anything yet.
         """
 
         with self._lock: