summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-24 15:51:41 +0000
committerErik Johnston <erik@matrix.org>2020-03-24 15:54:38 +0000
commit7eec84bfbec26ffcd3835ecb2af0ac2d5b352fd6 (patch)
tree2b040ea3e63db328567babc939520e72d600aba3 /synapse/replication
parentMake ReplicationStreamer work on workers (diff)
downloadsynapse-7eec84bfbec26ffcd3835ecb2af0ac2d5b352fd6.tar.xz
Shuffle around code typing handlers
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/handler.py71
-rw-r--r--synapse/replication/tcp/resource.py8
2 files changed, 36 insertions, 43 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py

index b620b29dfb..c197f6c26d 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -329,10 +329,17 @@ class ReplicationClientHandler: self.send_command(RdataCommand(stream_name, token, data)) -class DummyReplicationDataHandler: +class ReplicationDataHandler: """A replication data handler that simply discards all data. """ + def __init__(self, hs): + self.store = hs.get_datastore() + self.typing_handler = hs.get_typing_handler() + + self.slaved_store = hs.config.worker_app is not None + self.slaved_typing = not hs.config.server.handle_typing + async def on_rdata(self, stream_name: str, token: int, rows: list): """Called to handle a batch of replication data with a given stream token. @@ -345,7 +352,11 @@ class DummyReplicationDataHandler: rows (list): a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. """ - pass + if self.slaved_store: + self.store.process_replication_rows(stream_name, token, rows) + + if self.slaved_typing: + self.typing_handler.process_replication_rows(stream_name, token, rows) def get_streams_to_replicate(self) -> Dict[str, int]: """Called when a new connection has been established and we need to @@ -355,49 +366,25 @@ class DummyReplicationDataHandler: map from stream name to the most recent update we have for that stream (ie, the point we want to start replicating from) """ - return {} - - async def on_position(self, stream_name: str, token: int): - pass - + args = {} # type: Dict[str, int] -class WorkerReplicationDataHandler: - """A replication data handler that calls slave data stores. - """ - - def __init__(self, store): - self.store = store - - async def on_rdata(self, stream_name: str, token: int, rows: list): - """Called to handle a batch of replication data with a given stream token. + if self.slaved_store: + args = self.store.stream_positions() + user_account_data = args.pop("user_account_data", None) + room_account_data = args.pop("room_account_data", None) + if user_account_data: + args["account_data"] = user_account_data + elif room_account_data: + args["account_data"] = room_account_data - By default this just pokes the slave store. Can be overridden in subclasses to - handle more. + if self.slaved_typing: + args.update(self.typing_handler.stream_positions()) - Args: - stream_name (str): name of the replication stream for this batch of rows - token (int): stream token for this batch of rows - rows (list): a list of Stream.ROW_TYPE objects as returned by - Stream.parse_row. - """ - self.store.process_replication_rows(stream_name, token, rows) - - def get_streams_to_replicate(self) -> Dict[str, int]: - """Called when a new connection has been established and we need to - subscribe to streams. - - Returns: - map from stream name to the most recent update we have for - that stream (ie, the point we want to start replicating from) - """ - args = self.store.stream_positions() - user_account_data = args.pop("user_account_data", None) - room_account_data = args.pop("room_account_data", None) - if user_account_data: - args["account_data"] = user_account_data - elif room_account_data: - args["account_data"] = room_account_data return args async def on_position(self, stream_name: str, token: int): - self.store.process_replication_rows(stream_name, token, []) + if self.slaved_store: + self.store.process_replication_rows(stream_name, token, []) + + if self.slaved_typing: + self.typing_handler.process_replication_rows(stream_name, token, []) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index b2a1785c08..c9d671210b 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -25,7 +25,7 @@ from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol -from synapse.replication.tcp.streams import STREAMS_MAP, Stream +from synapse.replication.tcp.streams import STREAMS_MAP, Stream, TypingStream from synapse.replication.tcp.streams.federation import FederationStream from synapse.util.metrics import Measure @@ -79,8 +79,14 @@ class ReplicationStreamer(object): # hase been disabled on the master. continue + if stream == TypingStream: + continue + self.streams.append(stream(hs)) + if hs.config.server.handle_typing: + self.streams.append(TypingStream(hs)) + self.streams_by_name = {stream.NAME: stream for stream in self.streams} self.notifier.add_replication_callback(self.on_notifier_poke)