summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-10-21 14:53:16 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2020-10-21 14:53:16 +0100
commite8ed9a6016d15907b2b7eb30fbf7e7831cbb1b94 (patch)
tree632f49eada8b4e8f15603fbd052ae3b1c53a2886 /synapse/storage/util
parentMerge commit 'f43c66d23' into anoa/dinsic_release_1_21_x (diff)
parentUpdate description of server_name config option (#8415) (diff)
downloadsynapse-e8ed9a6016d15907b2b7eb30fbf7e7831cbb1b94.tar.xz
Merge commit '8238b55e0' into anoa/dinsic_release_1_21_x
* commit '8238b55e0':
  Update description of server_name config option (#8415)
  Discard an empty upload_name before persisting an uploaded file (#7905)
  Don't table scan events on worker startup (#8419)
  Mypy fixes for `synapse.handlers.federation` (#8422)
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/id_generators.py26
1 files changed, 25 insertions, 1 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py

index 4fd7573e26..02fbb656e8 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py
@@ -273,6 +273,19 @@ class MultiWriterIdGenerator: # Load the current positions of all writers for the stream. if self._writers: + # We delete any stale entries in the positions table. This is + # important if we add back a writer after a long time; we want to + # consider that a "new" writer, rather than using the old stale + # entry here. + sql = """ + DELETE FROM stream_positions + WHERE + stream_name = ? + AND instance_name != ALL(?) + """ + sql = self._db.engine.convert_param_style(sql) + cur.execute(sql, (self._stream_name, self._writers)) + sql = """ SELECT instance_name, stream_id FROM stream_positions WHERE stream_name = ? @@ -453,11 +466,22 @@ class MultiWriterIdGenerator: """Returns the position of the given writer. """ + # If we don't have an entry for the given instance name, we assume it's a + # new writer. + # + # For new writers we assume their initial position to be the current + # persisted up to position. This stops Synapse from doing a full table + # scan when a new writer announces itself over replication. with self._lock: - return self._return_factor * self._current_positions.get(instance_name, 0) + return self._return_factor * self._current_positions.get( + instance_name, self._persisted_upto_position + ) def get_positions(self) -> Dict[str, int]: """Get a copy of the current positon map. + + Note that this won't necessarily include all configured writers if some + writers haven't written anything yet. """ with self._lock: