diff options
author | Erik Johnston <erik@matrix.org> | 2020-05-01 15:21:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-01 15:21:35 +0100 |
commit | 3085cde577216519d789c8160262831cb2029972 (patch) | |
tree | bbd1fc5b416eb091dd7b936247d87079505239d1 /synapse/replication/tcp/client.py | |
parent | async/await is_server_admin (#7363) (diff) | |
download | synapse-3085cde577216519d789c8160262831cb2029972.tar.xz |
Use `stream.current_token()` and remove `stream_positions()` (#7172)
We move the processing of typing and federation replication traffic into their handlers so that `Stream.current_token()` points to a valid token. This allows us to remove `get_streams_to_replicate()` and `stream_positions()`.
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r-- | synapse/replication/tcp/client.py | 19 |
1 files changed, 1 insertions, 18 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 2d07b8b2d0..5c28fd4ac3 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -16,7 +16,7 @@ """ import logging -from typing import TYPE_CHECKING, Dict +from typing import TYPE_CHECKING from twisted.internet.protocol import ReconnectingClientFactory @@ -100,23 +100,6 @@ class ReplicationDataHandler: """ self.store.process_replication_rows(stream_name, token, rows) - def get_streams_to_replicate(self) -> Dict[str, int]: - """Called when a new connection has been established and we need to - subscribe to streams. - - Returns: - map from stream name to the most recent update we have for - that stream (ie, the point we want to start replicating from) - """ - args = self.store.stream_positions() - user_account_data = args.pop("user_account_data", None) - room_account_data = args.pop("room_account_data", None) - if user_account_data: - args["account_data"] = user_account_data - elif room_account_data: - args["account_data"] = room_account_data - return args - async def on_position(self, stream_name: str, token: int): self.store.process_replication_rows(stream_name, token, []) |