From b3a4b53587108af7c58acc45a0441304689f3ac9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Dec 2020 10:41:34 -0500 Subject: Fix handling of stream tokens for push. (#8943) Removes faulty assertions and fixes the logic to ensure the max stream token is always set. --- changelog.d/8943.misc | 1 + synapse/push/__init__.py | 19 ++++++++++++++----- synapse/push/emailpusher.py | 16 ---------------- synapse/push/httppusher.py | 17 +---------------- synapse/push/pusherpool.py | 5 ++--- synapse/storage/databases/main/event_push_actions.py | 10 ---------- 6 files changed, 18 insertions(+), 50 deletions(-) create mode 100644 changelog.d/8943.misc diff --git a/changelog.d/8943.misc b/changelog.d/8943.misc new file mode 100644 index 0000000000..4ff0b94b94 --- /dev/null +++ b/changelog.d/8943.misc @@ -0,0 +1 @@ +Add type hints to push module. diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 3d2e874838..ad07ee86f6 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. import abc -from typing import TYPE_CHECKING, Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict from synapse.types import RoomStreamToken @@ -36,12 +36,21 @@ class Pusher(metaclass=abc.ABCMeta): # This is the highest stream ordering we know it's safe to process. # When new events arrive, we'll be given a window of new events: we # should honour this rather than just looking for anything higher - # because of potential out-of-order event serialisation. This starts - # off as None though as we don't know any better. - self.max_stream_ordering = None # type: Optional[int] + # because of potential out-of-order event serialisation. + self.max_stream_ordering = self.store.get_room_max_stream_ordering() - @abc.abstractmethod def on_new_notifications(self, max_token: RoomStreamToken) -> None: + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) + self._start_processing() + + @abc.abstractmethod + def _start_processing(self): + """Start processing push notifications.""" raise NotImplementedError() @abc.abstractmethod diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 64a35c1994..11a97b8df4 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -22,7 +22,6 @@ from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import Pusher from synapse.push.mailer import Mailer -from synapse.types import RoomStreamToken if TYPE_CHECKING: from synapse.app.homeserver import HomeServer @@ -93,20 +92,6 @@ class EmailPusher(Pusher): pass self.timed_call = None - def on_new_notifications(self, max_token: RoomStreamToken) -> None: - # We just use the minimum stream ordering and ignore the vector clock - # component. This is safe to do as long as we *always* ignore the vector - # clock components. - max_stream_ordering = max_token.stream - - if self.max_stream_ordering: - self.max_stream_ordering = max( - max_stream_ordering, self.max_stream_ordering - ) - else: - self.max_stream_ordering = max_stream_ordering - self._start_processing() - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: # We could wake up and cancel the timer but there tend to be quite a # lot of read receipts so it's probably less work to just let the @@ -172,7 +157,6 @@ class EmailPusher(Pusher): being run. """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - assert self.max_stream_ordering is not None unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email( self.user_id, start, self.max_stream_ordering ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 5408aa1295..e8b25bcd2a 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -26,7 +26,6 @@ from synapse.events import EventBase from synapse.logging import opentracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import Pusher, PusherConfigException -from synapse.types import RoomStreamToken from . import push_rule_evaluator, push_tools @@ -122,17 +121,6 @@ class HttpPusher(Pusher): if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, max_token: RoomStreamToken) -> None: - # We just use the minimum stream ordering and ignore the vector clock - # component. This is safe to do as long as we *always* ignore the vector - # clock components. - max_stream_ordering = max_token.stream - - self.max_stream_ordering = max( - max_stream_ordering, self.max_stream_ordering or 0 - ) - self._start_processing() - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: # Note that the min here shouldn't be relied upon to be accurate. @@ -192,10 +180,7 @@ class HttpPusher(Pusher): Never call this directly: use _process which will only allow this to run once per pusher. """ - - fn = self.store.get_unread_push_actions_for_user_in_range_for_http - assert self.max_stream_ordering is not None - unprocessed = await fn( + unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http( self.user_id, self.last_stream_ordering, self.max_stream_ordering ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 9fcc0b8a64..9c12d81cfb 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -129,9 +129,8 @@ class PusherPool: ) # create the pusher setting last_stream_ordering to the current maximum - # stream ordering in event_push_actions, so it will process - # pushes from this point onwards. - last_stream_ordering = await self.store.get_latest_push_action_stream_ordering() + # stream ordering, so it will process pushes from this point onwards. + last_stream_ordering = self.store.get_room_max_stream_ordering() await self.store.add_pusher( user_id=user_id, diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 2e56dfaf31..e5c03cc609 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -894,16 +894,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore): pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"]) return push_actions - async def get_latest_push_action_stream_ordering(self): - def f(txn): - txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") - return txn.fetchone() - - result = await self.db_pool.runInteraction( - "get_latest_push_action_stream_ordering", f - ) - return result[0] or 0 - def _remove_old_push_actions_before_txn( self, txn, room_id, user_id, stream_ordering ): -- cgit 1.4.1