diff options
-rw-r--r-- | changelog.d/6799.bugfix | 1 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 20 | ||||
-rw-r--r-- | synapse/federation/send_queue.py | 32 |
3 files changed, 23 insertions, 30 deletions
diff --git a/changelog.d/6799.bugfix b/changelog.d/6799.bugfix new file mode 100644 index 0000000000..322a2758af --- /dev/null +++ b/changelog.d/6799.bugfix @@ -0,0 +1 @@ +Fix race in federation sender worker that delayed sending of device updates. diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 38d11fdd0f..63a91f1177 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -38,7 +38,11 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams._base import ReceiptsStream +from synapse.replication.tcp.streams._base import ( + DeviceListsStream, + ReceiptsStream, + ToDeviceStream, +) from synapse.server import HomeServer from synapse.storage.database import Database from synapse.types import ReadReceipt @@ -256,6 +260,20 @@ class FederationSenderHandler(object): "process_receipts_for_federation", self._on_new_receipts, rows ) + # ... as well as device updates and messages + elif stream_name == DeviceListsStream.NAME: + hosts = set(row.destination for row in rows) + for host in hosts: + self.federation_sender.send_device_messages(host) + + elif stream_name == ToDeviceStream.NAME: + # The to_device stream includes stuff to be pushed to both local + # clients and remote servers, so we ignore entities that start with + # '@' (since they'll be local users rather than destinations). + hosts = set(row.entity for row in rows if not row.entity.startswith("@")) + for host in hosts: + self.federation_sender.send_device_messages(host) + @defer.inlineCallbacks def _on_new_receipts(self, rows): """ diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 174f6e42be..0bb82a6bb3 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -69,8 +69,6 @@ class FederationRemoteSendQueue(object): self.edus = SortedDict() # stream position -> Edu - self.device_messages = SortedDict() # stream position -> destination - self.pos = 1 self.pos_time = SortedDict() @@ -92,7 +90,6 @@ class FederationRemoteSendQueue(object): "keyed_edu", "keyed_edu_changed", "edus", - "device_messages", "pos_time", "presence_destinations", ]: @@ -171,12 +168,6 @@ class FederationRemoteSendQueue(object): for key in keys[:i]: del self.edus[key] - # Delete things out of device map - keys = self.device_messages.keys() - i = self.device_messages.bisect_left(position_to_delete) - for key in keys[:i]: - del self.device_messages[key] - def notify_new_events(self, current_id): """As per FederationSender""" # We don't need to replicate this as it gets sent down a different @@ -249,9 +240,8 @@ class FederationRemoteSendQueue(object): def send_device_messages(self, destination): """As per FederationSender""" - pos = self._next_pos() - self.device_messages[pos] = destination - self.notifier.on_new_replication_data() + # We don't need to replicate this as it gets sent down a different + # stream. def get_current_token(self): return self.pos - 1 @@ -339,14 +329,6 @@ class FederationRemoteSendQueue(object): for (pos, edu) in edus: rows.append((pos, EduRow(edu))) - # Fetch changed device messages - i = self.device_messages.bisect_right(from_token) - j = self.device_messages.bisect_right(to_token) + 1 - device_messages = {v: k for k, v in self.device_messages.items()[i:j]} - - for (destination, pos) in iteritems(device_messages): - rows.append((pos, DeviceRow(destination=destination))) - # Sort rows based on pos rows.sort() @@ -504,7 +486,6 @@ ParsedFederationStreamData = namedtuple( "presence_destinations", # list of tuples of UserPresenceState and destinations "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] - "device_destinations", # set of destinations ), ) @@ -523,11 +504,7 @@ def process_rows_for_federation(transaction_queue, rows): # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence=[], - presence_destinations=[], - keyed_edus={}, - edus={}, - device_destinations=set(), + presence=[], presence_destinations=[], keyed_edus={}, edus={}, ) # Parse the rows in the stream and add to the buffer @@ -555,6 +532,3 @@ def process_rows_for_federation(transaction_queue, rows): for destination, edu_list in iteritems(buff.edus): for edu in edu_list: transaction_queue.send_edu(edu, None) - - for destination in buff.device_destinations: - transaction_queue.send_device_messages(destination) |