summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-03-15 12:30:30 +0000
committerGitHub <noreply@github.com>2019-03-15 12:30:30 +0000
commit2dee441bdba458e955a72cd3122272ab55e464ed (patch)
tree869e3705f0886387f67e3b2f13bc5bf0d1745e4c /synapse/handlers
parentMerge pull request #4853 from matrix-org/erikj/worker_docker_ci (diff)
parentchangelog (diff)
downloadsynapse-2dee441bdba458e955a72cd3122272ab55e464ed.tar.xz
Merge pull request #4852 from matrix-org/rav/move_rr_sending_to_worker
Move client receipt processing to federation sender worker.
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/receipts.py79
1 files changed, 24 insertions, 55 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 1728089667..dd783ae134 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,9 +16,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.types import get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
 
 logger = logging.getLogger(__name__)
 
@@ -42,13 +41,13 @@ class ReceiptsHandler(BaseHandler):
         """Called when we receive an EDU of type m.receipt from a remote HS.
         """
         receipts = [
-            {
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
-                "event_ids": user_values["event_ids"],
-                "data": user_values.get("data", {}),
-            }
+            ReadReceipt(
+                room_id=room_id,
+                receipt_type=receipt_type,
+                user_id=user_id,
+                event_ids=user_values["event_ids"],
+                data=user_values.get("data", {}),
+            )
             for room_id, room_values in content.items()
             for receipt_type, users in room_values.items()
             for user_id, user_values in users.items()
@@ -64,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
         max_batch_id = None
 
         for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
             res = yield self.store.insert_receipt(
-                room_id, receipt_type, user_id, event_ids, data
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                receipt.event_ids,
+                receipt.data,
             )
 
             if not res:
@@ -89,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
             # no new receipts
             defer.returnValue(False)
 
-        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+        affected_room_ids = list(set([r.room_id for r in receipts]))
 
         self.notifier.on_new_event(
             "receipt_key", max_batch_id, rooms=affected_room_ids
@@ -107,49 +104,21 @@ class ReceiptsHandler(BaseHandler):
         """Called when a client tells us a local user has read up to the given
         event_id in the room.
         """
-        receipt = {
-            "room_id": room_id,
-            "receipt_type": receipt_type,
-            "user_id": user_id,
-            "event_ids": [event_id],
-            "data": {
+        receipt = ReadReceipt(
+            room_id=room_id,
+            receipt_type=receipt_type,
+            user_id=user_id,
+            event_ids=[event_id],
+            data={
                 "ts": int(self.clock.time_msec()),
-            }
-        }
+            },
+        )
 
         is_new = yield self._handle_new_receipts([receipt])
         if not is_new:
             return
 
-        # Work out which remote servers should be poked and poke them.
-
-        # TODO: optimise this to move some of the work to the workers.
-        data = receipt["data"]
-
-        # XXX why does this not use state.get_current_hosts_in_room() ?
-        users = yield self.state.get_current_user_in_room(room_id)
-        remotedomains = set(get_domain_from_id(u) for u in users)
-        remotedomains = remotedomains.copy()
-        remotedomains.discard(self.server_name)
-
-        logger.debug("Sending receipt to: %r", remotedomains)
-
-        for domain in remotedomains:
-            self.federation.build_and_send_edu(
-                destination=domain,
-                edu_type="m.receipt",
-                content={
-                    room_id: {
-                        receipt_type: {
-                            user_id: {
-                                "event_ids": [event_id],
-                                "data": data,
-                            }
-                        }
-                    },
-                },
-                key=(room_id, receipt_type, user_id),
-            )
+        self.federation.send_read_receipt(receipt)
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):