diff options
author | Erik Johnston <erik@matrix.org> | 2020-08-19 10:39:31 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-19 10:39:31 +0100 |
commit | 76d21d14a042756b0c8a8f520dfd9ea09cf092c7 (patch) | |
tree | 8d24252c5e3bdcff215a8536da4bec429fd6fbbd /synapse/replication | |
parent | Convert events worker database to async/await. (#8071) (diff) | |
download | synapse-76d21d14a042756b0c8a8f520dfd9ea09cf092c7.tar.xz |
Separate `get_current_token` into two. (#8113)
The function is used for two purposes: 1) for subscribers of streams to get a token they can use to get further updates with, and 2) for replication to track position of the writers of the stream. For streams with a single writer the two scenarios produce the same result, however the situation becomes complicated for streams with multiple writers. The current `MultiWriterIdGenerator` does not correctly handle the first case (which is not an issue as its only used for the `caches` stream which nothing subscribes to outside of replication).
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/slave/storage/_slaved_id_tracker.py | 8 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 2 |
2 files changed, 9 insertions, 1 deletions
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py index 9d1d173b2f..d43eaf3a29 100644 --- a/synapse/replication/slave/storage/_slaved_id_tracker.py +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -33,3 +33,11 @@ class SlavedIdTracker(object): int """ return self._current + + def get_current_token_for_writer(self, instance_name: str) -> int: + """Returns the position of the given writer. + + For streams with single writers this is equivalent to + `get_current_token`. + """ + return self.get_current_token() diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 7a42de3f7d..1e92d52165 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -405,7 +405,7 @@ class CachesStream(Stream): store = hs.get_datastore() super().__init__( hs.get_instance_name(), - store.get_cache_stream_token, + store.get_cache_stream_token_for_writer, store.get_all_updated_caches, ) |