summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-03-13 15:55:37 +0000
committerRichard van der Hoff <richard@matrix.org>2019-03-13 17:21:19 +0000
commitfdcad8eabdf20718d053255555fb27ac190613c4 (patch)
treef4bcd043bcfaf2dac0b2f578f5920f84ce84dcd7 /synapse/app
parentdeclare a ReadReceipt class (diff)
downloadsynapse-fdcad8eabdf20718d053255555fb27ac190613c4.tar.xz
Move client receipt processing to federation sender worker.
This is mostly a prerequisite for #4730, but also fits with the general theme
of "move everything off the master that we possibly can".
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/federation_sender.py30
1 files changed, 30 insertions, 0 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a461442fdc..9711a7147c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.federation import send_queue
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams import ReceiptsStream
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.types import ReadReceipt
 from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
@@ -202,6 +205,7 @@ class FederationSenderHandler(object):
     """
     def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
+        self._is_mine_id = hs.is_mine_id
         self.federation_sender = hs.get_federation_sender()
         self.replication_client = replication_client
 
@@ -234,6 +238,32 @@ class FederationSenderHandler(object):
         elif stream_name == "events":
             self.federation_sender.notify_new_events(token)
 
+        # ... and when new receipts happen
+        elif stream_name == ReceiptsStream.NAME:
+            run_as_background_process(
+                "process_receipts_for_federation", self._on_new_receipts, rows,
+            )
+
+    @defer.inlineCallbacks
+    def _on_new_receipts(self, rows):
+        """
+        Args:
+            rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+                new receipts to be processed
+        """
+        for receipt in rows:
+            # we only want to send on receipts for our own users
+            if not self._is_mine_id(receipt.user_id):
+                continue
+            receipt_info = ReadReceipt(
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                [receipt.event_id],
+                receipt.data,
+            )
+            yield self.federation_sender.send_read_receipt(receipt_info)
+
     @defer.inlineCallbacks
     def update_token(self, token):
         try: