summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-29 11:23:01 +0000
committerGitHub <noreply@github.com>2020-01-29 11:23:01 +0000
commit6b9e1014cf9c107f3198999159fbc935376fdcc9 (patch)
treed2807e44b6797d6cc44f8a1c8992d060b8a4b104 /synapse/app/federation_sender.py
parentDelete current state when server leaves a room (#6792) (diff)
downloadsynapse-6b9e1014cf9c107f3198999159fbc935376fdcc9.tar.xz
Fix race in federation sender that delayed device updates. (#6799)
We were sending device updates down both the federation stream and
device streams. This mean there was a race if the federation sender
worker processed the federation stream first, as when the sender checked
if there were new device updates the slaved ID generator hadn't been
updated with the new stream IDs and so returned nothing.

This situation is correctly handled by events/receipts/etc by not
sending updates down the federation stream and instead having the
federation sender worker listen on the other streams and poke the
transaction queues as appropriate.
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r--synapse/app/federation_sender.py20
1 files changed, 19 insertions, 1 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 38d11fdd0f..63a91f1177 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,11 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams._base import ReceiptsStream
+from synapse.replication.tcp.streams._base import (
+    DeviceListsStream,
+    ReceiptsStream,
+    ToDeviceStream,
+)
 from synapse.server import HomeServer
 from synapse.storage.database import Database
 from synapse.types import ReadReceipt
@@ -256,6 +260,20 @@ class FederationSenderHandler(object):
                 "process_receipts_for_federation", self._on_new_receipts, rows
             )
 
+        # ... as well as device updates and messages
+        elif stream_name == DeviceListsStream.NAME:
+            hosts = set(row.destination for row in rows)
+            for host in hosts:
+                self.federation_sender.send_device_messages(host)
+
+        elif stream_name == ToDeviceStream.NAME:
+            # The to_device stream includes stuff to be pushed to both local
+            # clients and remote servers, so we ignore entities that start with
+            # '@' (since they'll be local users rather than destinations).
+            hosts = set(row.entity for row in rows if not row.entity.startswith("@"))
+            for host in hosts:
+                self.federation_sender.send_device_messages(host)
+
     @defer.inlineCallbacks
     def _on_new_receipts(self, rows):
         """