summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/push/__init__.py2
-rw-r--r--synapse/push/emailpusher.py7
-rw-r--r--synapse/push/httppusher.py2
-rw-r--r--synapse/push/pusherpool.py10
4 files changed, 7 insertions, 14 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 9e7ac149a1..f4f7ec96f8 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -40,7 +40,7 @@ class PusherConfig:
     ts = attr.ib(type=int)
     lang = attr.ib(type=Optional[str])
     data = attr.ib(type=Optional[JsonDict])
-    last_stream_ordering = attr.ib(type=Optional[int])
+    last_stream_ordering = attr.ib(type=int)
     last_success = attr.ib(type=Optional[int])
     failing_since = attr.ib(type=Optional[int])
 
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d2eff75a58..4ac1b31748 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -157,7 +157,6 @@ class EmailPusher(Pusher):
         being run.
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
-        assert start is not None
         unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
             self.user_id, start, self.max_stream_ordering
         )
@@ -220,12 +219,8 @@ class EmailPusher(Pusher):
             )
 
     async def save_last_stream_ordering_and_success(
-        self, last_stream_ordering: Optional[int]
+        self, last_stream_ordering: int
     ) -> None:
-        if last_stream_ordering is None:
-            # This happens if we haven't yet processed anything
-            return
-
         self.last_stream_ordering = last_stream_ordering
         pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
             self.app_id,
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 417fe0f1f5..e048b0d59e 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -176,7 +176,6 @@ class HttpPusher(Pusher):
         Never call this directly: use _process which will only allow this to
         run once per pusher.
         """
-        assert self.last_stream_ordering is not None
         unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
@@ -205,7 +204,6 @@ class HttpPusher(Pusher):
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action["stream_ordering"]
-                assert self.last_stream_ordering is not None
                 pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
                     self.app_id,
                     self.pushkey,
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 8158356d40..eed16dbfb5 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -106,6 +106,10 @@ class PusherPool:
 
         time_now_msec = self.clock.time_msec()
 
+        # create the pusher setting last_stream_ordering to the current maximum
+        # stream ordering, so it will process pushes from this point onwards.
+        last_stream_ordering = self.store.get_room_max_stream_ordering()
+
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
@@ -124,16 +128,12 @@ class PusherPool:
                 ts=time_now_msec,
                 lang=lang,
                 data=data,
-                last_stream_ordering=None,
+                last_stream_ordering=last_stream_ordering,
                 last_success=None,
                 failing_since=None,
             )
         )
 
-        # create the pusher setting last_stream_ordering to the current maximum
-        # stream ordering, so it will process pushes from this point onwards.
-        last_stream_ordering = self.store.get_room_max_stream_ordering()
-
         await self.store.add_pusher(
             user_id=user_id,
             access_token=access_token,