diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 88d8b9ba54..aaf6b1b837 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -21,7 +21,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID, RoomAlias, Requester
from synapse.push.action_generator import ActionGenerator
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
import logging
@@ -406,6 +406,12 @@ class BaseHandler(object):
event, context=context
)
+ # this intentionally does not yield: we don't care about the result
+ # and don't need to wait for it.
+ preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ event_stream_id, max_stream_id
+ )
+
destinations = set()
for k, s in context.current_state.items():
try:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index eb02f0e000..3686738e59 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -26,7 +26,7 @@ from synapse.api.errors import (
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
@@ -1097,6 +1097,12 @@ class FederationHandler(BaseHandler):
context=context,
)
+ # this intentionally does not yield: we don't care about the result
+ # and don't need to wait for it.
+ preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ event_stream_id, max_stream_id
+ )
+
defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 935c339707..26b0368080 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -80,6 +80,9 @@ class ReceiptsHandler(BaseHandler):
def _handle_new_receipts(self, receipts):
"""Takes a list of receipts, stores them and informs the notifier.
"""
+ min_batch_id = None
+ max_batch_id = None
+
for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
@@ -97,10 +100,20 @@ class ReceiptsHandler(BaseHandler):
stream_id, max_persisted_id = res
- with PreserveLoggingContext():
- self.notifier.on_new_event(
- "receipt_key", max_persisted_id, rooms=[room_id]
- )
+ if min_batch_id is None or stream_id < min_batch_id:
+ min_batch_id = stream_id
+ if max_batch_id is None or max_persisted_id > max_batch_id:
+ max_batch_id = max_persisted_id
+
+ affected_room_ids = list(set([r["room_id"] for r in receipts]))
+
+ with PreserveLoggingContext():
+ self.notifier.on_new_event(
+ "receipt_key", max_batch_id, rooms=affected_room_ids
+ )
+ self.hs.get_pusherpool().on_new_receipts(
+ min_batch_id, max_batch_id, affected_room_ids
+ )
defer.returnValue(True)
|