diff options
author | Erik Johnston <erik@matrix.org> | 2020-05-01 15:21:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-01 15:21:35 +0100 |
commit | 3085cde577216519d789c8160262831cb2029972 (patch) | |
tree | bbd1fc5b416eb091dd7b936247d87079505239d1 /synapse/app | |
parent | async/await is_server_admin (#7363) (diff) | |
download | synapse-3085cde577216519d789c8160262831cb2029972.tar.xz |
Use `stream.current_token()` and remove `stream_positions()` (#7172)
We move the processing of typing and federation replication traffic into their handlers so that `Stream.current_token()` points to a valid token. This allows us to remove `get_streams_to_replicate()` and `stream_positions()`.
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/generic_worker.py | 16 |
1 files changed, 0 insertions, 16 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 0ace7b787d..97b9b81237 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -413,12 +413,6 @@ class GenericWorkerTyping(object): # map room IDs to sets of users currently typing self._room_typing = {} - def stream_positions(self): - # We must update this typing token from the response of the previous - # sync. In particular, the stream id may "reset" back to zero/a low - # value which we *must* use for the next replication request. - return {"typing": self._latest_room_serial} - def process_replication_rows(self, token, rows): if self._latest_room_serial > token: # The master has gone backwards. To prevent inconsistent data, just @@ -658,13 +652,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): ) await self.process_and_notify(stream_name, token, rows) - def get_streams_to_replicate(self): - args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate() - args.update(self.typing_handler.stream_positions()) - if self.send_handler: - args.update(self.send_handler.stream_positions()) - return args - async def process_and_notify(self, stream_name, token, rows): try: if self.send_handler: @@ -799,9 +786,6 @@ class FederationSenderHandler(object): def wake_destination(self, server: str): self.federation_sender.wake_destination(server) - def stream_positions(self): - return {"federation": self.federation_position} - async def process_replication_rows(self, stream_name, token, rows): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. |