summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-08-11 22:03:14 +0100
committerErik Johnston <erik@matrix.org>2020-08-11 22:03:14 +0100
commitfdb46b5442c63212a52e2296491a23f1935f9929 (patch)
treec46d0940415f96e3c0383c35f1d39b7462d5c20c /synapse/notifier.py
parentAdd comment explaining cast (diff)
parentAuto set logging filter (#8051) (diff)
downloadsynapse-fdb46b5442c63212a52e2296491a23f1935f9929.tar.xz
Merge remote-tracking branch 'origin/develop' into erikj/type_server
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py131
1 files changed, 83 insertions, 48 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 22ab4a9da5..694efe7116 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -15,7 +15,17 @@
 
 import logging
 from collections import namedtuple
-from typing import Callable, Iterable, List, TypeVar
+from typing import (
+    Awaitable,
+    Callable,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    TypeVar,
+)
 
 from prometheus_client import Counter
 
@@ -24,12 +34,14 @@ from twisted.internet import defer
 import synapse.server
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
+from synapse.events import EventBase
 from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import PreserveLoggingContext
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import StreamToken
+from synapse.streams.config import PaginationConfig
+from synapse.types import Collection, StreamToken, UserID
 from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
@@ -77,7 +89,13 @@ class _NotifierUserStream(object):
     so that it can remove itself from the indexes in the Notifier class.
     """
 
-    def __init__(self, user_id, rooms, current_token, time_now_ms):
+    def __init__(
+        self,
+        user_id: str,
+        rooms: Collection[str],
+        current_token: StreamToken,
+        time_now_ms: int,
+    ):
         self.user_id = user_id
         self.rooms = set(rooms)
         self.current_token = current_token
@@ -93,13 +111,13 @@ class _NotifierUserStream(object):
         with PreserveLoggingContext():
             self.notify_deferred = ObservableDeferred(defer.Deferred())
 
-    def notify(self, stream_key, stream_id, time_now_ms):
+    def notify(self, stream_key: str, stream_id: int, time_now_ms: int):
         """Notify any listeners for this user of a new event from an
         event source.
         Args:
-            stream_key(str): The stream the event came from.
-            stream_id(str): The new id for the stream the event came from.
-            time_now_ms(int): The current time in milliseconds.
+            stream_key: The stream the event came from.
+            stream_id: The new id for the stream the event came from.
+            time_now_ms: The current time in milliseconds.
         """
         self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
         self.last_notified_token = self.current_token
@@ -112,7 +130,7 @@ class _NotifierUserStream(object):
             self.notify_deferred = ObservableDeferred(defer.Deferred())
             noify_deferred.callback(self.current_token)
 
-    def remove(self, notifier):
+    def remove(self, notifier: "Notifier"):
         """ Remove this listener from all the indexes in the Notifier
         it knows about.
         """
@@ -123,10 +141,10 @@ class _NotifierUserStream(object):
 
         notifier.user_to_user_stream.pop(self.user_id)
 
-    def count_listeners(self):
+    def count_listeners(self) -> int:
         return len(self.notify_deferred.observers())
 
-    def new_listener(self, token):
+    def new_listener(self, token: StreamToken) -> _NotificationListener:
         """Returns a deferred that is resolved when there is a new token
         greater than the given token.
 
@@ -159,14 +177,16 @@ class Notifier(object):
     UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
 
     def __init__(self, hs: "synapse.server.HomeServer"):
-        self.user_to_user_stream = {}
-        self.room_to_user_streams = {}
+        self.user_to_user_stream = {}  # type: Dict[str, _NotifierUserStream]
+        self.room_to_user_streams = {}  # type: Dict[str, Set[_NotifierUserStream]]
 
         self.hs = hs
         self.storage = hs.get_storage()
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
-        self.pending_new_room_events = []
+        self.pending_new_room_events = (
+            []
+        )  # type: List[Tuple[int, EventBase, Collection[str]]]
 
         # Called when there are new things to stream over replication
         self.replication_callbacks = []  # type: List[Callable[[], None]]
@@ -178,10 +198,9 @@ class Notifier(object):
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
 
+        self.federation_sender = None
         if hs.should_send_federation():
             self.federation_sender = hs.get_federation_sender()
-        else:
-            self.federation_sender = None
 
         self.state_handler = hs.get_state_handler()
 
@@ -193,12 +212,12 @@ class Notifier(object):
         # when rendering the metrics page, which is likely once per minute at
         # most when scraping it.
         def count_listeners():
-            all_user_streams = set()
+            all_user_streams = set()  # type: Set[_NotifierUserStream]
 
-            for x in list(self.room_to_user_streams.values()):
-                all_user_streams |= x
-            for x in list(self.user_to_user_stream.values()):
-                all_user_streams.add(x)
+            for streams in list(self.room_to_user_streams.values()):
+                all_user_streams |= streams
+            for stream in list(self.user_to_user_stream.values()):
+                all_user_streams.add(stream)
 
             return sum(stream.count_listeners() for stream in all_user_streams)
 
@@ -223,7 +242,11 @@ class Notifier(object):
         self.replication_callbacks.append(cb)
 
     def on_new_room_event(
-        self, event, room_stream_id, max_room_stream_id, extra_users=[]
+        self,
+        event: EventBase,
+        room_stream_id: int,
+        max_room_stream_id: int,
+        extra_users: Collection[str] = [],
     ):
         """ Used by handlers to inform the notifier something has happened
         in the room, room event wise.
@@ -241,11 +264,11 @@ class Notifier(object):
 
         self.notify_replication()
 
-    def _notify_pending_new_room_events(self, max_room_stream_id):
+    def _notify_pending_new_room_events(self, max_room_stream_id: int):
         """Notify for the room events that were queued waiting for a previous
         event to be persisted.
         Args:
-            max_room_stream_id(int): The highest stream_id below which all
+            max_room_stream_id: The highest stream_id below which all
                 events have been persisted.
         """
         pending = self.pending_new_room_events
@@ -258,7 +281,9 @@ class Notifier(object):
             else:
                 self._on_new_room_event(event, room_stream_id, extra_users)
 
-    def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
+    def _on_new_room_event(
+        self, event: EventBase, room_stream_id: int, extra_users: Collection[str] = []
+    ):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
         run_as_background_process(
@@ -275,13 +300,19 @@ class Notifier(object):
             "room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
         )
 
-    async def _notify_app_services(self, room_stream_id):
+    async def _notify_app_services(self, room_stream_id: int):
         try:
             await self.appservice_handler.notify_interested_services(room_stream_id)
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
+    def on_new_event(
+        self,
+        stream_key: str,
+        new_token: int,
+        users: Collection[str] = [],
+        rooms: Collection[str] = [],
+    ):
         """ Used to inform listeners that something has happened event wise.
 
         Will wake up all listeners for the given users and rooms.
@@ -307,14 +338,19 @@ class Notifier(object):
 
                 self.notify_replication()
 
-    def on_new_replication_data(self):
+    def on_new_replication_data(self) -> None:
         """Used to inform replication listeners that something has happend
         without waking up any of the normal user event streams"""
         self.notify_replication()
 
     async def wait_for_events(
-        self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START
-    ):
+        self,
+        user_id: str,
+        timeout: int,
+        callback: Callable[[StreamToken, StreamToken], Awaitable[T]],
+        room_ids=None,
+        from_token=StreamToken.START,
+    ) -> T:
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
@@ -377,19 +413,16 @@ class Notifier(object):
 
     async def get_events_for(
         self,
-        user,
-        pagination_config,
-        timeout,
-        only_keys=None,
-        is_guest=False,
-        explicit_room_id=None,
-    ):
+        user: UserID,
+        pagination_config: PaginationConfig,
+        timeout: int,
+        is_guest: bool = False,
+        explicit_room_id: str = None,
+    ) -> EventStreamResult:
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any
         new events to happen before returning.
 
