summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/federation_sender.py30
-rw-r--r--synapse/federation/send_queue.py9
-rw-r--r--synapse/federation/transaction_queue.py35
-rw-r--r--synapse/handlers/receipts.py37
4 files changed, 78 insertions, 33 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a461442fdc..9711a7147c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.federation import send_queue
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams import ReceiptsStream
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.types import ReadReceipt
 from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
@@ -202,6 +205,7 @@ class FederationSenderHandler(object):
     """
     def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
+        self._is_mine_id = hs.is_mine_id
         self.federation_sender = hs.get_federation_sender()
         self.replication_client = replication_client
 
@@ -234,6 +238,32 @@ class FederationSenderHandler(object):
         elif stream_name == "events":
             self.federation_sender.notify_new_events(token)
 
+        # ... and when new receipts happen
+        elif stream_name == ReceiptsStream.NAME:
+            run_as_background_process(
+                "process_receipts_for_federation", self._on_new_receipts, rows,
+            )
+
+    @defer.inlineCallbacks
+    def _on_new_receipts(self, rows):
+        """
+        Args:
+            rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+                new receipts to be processed
+        """
+        for receipt in rows:
+            # we only want to send on receipts for our own users
+            if not self._is_mine_id(receipt.user_id):
+                continue
+            receipt_info = ReadReceipt(
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                [receipt.event_id],
+                receipt.data,
+            )
+            yield self.federation_sender.send_read_receipt(receipt_info)
+
     @defer.inlineCallbacks
     def update_token(self, token):
         try:
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index b7d0b25781..bcb41da338 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -183,6 +183,15 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
+    def send_read_receipt(self, receipt):
+        """As per TransactionQueue
+
+        Args:
+            receipt (synapse.types.ReadReceipt):
+        """
+        # nothing to do here: the replication listener will handle it.
+        pass
+
     def send_presence(self, states):
         """As per TransactionQueue
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index e5e42c647d..288cb5045c 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -290,6 +290,41 @@ class TransactionQueue(object):
 
             self._attempt_new_transaction(destination)
 
+    @defer.inlineCallbacks
+    def send_read_receipt(self, receipt):
+        """Send a RR to any other servers in the room
+
+        Args:
+            receipt (synapse.types.ReadReceipt): receipt to be sent
+        """
+        # Work out which remote servers should be poked and poke them.
+        domains = yield self.state.get_current_hosts_in_room(receipt.room_id)
+        domains = [d for d in domains if d != self.server_name]
+        if not domains:
+            return
+
+        logger.debug("Sending receipt to: %r", domains)
+
+        content = {
+            receipt.room_id: {
+                receipt.receipt_type: {
+                    receipt.user_id: {
+                        "event_ids": receipt.event_ids,
+                        "data": receipt.data,
+                    },
+                },
+            },
+        }
+        key = (receipt.room_id, receipt.receipt_type, receipt.user_id)
+
+        for domain in domains:
+            self.build_and_send_edu(
+                destination=domain,
+                edu_type="m.receipt",
+                content=content,
+                key=key,
+            )
+
     @logcontext.preserve_fn  # the caller should not yield on this
     @defer.inlineCallbacks
     def send_presence(self, states):
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 733e7c3752..dd783ae134 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,9 +16,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.types import ReadReceipt, get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
 
 logger = logging.getLogger(__name__)
 
@@ -87,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
             # no new receipts
             defer.returnValue(False)
 
-        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+        affected_room_ids = list(set([r.room_id for r in receipts]))
 
         self.notifier.on_new_event(
             "receipt_key", max_batch_id, rooms=affected_room_ids
@@ -119,35 +118,7 @@ class ReceiptsHandler(BaseHandler):
         if not is_new:
             return
 
-        # Work out which remote servers should be poked and poke them.
-
-        # TODO: optimise this to move some of the work to the workers.
-        data = receipt.data
-
-        # XXX why does this not use state.get_current_hosts_in_room() ?
-        users = yield self.state.get_current_user_in_room(room_id)
-        remotedomains = set(get_domain_from_id(u) for u in users)
-        remotedomains = remotedomains.copy()
-        remotedomains.discard(self.server_name)
-
-        logger.debug("Sending receipt to: %r", remotedomains)
-
-        for domain in remotedomains:
-            self.federation.build_and_send_edu(
-                destination=domain,
-                edu_type="m.receipt",
-                content={
-                    room_id: {
-                        receipt_type: {
-                            user_id: {
-                                "event_ids": [event_id],
-                                "data": data,
-                            }
-                        }
-                    },
-                },
-                key=(room_id, receipt_type, user_id),
-            )
+        self.federation.send_read_receipt(receipt)
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):