1 files changed, 10 insertions, 15 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a7a29b758b..69ac468f75 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -130,11 +130,10 @@ class ReceiptsHandler:
async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
"""Takes a list of receipts, stores them and informs the notifier."""
- min_batch_id: Optional[int] = None
- max_batch_id: Optional[int] = None
+ receipts_persisted: List[ReadReceipt] = []
for receipt in receipts:
- res = await self.store.insert_receipt(
+ stream_id = await self.store.insert_receipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
@@ -143,30 +142,26 @@ class ReceiptsHandler:
receipt.data,
)
- if not res:
- # res will be None if this receipt is 'old'
+ if stream_id is None:
+ # stream_id will be None if this receipt is 'old'
continue
- stream_id, max_persisted_id = res
+ receipts_persisted.append(receipt)
- if min_batch_id is None or stream_id < min_batch_id:
- min_batch_id = stream_id
- if max_batch_id is None or max_persisted_id > max_batch_id:
- max_batch_id = max_persisted_id
-
- # Either both of these should be None or neither.
- if min_batch_id is None or max_batch_id is None:
+ if not receipts_persisted:
# no new receipts
return False
- affected_room_ids = list({r.room_id for r in receipts})
+ max_batch_id = self.store.get_max_receipt_stream_id()
+
+ affected_room_ids = list({r.room_id for r in receipts_persisted})
self.notifier.on_new_event(
StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
await self.hs.get_pusherpool().on_new_receipts(
- min_batch_id, max_batch_id, affected_room_ids
+ {r.user_id for r in receipts_persisted}
)
return True
|