diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/account_data.py | 13 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 6 | ||||
-rw-r--r-- | synapse/handlers/initial_sync.py | 2 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 8 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 13 | ||||
-rw-r--r-- | synapse/handlers/room.py | 18 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 6 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 13 |
8 files changed, 56 insertions, 23 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index e9e7a78546..96273e2f81 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import random -from typing import TYPE_CHECKING, Any, List, Tuple +from typing import TYPE_CHECKING, Collection, List, Optional, Tuple from synapse.replication.http.account_data import ( ReplicationAddTagRestServlet, @@ -21,6 +21,7 @@ from synapse.replication.http.account_data import ( ReplicationRoomAccountDataRestServlet, ReplicationUserAccountDataRestServlet, ) +from synapse.streams import EventSource from synapse.types import JsonDict, UserID if TYPE_CHECKING: @@ -163,7 +164,7 @@ class AccountDataHandler: return response["max_stream_id"] -class AccountDataEventSource: +class AccountDataEventSource(EventSource[int, JsonDict]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() @@ -171,7 +172,13 @@ class AccountDataEventSource: return self.store.get_max_account_data_stream_id() async def get_new_events( - self, user: UserID, from_key: int, **kwargs: Any + self, + user: UserID, + from_key: int, + limit: Optional[int], + room_ids: Collection[str], + is_guest: bool, + explicit_room_id: Optional[str] = None, ) -> Tuple[List[JsonDict], int]: user_id = user.to_string() last_stream_id = from_key diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 8bde9ed66f..b7213b67a5 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -254,7 +254,7 @@ class ApplicationServicesHandler: async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: - typing_source = self.event_sources.sources["typing"] + typing_source = self.event_sources.sources.typing # Get the typing events from just before current typing, _ = await typing_source.get_new_events_as( service=service, @@ -269,7 +269,7 @@ class ApplicationServicesHandler: from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) - receipts_source = self.event_sources.sources["receipt"] + receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( service=service, from_key=from_key ) @@ -279,7 +279,7 @@ class ApplicationServicesHandler: self, service: ApplicationService, users: Collection[Union[str, UserID]] ) -> List[JsonDict]: events: List[JsonDict] = [] - presence_source = self.event_sources.sources["presence"] + presence_source = self.event_sources.sources.presence from_key = await self.store.get_type_stream_id_for_appservice( service, "presence" ) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c942086e74..9ad39a65d8 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -125,7 +125,7 @@ class InitialSyncHandler(BaseHandler): now_token = self.hs.get_event_sources().get_current_token() - presence_stream = self.hs.get_event_sources().sources["presence"] + presence_stream = self.hs.get_event_sources().sources.presence presence, _ = await presence_stream.get_new_events( user, from_key=None, include_offline=False ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 841c8815b0..983c837c66 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -65,6 +65,7 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates from synapse.replication.tcp.commands import ClearUserSyncsCommand from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream from synapse.storage.databases.main import DataStore +from synapse.streams import EventSource from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import _CacheContext, cached @@ -1500,7 +1501,7 @@ def format_user_presence_state( return content -class PresenceEventSource: +class PresenceEventSource(EventSource[int, UserPresenceState]): def __init__(self, hs: "HomeServer"): # We can't call get_presence_handler here because there's a cycle: # @@ -1519,10 +1520,11 @@ class PresenceEventSource: self, user: UserID, from_key: Optional[int], + limit: Optional[int] = None, room_ids: Optional[List[str]] = None, - include_offline: bool = True, + is_guest: bool = False, explicit_room_id: Optional[str] = None, - **kwargs: Any, + include_offline: bool = True, ) -> Tuple[List[UserPresenceState], int]: # The process for getting presence events are: # 1. Get the rooms the user is in. diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index c7567ac05f..5881f09ebd 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -12,11 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Any, List, Optional, Tuple +from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple from synapse.api.constants import ReadReceiptEventFields from synapse.appservice import ApplicationService from synapse.handlers._base import BaseHandler +from synapse.streams import EventSource from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id if TYPE_CHECKING: @@ -162,7 +163,7 @@ class ReceiptsHandler(BaseHandler): await self.federation_sender.send_read_receipt(receipt) -class ReceiptEventSource: +class ReceiptEventSource(EventSource[int, JsonDict]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self.config = hs.config @@ -216,7 +217,13 @@ class ReceiptEventSource: return visible_events async def get_new_events( - self, from_key: int, room_ids: List[str], user: UserID, **kwargs: Any + self, + user: UserID, + from_key: int, + limit: Optional[int], + room_ids: Iterable[str], + is_guest: bool, + explicit_room_id: Optional[str] = None, ) -> Tuple[List[JsonDict], int]: from_key = int(from_key) to_key = self.get_current_key() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index abdd506164..287ea2fd06 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -20,7 +20,16 @@ import math import random import string from collections import OrderedDict -from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Collection, + Dict, + List, + Optional, + Tuple, +) from synapse.api.constants import ( EventContentFields, @@ -47,6 +56,7 @@ from synapse.events import EventBase from synapse.events.utils import copy_power_levels_contents from synapse.rest.admin._base import assert_user_is_admin from synapse.storage.state import StateFilter +from synapse.streams import EventSource from synapse.types import ( JsonDict, MutableStateMap, @@ -1173,7 +1183,7 @@ class RoomContextHandler: return results -class RoomEventSource: +class RoomEventSource(EventSource[RoomStreamToken, EventBase]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() @@ -1181,8 +1191,8 @@ class RoomEventSource: self, user: UserID, from_key: RoomStreamToken, - limit: int, - room_ids: List[str], + limit: Optional[int], + room_ids: Collection[str], is_guest: bool, explicit_room_id: Optional[str] = None, ) -> Tuple[List[EventBase], RoomStreamToken]: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e93db4bdcc..2c7c6d63a9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -443,7 +443,7 @@ class SyncHandler: room_ids = sync_result_builder.joined_room_ids - typing_source = self.event_sources.sources["typing"] + typing_source = self.event_sources.sources.typing typing, typing_key = await typing_source.get_new_events( user=sync_config.user, from_key=typing_key, @@ -465,7 +465,7 @@ class SyncHandler: receipt_key = since_token.receipt_key if since_token else 0 - receipt_source = self.event_sources.sources["receipt"] + receipt_source = self.event_sources.sources.receipt receipts, receipt_key = await receipt_source.get_new_events( user=sync_config.user, from_key=receipt_key, @@ -1415,7 +1415,7 @@ class SyncHandler: sync_config = sync_result_builder.sync_config user = sync_result_builder.sync_config.user - presence_source = self.event_sources.sources["presence"] + presence_source = self.event_sources.sources.presence since_token = sync_result_builder.since_token presence_key = None diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 4492c8567b..9326330c90 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -14,7 +14,7 @@ import logging import random from collections import namedtuple -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple from synapse.api.errors import AuthError, ShadowBanError, SynapseError from synapse.appservice import ApplicationService @@ -23,6 +23,7 @@ from synapse.metrics.background_process_metrics import ( wrap_as_background_process, ) from synapse.replication.tcp.streams import TypingStream +from synapse.streams import EventSource from synapse.types import JsonDict, Requester, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure @@ -439,7 +440,7 @@ class TypingWriterHandler(FollowerTypingHandler): raise Exception("Typing writer instance got typing info over replication") -class TypingNotificationEventSource: +class TypingNotificationEventSource(EventSource[int, JsonDict]): def __init__(self, hs: "HomeServer"): self.hs = hs self.clock = hs.get_clock() @@ -485,7 +486,13 @@ class TypingNotificationEventSource: return (events, handler._latest_room_serial) async def get_new_events( - self, from_key: int, room_ids: Iterable[str], **kwargs: Any + self, + user: UserID, + from_key: int, + limit: Optional[int], + room_ids: Iterable[str], + is_guest: bool, + explicit_room_id: Optional[str] = None, ) -> Tuple[List[JsonDict], int]: with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) |