summary refs log tree commit diff
path: root/synapse/handlers/receipts.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/receipts.py')
-rw-r--r--synapse/handlers/receipts.py114
1 files changed, 34 insertions, 80 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 696469732c..274d2946ad 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,10 +16,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
 
 logger = logging.getLogger(__name__)
 
@@ -39,42 +37,17 @@ class ReceiptsHandler(BaseHandler):
         self.state = hs.get_state_handler()
 
     @defer.inlineCallbacks
-    def received_client_receipt(self, room_id, receipt_type, user_id,
-                                event_id):
-        """Called when a client tells us a local user has read up to the given
-        event_id in the room.
-        """
-        receipt = {
-            "room_id": room_id,
-            "receipt_type": receipt_type,
-            "user_id": user_id,
-            "event_ids": [event_id],
-            "data": {
-                "ts": int(self.clock.time_msec()),
-            }
-        }
-
-        is_new = yield self._handle_new_receipts([receipt])
-
-        if is_new:
-            # fire off a process in the background to send the receipt to
-            # remote servers
-            run_as_background_process(
-                'push_receipts_to_remotes', self._push_remotes, receipt
-            )
-
-    @defer.inlineCallbacks
     def _received_remote_receipt(self, origin, content):
         """Called when we receive an EDU of type m.receipt from a remote HS.
         """
         receipts = [
-            {
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
-                "event_ids": user_values["event_ids"],
-                "data": user_values.get("data", {}),
-            }
+            ReadReceipt(
+                room_id=room_id,
+                receipt_type=receipt_type,
+                user_id=user_id,
+                event_ids=user_values["event_ids"],
+                data=user_values.get("data", {}),
+            )
             for room_id, room_values in content.items()
             for receipt_type, users in room_values.items()
             for user_id, user_values in users.items()
@@ -90,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
         max_batch_id = None
 
         for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
             res = yield self.store.insert_receipt(
-                room_id, receipt_type, user_id, event_ids, data
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                receipt.event_ids,
+                receipt.data,
             )
 
             if not res:
@@ -115,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
             # no new receipts
             defer.returnValue(False)
 
-        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+        affected_room_ids = list(set([r.room_id for r in receipts]))
 
         self.notifier.on_new_event(
             "receipt_key", max_batch_id, rooms=affected_room_ids
@@ -128,43 +99,26 @@ class ReceiptsHandler(BaseHandler):
         defer.returnValue(True)
 
     @defer.inlineCallbacks
-    def _push_remotes(self, receipt):
-        """Given a receipt, works out which remote servers should be
-        poked and pokes them.
+    def received_client_receipt(self, room_id, receipt_type, user_id,
+                                event_id):
+        """Called when a client tells us a local user has read up to the given
+        event_id in the room.
         """
-        try:
-            # TODO: optimise this to move some of the work to the workers.
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
-            users = yield self.state.get_current_user_in_room(room_id)
-            remotedomains = set(get_domain_from_id(u) for u in users)
-            remotedomains = remotedomains.copy()
-            remotedomains.discard(self.server_name)
-
-            logger.debug("Sending receipt to: %r", remotedomains)
-
-            for domain in remotedomains:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.receipt",
-                    content={
-                        room_id: {
-                            receipt_type: {
-                                user_id: {
-                                    "event_ids": event_ids,
-                                    "data": data,
-                                }
-                            }
-                        },
-                    },
-                    key=(room_id, receipt_type, user_id),
-                )
-        except Exception:
-            logger.exception("Error pushing receipts to remote servers")
+        receipt = ReadReceipt(
+            room_id=room_id,
+            receipt_type=receipt_type,
+            user_id=user_id,
+            event_ids=[event_id],
+            data={
+                "ts": int(self.clock.time_msec()),
+            },
+        )
+
+        is_new = yield self._handle_new_receipts([receipt])
+        if not is_new:
+            return
+
+        yield self.federation.send_read_receipt(receipt)
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):