summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py35
1 files changed, 22 insertions, 13 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py

index 6f5995735a..04d04a4457 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -46,7 +46,7 @@ logger = logging.getLogger(__name__) class FederationRemoteSendQueue(object): - """A drop in replacement for TransactionQueue""" + """A drop in replacement for FederationSender""" def __init__(self, hs): self.server_name = hs.hostname @@ -154,13 +154,17 @@ class FederationRemoteSendQueue(object): del self.device_messages[key] def notify_new_events(self, current_id): - """As per TransactionQueue""" + """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. pass - def send_edu(self, destination, edu_type, content, key=None): - """As per TransactionQueue""" + def build_and_send_edu(self, destination, edu_type, content, key=None): + """As per FederationSender""" + if destination == self.server_name: + logger.info("Not sending EDU to ourselves") + return + pos = self._next_pos() edu = Edu( @@ -179,8 +183,17 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() + def send_read_receipt(self, receipt): + """As per FederationSender + + Args: + receipt (synapse.types.ReadReceipt): + """ + # nothing to do here: the replication listener will handle it. + pass + def send_presence(self, states): - """As per TransactionQueue + """As per FederationSender Args: states (list(UserPresenceState)) @@ -197,7 +210,7 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() def send_device_messages(self, destination): - """As per TransactionQueue""" + """As per FederationSender""" pos = self._next_pos() self.device_messages[pos] = destination self.notifier.on_new_replication_data() @@ -435,7 +448,7 @@ def process_rows_for_federation(transaction_queue, rows): transaction queue ready for sending to the relevant homeservers. Args: - transaction_queue (TransactionQueue) + transaction_queue (FederationSender) rows (list(synapse.replication.tcp.streams.FederationStreamRow)) """ @@ -465,15 +478,11 @@ def process_rows_for_federation(transaction_queue, rows): for destination, edu_map in iteritems(buff.keyed_edus): for key, edu in edu_map.items(): - transaction_queue.send_edu( - edu.destination, edu.edu_type, edu.content, key=key, - ) + transaction_queue.send_edu(edu, key) for destination, edu_list in iteritems(buff.edus): for edu in edu_list: - transaction_queue.send_edu( - edu.destination, edu.edu_type, edu.content, key=None, - ) + transaction_queue.send_edu(edu, None) for destination in buff.device_destinations: transaction_queue.send_device_messages(destination)