summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-03-15 12:30:30 +0000
committerGitHub <noreply@github.com>2019-03-15 12:30:30 +0000
commit2dee441bdba458e955a72cd3122272ab55e464ed (patch)
tree869e3705f0886387f67e3b2f13bc5bf0d1745e4c /synapse
parentMerge pull request #4853 from matrix-org/erikj/worker_docker_ci (diff)
parentchangelog (diff)
downloadsynapse-2dee441bdba458e955a72cd3122272ab55e464ed.tar.xz
Merge pull request #4852 from matrix-org/rav/move_rr_sending_to_worker
Move client receipt processing to federation sender worker.
Diffstat (limited to '')
-rw-r--r--synapse/app/federation_sender.py30
-rw-r--r--synapse/federation/send_queue.py9
-rw-r--r--synapse/federation/transaction_queue.py35
-rw-r--r--synapse/handlers/receipts.py79
-rw-r--r--synapse/types.py12
5 files changed, 110 insertions, 55 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a461442fdc..9711a7147c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.federation import send_queue
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams import ReceiptsStream
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.types import ReadReceipt
 from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
@@ -202,6 +205,7 @@ class FederationSenderHandler(object):
     """
     def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
+        self._is_mine_id = hs.is_mine_id
         self.federation_sender = hs.get_federation_sender()
         self.replication_client = replication_client
 
@@ -234,6 +238,32 @@ class FederationSenderHandler(object):
         elif stream_name == "events":
             self.federation_sender.notify_new_events(token)
 
+        # ... and when new receipts happen
+        elif stream_name == ReceiptsStream.NAME:
+            run_as_background_process(
+                "process_receipts_for_federation", self._on_new_receipts, rows,
+            )
+
+    @defer.inlineCallbacks
+    def _on_new_receipts(self, rows):
+        """
+        Args:
+            rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+                new receipts to be processed
+        """
+        for receipt in rows:
+            # we only want to send on receipts for our own users
+            if not self._is_mine_id(receipt.user_id):
+                continue
+            receipt_info = ReadReceipt(
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                [receipt.event_id],
+                receipt.data,
+            )
+            yield self.federation_sender.send_read_receipt(receipt_info)
+
     @defer.inlineCallbacks
     def update_token(self, token):
         try:
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index b7d0b25781..bcb41da338 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -183,6 +183,15 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
+    def send_read_receipt(self, receipt):
+        """As per TransactionQueue
+
+        Args:
+            receipt (synapse.types.ReadReceipt):
+        """
+        # nothing to do here: the replication listener will handle it.
+        pass
+
     def send_presence(self, states):
         """As per TransactionQueue
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index e5e42c647d..288cb5045c 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -290,6 +290,41 @@ class TransactionQueue(object):
 
             self._attempt_new_transaction(destination)
 
+    @defer.inlineCallbacks
+    def send_read_receipt(self, receipt):
+        """Send a RR to any other servers in the room
+
+        Args:
+            receipt (synapse.types.ReadReceipt): receipt to be sent
+        """
+        # Work out which remote servers should be poked and poke them.
+        domains = yield self.state.get_current_hosts_in_room(receipt.room_id)
+        domains = [d for d in domains if d != self.server_name]
+        if not domains:
+            return
+
+        logger.debug("Sending receipt to: %r", domains)
+
+        content = {
+            receipt.room_id: {
+                receipt.receipt_type: {
+                    receipt.user_id: {
+                        "event_ids": receipt.event_ids,
+                        "data": receipt.data,
+                    },
+                },
+            },
+        }
+        key = (receipt.room_id, receipt.receipt_type, receipt.user_id)
+
+        for domain in domains:
+            self.build_and_send_edu(
+                destination=domain,
+                edu_type="m.receipt",
+                content=content,
+                key=key,
+            )
+
     @logcontext.preserve_fn  # the caller should not yield on this
     @defer.inlineCallbacks
     def send_presence(self, states):
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 1728089667..dd783ae134 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,9 +16,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.types import get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
 
 logger = logging.getLogger(__name__)
 
@@ -42,13 +41,13 @@ class ReceiptsHandler(BaseHandler):
         """Called when we receive an EDU of type m.receipt from a remote HS.
         """
         receipts = [
-            {
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
-                "event_ids": user_values["event_ids"],
-                "data": user_values.get("data", {}),
-            }
+            ReadReceipt(
+                room_id=room_id,
+                receipt_type=receipt_type,
+                user_id=user_id,
+                event_ids=user_values["event_ids"],
+                data=user_values.get("data", {}),
+            )
             for room_id, room_values in content.items()
             for receipt_type, users in room_values.items()
             for user_id, user_values in users.items()
@@ -64,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
         max_batch_id = None
 
         for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
             res = yield self.store.insert_receipt(
-                room_id, receipt_type, user_id, event_ids, data
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                receipt.event_ids,
+                receipt.data,
             )
 
             if not res:
@@ -89,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
             # no new receipts
             defer.returnValue(False)
 
-        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+        affected_room_ids = list(set([r.room_id for r in receipts]))
 
         self.notifier.on_new_event(
             "receipt_key", max_batch_id, rooms=affected_room_ids
@@ -107,49 +104,21 @@ class ReceiptsHandler(BaseHandler):
         """Called when a client tells us a local user has read up to the given
         event_id in the room.
         """
-        receipt = {
-            "room_id": room_id,
-            "receipt_type": receipt_type,
-            "user_id": user_id,
-            "event_ids": [event_id],
-            "data": {
+        receipt = ReadReceipt(
+            room_id=room_id,
+            receipt_type=receipt_type,
+            user_id=user_id,
+            event_ids=[event_id],
+            data={
                 "ts": int(self.clock.time_msec()),
-            }
-        }
+            },
+        )
 
         is_new = yield self._handle_new_receipts([receipt])
         if not is_new:
             return
 
-        # Work out which remote servers should be poked and poke them.
-
-        # TODO: optimise this to move some of the work to the workers.
-        data = receipt["data"]
-
-        # XXX why does this not use state.get_current_hosts_in_room() ?
-        users = yield self.state.get_current_user_in_room(room_id)
-        remotedomains = set(get_domain_from_id(u) for u in users)
-        remotedomains = remotedomains.copy()
-        remotedomains.discard(self.server_name)
-
-        logger.debug("Sending receipt to: %r", remotedomains)
-
-        for domain in remotedomains:
-            self.federation.build_and_send_edu(
-                destination=domain,
-                edu_type="m.receipt",
-                content={
-                    room_id: {
-                        receipt_type: {
-                            user_id: {
-                                "event_ids": [event_id],
-                                "data": data,
-                            }
-                        }
-                    },
-                },
-                key=(room_id, receipt_type, user_id),
-            )
+        self.federation.send_read_receipt(receipt)
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/types.py b/synapse/types.py
index d8cb64addb..3de94b6335 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -16,6 +16,8 @@ import re
 import string
 from collections import namedtuple
 
+import attr
+
 from synapse.api.errors import SynapseError
 
 
@@ -455,3 +457,13 @@ class ThirdPartyInstanceID(
     @classmethod
     def create(cls, appservice_id, network_id,):
         return cls(appservice_id=appservice_id, network_id=network_id)
+
+
+@attr.s(slots=True)
+class ReadReceipt(object):
+    """Information about a read-receipt"""
+    room_id = attr.ib()
+    receipt_type = attr.ib()
+    user_id = attr.ib()
+    event_ids = attr.ib()
+    data = attr.ib()