diff options
author | Erik Johnston <erik@matrix.org> | 2020-10-12 15:51:41 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-12 15:51:41 +0100 |
commit | 8de3703d214c814ad637793a0cc2220e20579ffa (patch) | |
tree | 3daec0f9a8d7baf92f8746b12c8b54d92b8d7d6d /synapse/storage/util | |
parent | Docker: support passing additional commandline args to synapse (#8390) (diff) | |
download | synapse-8de3703d214c814ad637793a0cc2220e20579ffa.tar.xz |
Make event persisters periodically announce position over replication. (#8499)
Currently background proccesses stream the events stream use the "minimum persisted position" (i.e. `get_current_token()`) rather than the vector clock style tokens. This is broadly fine as it doesn't matter if the background processes lag a small amount. However, in extreme cases (i.e. SyTests) where we only write to one event persister the background processes will never make progress. This PR changes it so that the `MultiWriterIDGenerator` keeps the current position of a given instance as up to date as possible (i.e using the latest token it sees if its not in the process of persisting anything), and then periodically announces that over replication. This then allows the "minimum persisted position" to advance, albeit with a small lag.
Diffstat (limited to 'synapse/storage/util')
-rw-r--r-- | synapse/storage/util/id_generators.py | 10 | ||||
-rw-r--r-- | synapse/storage/util/sequence.py | 2 |
2 files changed, 12 insertions, 0 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index d7e40aaa8b..3d8da48f2d 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -524,6 +524,16 @@ class MultiWriterIdGenerator: heapq.heappush(self._known_persisted_positions, new_id) + # If we're a writer and we don't have any active writes we update our + # current position to the latest position seen. This allows the instance + # to report a recent position when asked, rather than a potentially old + # one (if this instance hasn't written anything for a while). + our_current_position = self._current_positions.get(self._instance_name) + if our_current_position and not self._unfinished_ids: + self._current_positions[self._instance_name] = max( + our_current_position, new_id + ) + # We move the current min position up if the minimum current positions # of all instances is higher (since by definition all positions less # that that have been persisted). diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index ff2d038ad2..4386b6101e 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -126,6 +126,8 @@ class PostgresSequenceGenerator(SequenceGenerator): if max_stream_id > last_value: logger.warning( "Postgres sequence %s is behind table %s: %d < %d", + self._sequence_name, + table, last_value, max_stream_id, ) |