summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py26
1 files changed, 4 insertions, 22 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 0ace7b787d..667ad20428 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -413,12 +413,6 @@ class GenericWorkerTyping(object):
         # map room IDs to sets of users currently typing
         self._room_typing = {}
 
-    def stream_positions(self):
-        # We must update this typing token from the response of the previous
-        # sync. In particular, the stream id may "reset" back to zero/a low
-        # value which we *must* use for the next replication request.
-        return {"typing": self._latest_room_serial}
-
     def process_replication_rows(self, token, rows):
         if self._latest_room_serial > token:
             # The master has gone backwards. To prevent inconsistent data, just
@@ -652,20 +646,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
         else:
             self.send_handler = None
 
-    async def on_rdata(self, stream_name, token, rows):
-        await super(GenericWorkerReplicationHandler, self).on_rdata(
-            stream_name, token, rows
-        )
-        await self.process_and_notify(stream_name, token, rows)
+    async def on_rdata(self, stream_name, instance_name, token, rows):
+        await super().on_rdata(stream_name, instance_name, token, rows)
+        await self._process_and_notify(stream_name, instance_name, token, rows)
 
-    def get_streams_to_replicate(self):
-        args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
-        args.update(self.typing_handler.stream_positions())
-        if self.send_handler:
-            args.update(self.send_handler.stream_positions())
-        return args
-
-    async def process_and_notify(self, stream_name, token, rows):
+    async def _process_and_notify(self, stream_name, instance_name, token, rows):
         try:
             if self.send_handler:
                 await self.send_handler.process_replication_rows(
@@ -799,9 +784,6 @@ class FederationSenderHandler(object):
     def wake_destination(self, server: str):
         self.federation_sender.wake_destination(server)
 
-    def stream_positions(self):
-        return {"federation": self.federation_position}
-
     async def process_replication_rows(self, stream_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.