1 files changed, 14 insertions, 5 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 3d2e874838..ad07ee86f6 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -14,7 +14,7 @@
# limitations under the License.
import abc
-from typing import TYPE_CHECKING, Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict
from synapse.types import RoomStreamToken
@@ -36,12 +36,21 @@ class Pusher(metaclass=abc.ABCMeta):
# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
# should honour this rather than just looking for anything higher
- # because of potential out-of-order event serialisation. This starts
- # off as None though as we don't know any better.
- self.max_stream_ordering = None # type: Optional[int]
+ # because of potential out-of-order event serialisation.
+ self.max_stream_ordering = self.store.get_room_max_stream_ordering()
- @abc.abstractmethod
def on_new_notifications(self, max_token: RoomStreamToken) -> None:
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_ordering = max_token.stream
+
+ self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+ self._start_processing()
+
+ @abc.abstractmethod
+ def _start_processing(self):
+ """Start processing push notifications."""
raise NotImplementedError()
@abc.abstractmethod
|