summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-01-18 15:47:59 +0000
committerGitHub <noreply@github.com>2021-01-18 15:47:59 +0000
commit6633a4015a7b4ba60f87c5e6f979a9c9d8f9d8fe (patch)
tree6e111e6f153f80e324f69f418e33fff396c19773 /synapse/replication/tcp
parentEnforce all replication HTTP clients calls use kwargs (#9144) (diff)
downloadsynapse-6633a4015a7b4ba60f87c5e6f979a9c9d8f9d8fe.tar.xz
Allow moving account data and receipts streams off master (#9104)
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/handler.py19
1 files changed, 19 insertions, 0 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1f89249475..317796d5e0 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -51,11 +51,14 @@ from synapse.replication.tcp.commands import (
 from synapse.replication.tcp.protocol import AbstractConnection
 from synapse.replication.tcp.streams import (
     STREAMS_MAP,
+    AccountDataStream,
     BackfillStream,
     CachesStream,
     EventsStream,
     FederationStream,
+    ReceiptsStream,
     Stream,
+    TagAccountDataStream,
     ToDeviceStream,
     TypingStream,
 )
@@ -132,6 +135,22 @@ class ReplicationCommandHandler:
 
                 continue
 
+            if isinstance(stream, (AccountDataStream, TagAccountDataStream)):
+                # Only add AccountDataStream and TagAccountDataStream as a source on the
+                # instance in charge of account_data persistence.
+                if hs.get_instance_name() in hs.config.worker.writers.account_data:
+                    self._streams_to_replicate.append(stream)
+
+                continue
+
+            if isinstance(stream, ReceiptsStream):
+                # Only add ReceiptsStream as a source on the instance in charge of
+                # receipts.
+                if hs.get_instance_name() in hs.config.worker.writers.receipts:
+                    self._streams_to_replicate.append(stream)
+
+                continue
+
             # Only add any other streams if we're on master.
             if hs.config.worker_app is not None:
                 continue