summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-03-15 12:30:30 +0000
committerGitHub <noreply@github.com>2019-03-15 12:30:30 +0000
commit2dee441bdba458e955a72cd3122272ab55e464ed (patch)
tree869e3705f0886387f67e3b2f13bc5bf0d1745e4c /synapse/federation
parentMerge pull request #4853 from matrix-org/erikj/worker_docker_ci (diff)
parentchangelog (diff)
downloadsynapse-2dee441bdba458e955a72cd3122272ab55e464ed.tar.xz
Merge pull request #4852 from matrix-org/rav/move_rr_sending_to_worker
Move client receipt processing to federation sender worker.
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/send_queue.py9
-rw-r--r--synapse/federation/transaction_queue.py35
2 files changed, 44 insertions, 0 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index b7d0b25781..bcb41da338 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -183,6 +183,15 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
+    def send_read_receipt(self, receipt):
+        """As per TransactionQueue
+
+        Args:
+            receipt (synapse.types.ReadReceipt):
+        """
+        # nothing to do here: the replication listener will handle it.
+        pass
+
     def send_presence(self, states):
         """As per TransactionQueue
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index e5e42c647d..288cb5045c 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -290,6 +290,41 @@ class TransactionQueue(object):
 
             self._attempt_new_transaction(destination)
 
+    @defer.inlineCallbacks
+    def send_read_receipt(self, receipt):
+        """Send a RR to any other servers in the room
+
+        Args:
+            receipt (synapse.types.ReadReceipt): receipt to be sent
+        """
+        # Work out which remote servers should be poked and poke them.
+        domains = yield self.state.get_current_hosts_in_room(receipt.room_id)
+        domains = [d for d in domains if d != self.server_name]
+        if not domains:
+            return
+
+        logger.debug("Sending receipt to: %r", domains)
+
+        content = {
+            receipt.room_id: {
+                receipt.receipt_type: {
+                    receipt.user_id: {
+                        "event_ids": receipt.event_ids,
+                        "data": receipt.data,
+                    },
+                },
+            },
+        }
+        key = (receipt.room_id, receipt.receipt_type, receipt.user_id)
+
+        for domain in domains:
+            self.build_and_send_edu(
+                destination=domain,
+                edu_type="m.receipt",
+                content=content,
+                key=key,
+            )
+
     @logcontext.preserve_fn  # the caller should not yield on this
     @defer.inlineCallbacks
     def send_presence(self, states):