summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/__init__.py19
-rw-r--r--synapse/push/emailpusher.py16
-rw-r--r--synapse/push/httppusher.py17
-rw-r--r--synapse/push/pusherpool.py5
4 files changed, 17 insertions, 40 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 3d2e874838..ad07ee86f6 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import abc
-from typing import TYPE_CHECKING, Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict
 
 from synapse.types import RoomStreamToken
 
@@ -36,12 +36,21 @@ class Pusher(metaclass=abc.ABCMeta):
         # This is the highest stream ordering we know it's safe to process.
         # When new events arrive, we'll be given a window of new events: we
         # should honour this rather than just looking for anything higher
-        # because of potential out-of-order event serialisation. This starts
-        # off as None though as we don't know any better.
-        self.max_stream_ordering = None  # type: Optional[int]
+        # because of potential out-of-order event serialisation.
+        self.max_stream_ordering = self.store.get_room_max_stream_ordering()
 
-    @abc.abstractmethod
     def on_new_notifications(self, max_token: RoomStreamToken) -> None:
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
+        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+        self._start_processing()
+
+    @abc.abstractmethod
+    def _start_processing(self):
+        """Start processing push notifications."""
         raise NotImplementedError()
 
     @abc.abstractmethod
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 64a35c1994..11a97b8df4 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -22,7 +22,6 @@ from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import Pusher
 from synapse.push.mailer import Mailer
-from synapse.types import RoomStreamToken
 
 if TYPE_CHECKING:
     from synapse.app.homeserver import HomeServer
@@ -93,20 +92,6 @@ class EmailPusher(Pusher):
                 pass
             self.timed_call = None
 
-    def on_new_notifications(self, max_token: RoomStreamToken) -> None:
-        # We just use the minimum stream ordering and ignore the vector clock
-        # component. This is safe to do as long as we *always* ignore the vector
-        # clock components.
-        max_stream_ordering = max_token.stream
-
-        if self.max_stream_ordering:
-            self.max_stream_ordering = max(
-                max_stream_ordering, self.max_stream_ordering
-            )
-        else:
-            self.max_stream_ordering = max_stream_ordering
-        self._start_processing()
-
     def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
         # We could wake up and cancel the timer but there tend to be quite a
         # lot of read receipts so it's probably less work to just let the
@@ -172,7 +157,6 @@ class EmailPusher(Pusher):
         being run.
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
-        assert self.max_stream_ordering is not None
         unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
             self.user_id, start, self.max_stream_ordering
         )
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 5408aa1295..e8b25bcd2a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -26,7 +26,6 @@ from synapse.events import EventBase
 from synapse.logging import opentracing
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import Pusher, PusherConfigException
-from synapse.types import RoomStreamToken
 
 from . import push_rule_evaluator, push_tools
 
@@ -122,17 +121,6 @@ class HttpPusher(Pusher):
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_notifications(self, max_token: RoomStreamToken) -> None:
-        # We just use the minimum stream ordering and ignore the vector clock
-        # component. This is safe to do as long as we *always* ignore the vector
-        # clock components.
-        max_stream_ordering = max_token.stream
-
-        self.max_stream_ordering = max(
-            max_stream_ordering, self.max_stream_ordering or 0
-        )
-        self._start_processing()
-
     def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
         # Note that the min here shouldn't be relied upon to be accurate.
 
@@ -192,10 +180,7 @@ class HttpPusher(Pusher):
         Never call this directly: use _process which will only allow this to
         run once per pusher.
         """
-
-        fn = self.store.get_unread_push_actions_for_user_in_range_for_http
-        assert self.max_stream_ordering is not None
-        unprocessed = await fn(
+        unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 9fcc0b8a64..9c12d81cfb 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -129,9 +129,8 @@ class PusherPool:
         )
 
         # create the pusher setting last_stream_ordering to the current maximum
-        # stream ordering in event_push_actions, so it will process
-        # pushes from this point onwards.
-        last_stream_ordering = await self.store.get_latest_push_action_stream_ordering()
+        # stream ordering, so it will process pushes from this point onwards.
+        last_stream_ordering = self.store.get_room_max_stream_ordering()
 
         await self.store.add_pusher(
             user_id=user_id,