-        If `only_keys` is not None, events from keys will be sent down.
-
         If explicit_room_id is not set, the user's joined rooms will be polled
         for events.
         If explicit_room_id is set, that room will be polled for events only if
@@ -404,11 +437,13 @@ class Notifier(object):
         room_ids, is_joined = await self._get_room_ids(user, explicit_room_id)
         is_peeking = not is_joined
 
-        async def check_for_updates(before_token, after_token):
+        async def check_for_updates(
+            before_token: StreamToken, after_token: StreamToken
+        ) -> EventStreamResult:
             if not after_token.is_after(before_token):
                 return EventStreamResult([], (from_token, from_token))
 
-            events = []
+            events = []  # type: List[EventBase]
             end_token = from_token
 
             for name, source in self.event_sources.sources.items():
@@ -417,8 +452,6 @@ class Notifier(object):
                 after_id = getattr(after_token, keyname)
                 if before_id == after_id:
                     continue
-                if only_keys and name not in only_keys:
-                    continue
 
                 new_events, new_key = await source.get_new_events(
                     user=user,
@@ -476,7 +509,9 @@ class Notifier(object):
 
         return result
 
-    async def _get_room_ids(self, user, explicit_room_id):
+    async def _get_room_ids(
+        self, user: UserID, explicit_room_id: Optional[str]
+    ) -> Tuple[Collection[str], bool]:
         joined_room_ids = await self.store.get_rooms_for_user(user.to_string())
         if explicit_room_id:
             if explicit_room_id in joined_room_ids:
@@ -486,7 +521,7 @@ class Notifier(object):
             raise AuthError(403, "Non-joined access not allowed")
         return joined_room_ids, True
 
-    async def _is_world_readable(self, room_id):
+    async def _is_world_readable(self, room_id: str) -> bool:
         state = await self.state_handler.get_current_state(
             room_id, EventTypes.RoomHistoryVisibility, ""
         )
@@ -496,7 +531,7 @@ class Notifier(object):
             return False
 
     @log_function
-    def remove_expired_streams(self):
+    def remove_expired_streams(self) -> None:
         time_now_ms = self.clock.time_msec()
         expired_streams = []
         expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS
@@ -510,21 +545,21 @@ class Notifier(object):
             expired_stream.remove(self)
 
     @log_function
-    def _register_with_keys(self, user_stream):
+    def _register_with_keys(self, user_stream: _NotifierUserStream):
         self.user_to_user_stream[user_stream.user_id] = user_stream
 
         for room in user_stream.rooms:
             s = self.room_to_user_streams.setdefault(room, set())
             s.add(user_stream)
 
-    def _user_joined_room(self, user_id, room_id):
+    def _user_joined_room(self, user_id: str, room_id: str):
         new_user_stream = self.user_to_user_stream.get(user_id)
         if new_user_stream is not None:
             room_streams = self.room_to_user_streams.setdefault(room_id, set())
             room_streams.add(new_user_stream)
             new_user_stream.rooms.add(room_id)
 
-    def notify_replication(self):
+    def notify_replication(self) -> None:
         """Notify the any replication listeners that there's a new event"""
         for cb in self.replication_callbacks:
             cb()