diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 6f5995735a..04d04a4457 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -46,7 +46,7 @@ logger = logging.getLogger(__name__)
class FederationRemoteSendQueue(object):
- """A drop in replacement for TransactionQueue"""
+ """A drop in replacement for FederationSender"""
def __init__(self, hs):
self.server_name = hs.hostname
@@ -154,13 +154,17 @@ class FederationRemoteSendQueue(object):
del self.device_messages[key]
def notify_new_events(self, current_id):
- """As per TransactionQueue"""
+ """As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
pass
- def send_edu(self, destination, edu_type, content, key=None):
- """As per TransactionQueue"""
+ def build_and_send_edu(self, destination, edu_type, content, key=None):
+ """As per FederationSender"""
+ if destination == self.server_name:
+ logger.info("Not sending EDU to ourselves")
+ return
+
pos = self._next_pos()
edu = Edu(
@@ -179,8 +183,17 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
+ def send_read_receipt(self, receipt):
+ """As per FederationSender
+
+ Args:
+ receipt (synapse.types.ReadReceipt):
+ """
+ # nothing to do here: the replication listener will handle it.
+ pass
+
def send_presence(self, states):
- """As per TransactionQueue
+ """As per FederationSender
Args:
states (list(UserPresenceState))
@@ -197,7 +210,7 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_device_messages(self, destination):
- """As per TransactionQueue"""
+ """As per FederationSender"""
pos = self._next_pos()
self.device_messages[pos] = destination
self.notifier.on_new_replication_data()
@@ -435,7 +448,7 @@ def process_rows_for_federation(transaction_queue, rows):
transaction queue ready for sending to the relevant homeservers.
Args:
- transaction_queue (TransactionQueue)
+ transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.FederationStreamRow))
"""
@@ -465,15 +478,11 @@ def process_rows_for_federation(transaction_queue, rows):
for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
- transaction_queue.send_edu(
- edu.destination, edu.edu_type, edu.content, key=key,
- )
+ transaction_queue.send_edu(edu, key)
for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
- transaction_queue.send_edu(
- edu.destination, edu.edu_type, edu.content, key=None,
- )
+ transaction_queue.send_edu(edu, None)
for destination in buff.device_destinations:
transaction_queue.send_device_messages(destination)
|