summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-29 11:23:01 +0000
committerGitHub <noreply@github.com>2020-01-29 11:23:01 +0000
commit6b9e1014cf9c107f3198999159fbc935376fdcc9 (patch)
treed2807e44b6797d6cc44f8a1c8992d060b8a4b104 /synapse/federation
parentDelete current state when server leaves a room (#6792) (diff)
downloadsynapse-6b9e1014cf9c107f3198999159fbc935376fdcc9.tar.xz
Fix race in federation sender that delayed device updates. (#6799)
We were sending device updates down both the federation stream and
device streams. This mean there was a race if the federation sender
worker processed the federation stream first, as when the sender checked
if there were new device updates the slaved ID generator hadn't been
updated with the new stream IDs and so returned nothing.

This situation is correctly handled by events/receipts/etc by not
sending updates down the federation stream and instead having the
federation sender worker listen on the other streams and poke the
transaction queues as appropriate.
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/send_queue.py32
1 files changed, 3 insertions, 29 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 174f6e42be..0bb82a6bb3 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -69,8 +69,6 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
-        self.device_messages = SortedDict()  # stream position -> destination
-
         self.pos = 1
         self.pos_time = SortedDict()
 
@@ -92,7 +90,6 @@ class FederationRemoteSendQueue(object):
             "keyed_edu",
             "keyed_edu_changed",
             "edus",
-            "device_messages",
             "pos_time",
             "presence_destinations",
         ]:
@@ -171,12 +168,6 @@ class FederationRemoteSendQueue(object):
             for key in keys[:i]:
                 del self.edus[key]
 
-            # Delete things out of device map
-            keys = self.device_messages.keys()
-            i = self.device_messages.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.device_messages[key]
-
     def notify_new_events(self, current_id):
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
@@ -249,9 +240,8 @@ class FederationRemoteSendQueue(object):
 
     def send_device_messages(self, destination):
         """As per FederationSender"""
-        pos = self._next_pos()
-        self.device_messages[pos] = destination
-        self.notifier.on_new_replication_data()
+        # We don't need to replicate this as it gets sent down a different
+        # stream.
 
     def get_current_token(self):
         return self.pos - 1
@@ -339,14 +329,6 @@ class FederationRemoteSendQueue(object):
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
-        # Fetch changed device messages
-        i = self.device_messages.bisect_right(from_token)
-        j = self.device_messages.bisect_right(to_token) + 1
-        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
-
-        for (destination, pos) in iteritems(device_messages):
-            rows.append((pos, DeviceRow(destination=destination)))
-
         # Sort rows based on pos
         rows.sort()
 
@@ -504,7 +486,6 @@ ParsedFederationStreamData = namedtuple(
         "presence_destinations",  # list of tuples of UserPresenceState and destinations
         "keyed_edus",  # dict of destination -> { key -> Edu }
         "edus",  # dict of destination -> [Edu]
-        "device_destinations",  # set of destinations
     ),
 )
 
@@ -523,11 +504,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence=[],
-        presence_destinations=[],
-        keyed_edus={},
-        edus={},
-        device_destinations=set(),
+        presence=[], presence_destinations=[], keyed_edus={}, edus={},
     )
 
     # Parse the rows in the stream and add to the buffer
@@ -555,6 +532,3 @@ def process_rows_for_federation(transaction_queue, rows):
     for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(edu, None)
-
-    for destination in buff.device_destinations:
-        transaction_queue.send_device_messages(destination)