diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 28bd8ab748..b7ea4438e0 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -91,7 +91,7 @@ class EmailPusher:
pass
self.timed_call = None
- def on_new_notifications(self, max_stream_ordering):
+ def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
if self.max_stream_ordering:
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 26706bf3e1..f21fa9b659 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -114,7 +114,7 @@ class HttpPusher:
if should_check_for_notifs:
self._start_processing()
- def on_new_notifications(self, max_stream_ordering):
+ def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering or 0
)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index fa8473bf8d..3c3262a88c 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -64,12 +64,6 @@ class PusherPool:
self._pusher_shard_config = hs.config.push.pusher_shard_config
self._instance_name = hs.get_instance_name()
- # Record the last stream ID that we were poked about so we can get
- # changes since then. We set this to the current max stream ID on
- # startup as every individual pusher will have checked for changes on
- # startup.
- self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering()
-
# map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
@@ -184,27 +178,20 @@ class PusherPool:
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
- async def on_new_notifications(self, max_stream_id):
+ async def on_new_notifications(self, min_stream_id, max_stream_id):
if not self.pushers:
# nothing to do here.
return
- if max_stream_id < self._last_room_stream_id_seen:
- # Nothing to do
- return
-
- prev_stream_id = self._last_room_stream_id_seen
- self._last_room_stream_id_seen = max_stream_id
-
try:
users_affected = await self.store.get_push_action_users_in_range(
- prev_stream_id, max_stream_id
+ min_stream_id, max_stream_id
)
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- p.on_new_notifications(max_stream_id)
+ p.on_new_notifications(min_stream_id, max_stream_id)
except Exception:
logger.exception("Exception in pusher on_new_notifications")
|