summary refs log tree commit diff
path: root/synapse/handlers/receipts.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/receipts.py')
-rw-r--r--synapse/handlers/receipts.py33
1 files changed, 23 insertions, 10 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 935c339707..e62722d78d 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -29,6 +29,8 @@ class ReceiptsHandler(BaseHandler):
     def __init__(self, hs):
         super(ReceiptsHandler, self).__init__(hs)
 
+        self.server_name = hs.config.server_name
+        self.store = hs.get_datastore()
         self.hs = hs
         self.federation = hs.get_replication_layer()
         self.federation.register_edu_handler(
@@ -80,6 +82,9 @@ class ReceiptsHandler(BaseHandler):
     def _handle_new_receipts(self, receipts):
         """Takes a list of receipts, stores them and informs the notifier.
         """
+        min_batch_id = None
+        max_batch_id = None
+
         for receipt in receipts:
             room_id = receipt["room_id"]
             receipt_type = receipt["receipt_type"]
@@ -97,10 +102,21 @@ class ReceiptsHandler(BaseHandler):
 
             stream_id, max_persisted_id = res
 
-            with PreserveLoggingContext():
-                self.notifier.on_new_event(
-                    "receipt_key", max_persisted_id, rooms=[room_id]
-                )
+            if min_batch_id is None or stream_id < min_batch_id:
+                min_batch_id = stream_id
+            if max_batch_id is None or max_persisted_id > max_batch_id:
+                max_batch_id = max_persisted_id
+
+        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+
+        with PreserveLoggingContext():
+            self.notifier.on_new_event(
+                "receipt_key", max_batch_id, rooms=affected_room_ids
+            )
+            # Note that the min here shouldn't be relied upon to be accurate.
+            self.hs.get_pusherpool().on_new_receipts(
+                min_batch_id, max_batch_id, affected_room_ids
+            )
 
             defer.returnValue(True)
 
@@ -117,12 +133,9 @@ class ReceiptsHandler(BaseHandler):
             event_ids = receipt["event_ids"]
             data = receipt["data"]
 
-            remotedomains = set()
-
-            rm_handler = self.hs.get_handlers().room_member_handler
-            yield rm_handler.fetch_room_distributions_into(
-                room_id, localusers=None, remotedomains=remotedomains
-            )
+            remotedomains = yield self.store.get_joined_hosts_for_room(room_id)
+            remotedomains = remotedomains.copy()
+            remotedomains.discard(self.server_name)
 
             logger.debug("Sending receipt to: %r", remotedomains)