diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 55 |
1 files changed, 29 insertions, 26 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index a8fd3ef886..441b3d15e2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -42,7 +42,13 @@ from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.streams.config import PaginationConfig -from synapse.types import Collection, RoomStreamToken, StreamToken, UserID +from synapse.types import ( + Collection, + PersistedEventPosition, + RoomStreamToken, + StreamToken, + UserID, +) from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client @@ -187,7 +193,7 @@ class Notifier: self.store = hs.get_datastore() self.pending_new_room_events = ( [] - ) # type: List[Tuple[int, EventBase, Collection[UserID]]] + ) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]] # Called when there are new things to stream over replication self.replication_callbacks = [] # type: List[Callable[[], None]] @@ -246,8 +252,8 @@ class Notifier: def on_new_room_event( self, event: EventBase, - room_stream_id: int, - max_room_stream_id: int, + event_pos: PersistedEventPosition, + max_room_stream_token: RoomStreamToken, extra_users: Collection[UserID] = [], ): """ Used by handlers to inform the notifier something has happened @@ -261,16 +267,16 @@ class Notifier: until all previous events have been persisted before notifying the client streams. """ - self.pending_new_room_events.append((room_stream_id, event, extra_users)) - self._notify_pending_new_room_events(max_room_stream_id) + self.pending_new_room_events.append((event_pos, event, extra_users)) + self._notify_pending_new_room_events(max_room_stream_token) self.notify_replication() - def _notify_pending_new_room_events(self, max_room_stream_id: int): + def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken): """Notify for the room events that were queued waiting for a previous event to be persisted. Args: - max_room_stream_id: The highest stream_id below which all + max_room_stream_token: The highest stream_id below which all events have been persisted. """ pending = self.pending_new_room_events @@ -279,11 +285,9 @@ class Notifier: users = set() # type: Set[UserID] rooms = set() # type: Set[str] - for room_stream_id, event, extra_users in pending: - if room_stream_id > max_room_stream_id: - self.pending_new_room_events.append( - (room_stream_id, event, extra_users) - ) + for event_pos, event, extra_users in pending: + if event_pos.persisted_after(max_room_stream_token): + self.pending_new_room_events.append((event_pos, event, extra_users)) else: if ( event.type == EventTypes.Member @@ -296,39 +300,38 @@ class Notifier: if users or rooms: self.on_new_event( - "room_key", - RoomStreamToken(None, max_room_stream_id), - users=users, - rooms=rooms, + "room_key", max_room_stream_token, users=users, rooms=rooms, ) - self._on_updated_room_token(max_room_stream_id) + self._on_updated_room_token(max_room_stream_token) - def _on_updated_room_token(self, max_room_stream_id: int): + def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken): """Poke services that might care that the room position has been updated. """ # poke any interested application service. run_as_background_process( - "_notify_app_services", self._notify_app_services, max_room_stream_id + "_notify_app_services", self._notify_app_services, max_room_stream_token ) run_as_background_process( - "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id + "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token ) if self.federation_sender: - self.federation_sender.notify_new_events(max_room_stream_id) + self.federation_sender.notify_new_events(max_room_stream_token.stream) - async def _notify_app_services(self, max_room_stream_id: int): + async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): try: - await self.appservice_handler.notify_interested_services(max_room_stream_id) + await self.appservice_handler.notify_interested_services( + max_room_stream_token.stream + ) except Exception: logger.exception("Error notifying application services of event") - async def _notify_pusher_pool(self, max_room_stream_id: int): + async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: - await self._pusher_pool.on_new_notifications(max_room_stream_id) + await self._pusher_pool.on_new_notifications(max_room_stream_token.stream) except Exception: logger.exception("Error pusher pool of event") |