diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7de7bd3289..c200a45f3a 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -216,7 +216,7 @@ class ApplicationServicesHandler:
def notify_interested_services_ephemeral(
self,
- stream_key: str,
+ stream_key: StreamKeyType,
new_token: Union[int, RoomStreamToken],
users: Collection[Union[str, UserID]],
) -> None:
@@ -326,7 +326,7 @@ class ApplicationServicesHandler:
async def _notify_interested_services_ephemeral(
self,
services: List[ApplicationService],
- stream_key: str,
+ stream_key: StreamKeyType,
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py
index 7ed88a3611..87b428ab1c 100644
--- a/synapse/handlers/push_rules.py
+++ b/synapse/handlers/push_rules.py
@@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, UnrecognizedRequestError
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.push_rule import RuleNotFoundException
from synapse.synapse_rust.push import get_base_rule_ids
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -114,7 +114,9 @@ class PushRulesHandler:
user_id: the user ID the change is for.
"""
stream_id = self._main_store.get_max_push_rules_stream_id()
- self._notifier.on_new_event("push_rules_key", stream_id, users=[user_id])
+ self._notifier.on_new_event(
+ StreamKeyType.PUSH_RULES, stream_id, users=[user_id]
+ )
async def push_rules_for_user(
self, user: UserID
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a7a29b758b..69ac468f75 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -130,11 +130,10 @@ class ReceiptsHandler:
async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
"""Takes a list of receipts, stores them and informs the notifier."""
- min_batch_id: Optional[int] = None
- max_batch_id: Optional[int] = None
+ receipts_persisted: List[ReadReceipt] = []
for receipt in receipts:
- res = await self.store.insert_receipt(
+ stream_id = await self.store.insert_receipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
@@ -143,30 +142,26 @@ class ReceiptsHandler:
receipt.data,
)
- if not res:
- # res will be None if this receipt is 'old'
+ if stream_id is None:
+ # stream_id will be None if this receipt is 'old'
continue
- stream_id, max_persisted_id = res
+ receipts_persisted.append(receipt)
- if min_batch_id is None or stream_id < min_batch_id:
- min_batch_id = stream_id
- if max_batch_id is None or max_persisted_id > max_batch_id:
- max_batch_id = max_persisted_id
-
- # Either both of these should be None or neither.
- if min_batch_id is None or max_batch_id is None:
+ if not receipts_persisted:
# no new receipts
return False
- affected_room_ids = list({r.room_id for r in receipts})
+ max_batch_id = self.store.get_max_receipt_stream_id()
+
+ affected_room_ids = list({r.room_id for r in receipts_persisted})
self.notifier.on_new_event(
StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
await self.hs.get_pusherpool().on_new_receipts(
- min_batch_id, max_batch_id, affected_room_ids
+ {r.user_id for r in receipts_persisted}
)
return True
|