summary refs log tree commit diff
path: root/synapse/push/__init__.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-12-15 10:41:34 -0500
committerGitHub <noreply@github.com>2020-12-15 10:41:34 -0500
commitb3a4b53587108af7c58acc45a0441304689f3ac9 (patch)
tree6819625f82aac67a79203c16fb52aa13cb72a469 /synapse/push/__init__.py
parentFix startup failure with localdb_enabled: False (#8937) (diff)
downloadsynapse-b3a4b53587108af7c58acc45a0441304689f3ac9.tar.xz
Fix handling of stream tokens for push. (#8943)
Removes faulty assertions and fixes the logic to ensure the max
stream token is always set.
Diffstat (limited to 'synapse/push/__init__.py')
-rw-r--r--synapse/push/__init__.py19
1 files changed, 14 insertions, 5 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 3d2e874838..ad07ee86f6 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import abc
-from typing import TYPE_CHECKING, Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict
 
 from synapse.types import RoomStreamToken
 
@@ -36,12 +36,21 @@ class Pusher(metaclass=abc.ABCMeta):
         # This is the highest stream ordering we know it's safe to process.
         # When new events arrive, we'll be given a window of new events: we
         # should honour this rather than just looking for anything higher
-        # because of potential out-of-order event serialisation. This starts
-        # off as None though as we don't know any better.
-        self.max_stream_ordering = None  # type: Optional[int]
+        # because of potential out-of-order event serialisation.
+        self.max_stream_ordering = self.store.get_room_max_stream_ordering()
 
-    @abc.abstractmethod
     def on_new_notifications(self, max_token: RoomStreamToken) -> None:
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
+        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+        self._start_processing()
+
+    @abc.abstractmethod
+    def _start_processing(self):
+        """Start processing push notifications."""
         raise NotImplementedError()
 
     @abc.abstractmethod