diff options
author | Patrick Cloke <clokep@users.noreply.github.com> | 2020-12-15 10:41:34 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-15 10:41:34 -0500 |
commit | b3a4b53587108af7c58acc45a0441304689f3ac9 (patch) | |
tree | 6819625f82aac67a79203c16fb52aa13cb72a469 /synapse/push/__init__.py | |
parent | Fix startup failure with localdb_enabled: False (#8937) (diff) | |
download | synapse-b3a4b53587108af7c58acc45a0441304689f3ac9.tar.xz |
Fix handling of stream tokens for push. (#8943)
Removes faulty assertions and fixes the logic to ensure the max stream token is always set.
Diffstat (limited to 'synapse/push/__init__.py')
-rw-r--r-- | synapse/push/__init__.py | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 3d2e874838..ad07ee86f6 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. import abc -from typing import TYPE_CHECKING, Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict from synapse.types import RoomStreamToken @@ -36,12 +36,21 @@ class Pusher(metaclass=abc.ABCMeta): # This is the highest stream ordering we know it's safe to process. # When new events arrive, we'll be given a window of new events: we # should honour this rather than just looking for anything higher - # because of potential out-of-order event serialisation. This starts - # off as None though as we don't know any better. - self.max_stream_ordering = None # type: Optional[int] + # because of potential out-of-order event serialisation. + self.max_stream_ordering = self.store.get_room_max_stream_ordering() - @abc.abstractmethod def on_new_notifications(self, max_token: RoomStreamToken) -> None: + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) + self._start_processing() + + @abc.abstractmethod + def _start_processing(self): + """Start processing push notifications.""" raise NotImplementedError() @abc.abstractmethod |