1 files changed, 19 insertions, 1 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 38d11fdd0f..63a91f1177 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,11 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams._base import ReceiptsStream
+from synapse.replication.tcp.streams._base import (
+ DeviceListsStream,
+ ReceiptsStream,
+ ToDeviceStream,
+)
from synapse.server import HomeServer
from synapse.storage.database import Database
from synapse.types import ReadReceipt
@@ -256,6 +260,20 @@ class FederationSenderHandler(object):
"process_receipts_for_federation", self._on_new_receipts, rows
)
+ # ... as well as device updates and messages
+ elif stream_name == DeviceListsStream.NAME:
+ hosts = set(row.destination for row in rows)
+ for host in hosts:
+ self.federation_sender.send_device_messages(host)
+
+ elif stream_name == ToDeviceStream.NAME:
+ # The to_device stream includes stuff to be pushed to both local
+ # clients and remote servers, so we ignore entities that start with
+ # '@' (since they'll be local users rather than destinations).
+ hosts = set(row.entity for row in rows if not row.entity.startswith("@"))
+ for host in hosts:
+ self.federation_sender.send_device_messages(host)
+
@defer.inlineCallbacks
def _on_new_receipts(self, rows):
"""
|