author     Richard van der Hoff <richard@matrix.org>  2020-05-05 18:14:00 +0100
committer  Richard van der Hoff <richard@matrix.org>  2020-05-05 18:14:00 +0100
commit     13dd458b8ddb248a84f5cf0c063e6002c831d72a (patch)
tree       c29b804da054820a6c0ed1d59d0159e620862093 /synapse/replication/tcp/client.py
parent     Update changelog.d/7423.misc (diff)
parent     Add backwards compatibility codepath to LoggingContext. (#7408) (diff)
Merge branch 'release-v1.13.0' into erikj/faster_device_lists_fetch
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--  synapse/replication/tcp/client.py  31
1 file changed, 8 insertions(+), 23 deletions(-)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 2d07b8b2d0..3bbf3c3569 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -16,7 +16,7 @@
 """
 
 import logging
-from typing import TYPE_CHECKING, Dict
+from typing import TYPE_CHECKING
 
 from twisted.internet.protocol import ReconnectingClientFactory
 
@@ -86,37 +86,22 @@ class ReplicationDataHandler:
     def __init__(self, store: BaseSlavedStore):
         self.store = store
 
-    async def on_rdata(self, stream_name: str, token: int, rows: list):
+    async def on_rdata(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
         """Called to handle a batch of replication data with a given stream token.
 
         By default this just pokes the slave store. Can be overridden in subclasses to
         handle more.
 
         Args:
-            stream_name (str): name of the replication stream for this batch of rows
-            token (int): stream token for this batch of rows
-            rows (list): a list of Stream.ROW_TYPE objects as returned by
-                Stream.parse_row.
+            stream_name: name of the replication stream for this batch of rows
+            instance_name: the instance that wrote the rows.
+            token: stream token for this batch of rows
+            rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
         """
         self.store.process_replication_rows(stream_name, token, rows)
 
-    def get_streams_to_replicate(self) -> Dict[str, int]:
-        """Called when a new connection has been established and we need to
-        subscribe to streams.
-
-        Returns:
-            map from stream name to the most recent update we have for
-            that stream (ie, the point we want to start replicating from)
-        """
-        args = self.store.stream_positions()
-        user_account_data = args.pop("user_account_data", None)
-        room_account_data = args.pop("room_account_data", None)
-        if user_account_data:
-            args["account_data"] = user_account_data
-        elif room_account_data:
-            args["account_data"] = room_account_data
-        return args
-
     async def on_position(self, stream_name: str, token: int):
         self.store.process_replication_rows(stream_name, token, [])
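
Any subclass that overrides on_rdata needs to accept the new instance_name argument alongside stream_name, token and rows. The sketch below is a minimal, hypothetical override (the LoggingReplicationDataHandler name and the logging behaviour are illustrative only, not part of this commit) showing the updated four-argument signature and how an override would chain to the default behaviour:

    # Hypothetical subclass -- illustrative only, not part of this commit.
    import logging

    from synapse.replication.tcp.client import ReplicationDataHandler

    logger = logging.getLogger(__name__)


    class LoggingReplicationDataHandler(ReplicationDataHandler):
        """Example handler that logs each batch before poking the slave store."""

        async def on_rdata(
            self, stream_name: str, instance_name: str, token: int, rows: list
        ):
            # instance_name identifies which instance wrote this batch of rows.
            logger.info(
                "Got %d row(s) for stream %r at token %s from instance %r",
                len(rows),
                stream_name,
                token,
                instance_name,
            )
            # Chain to the default implementation, which pokes the slave store.
            await super().on_rdata(stream_name, instance_name, token, rows)

The extra parameter matters once a stream can be written by more than one instance: per the new docstring, instance_name names the instance that wrote the rows, so handlers can attribute each batch to its writer.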