diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index e9e7a78546..96273e2f81 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import random
-from typing import TYPE_CHECKING, Any, List, Tuple
+from typing import TYPE_CHECKING, Collection, List, Optional, Tuple
from synapse.replication.http.account_data import (
ReplicationAddTagRestServlet,
@@ -21,6 +21,7 @@ from synapse.replication.http.account_data import (
ReplicationRoomAccountDataRestServlet,
ReplicationUserAccountDataRestServlet,
)
+from synapse.streams import EventSource
from synapse.types import JsonDict, UserID
if TYPE_CHECKING:
@@ -163,7 +164,7 @@ class AccountDataHandler:
return response["max_stream_id"]
-class AccountDataEventSource:
+class AccountDataEventSource(EventSource[int, JsonDict]):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
@@ -171,7 +172,13 @@ class AccountDataEventSource:
return self.store.get_max_account_data_stream_id()
async def get_new_events(
- self, user: UserID, from_key: int, **kwargs: Any
+ self,
+ user: UserID,
+ from_key: int,
+ limit: Optional[int],
+ room_ids: Collection[str],
+ is_guest: bool,
+ explicit_room_id: Optional[str] = None,
) -> Tuple[List[JsonDict], int]:
user_id = user.to_string()
last_stream_id = from_key
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 8bde9ed66f..b7213b67a5 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -254,7 +254,7 @@ class ApplicationServicesHandler:
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
- typing_source = self.event_sources.sources["typing"]
+ typing_source = self.event_sources.sources.typing
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
service=service,
@@ -269,7 +269,7 @@ class ApplicationServicesHandler:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
- receipts_source = self.event_sources.sources["receipt"]
+ receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
@@ -279,7 +279,7 @@ class ApplicationServicesHandler:
self, service: ApplicationService, users: Collection[Union[str, UserID]]
) -> List[JsonDict]:
events: List[JsonDict] = []
- presence_source = self.event_sources.sources["presence"]
+ presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index c942086e74..9ad39a65d8 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -125,7 +125,7 @@ class InitialSyncHandler(BaseHandler):
now_token = self.hs.get_event_sources().get_current_token()
- presence_stream = self.hs.get_event_sources().sources["presence"]
+ presence_stream = self.hs.get_event_sources().sources.presence
presence, _ = await presence_stream.get_new_events(
user, from_key=None, include_offline=False
)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 841c8815b0..983c837c66 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -65,6 +65,7 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
+from synapse.streams import EventSource
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import _CacheContext, cached
@@ -1500,7 +1501,7 @@ def format_user_presence_state(
return content
-class PresenceEventSource:
+class PresenceEventSource(EventSource[int, UserPresenceState]):
def __init__(self, hs: "HomeServer"):
# We can't call get_presence_handler here because there's a cycle:
#
@@ -1519,10 +1520,11 @@ class PresenceEventSource:
self,
user: UserID,
from_key: Optional[int],
+ limit: Optional[int] = None,
room_ids: Optional[List[str]] = None,
- include_offline: bool = True,
+ is_guest: bool = False,
explicit_room_id: Optional[str] = None,
- **kwargs: Any,
+ include_offline: bool = True,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index c7567ac05f..5881f09ebd 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, List, Optional, Tuple
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from synapse.api.constants import ReadReceiptEventFields
from synapse.appservice import ApplicationService
from synapse.handlers._base import BaseHandler
+from synapse.streams import EventSource
from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
if TYPE_CHECKING:
@@ -162,7 +163,7 @@ class ReceiptsHandler(BaseHandler):
await self.federation_sender.send_read_receipt(receipt)
-class ReceiptEventSource:
+class ReceiptEventSource(EventSource[int, JsonDict]):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.config = hs.config
@@ -216,7 +217,13 @@ class ReceiptEventSource:
return visible_events
async def get_new_events(
- self, from_key: int, room_ids: List[str], user: UserID, **kwargs: Any
+ self,
+ user: UserID,
+ from_key: int,
+ limit: Optional[int],
+ room_ids: Iterable[str],
+ is_guest: bool,
+ explicit_room_id: Optional[str] = None,
) -> Tuple[List[JsonDict], int]:
from_key = int(from_key)
to_key = self.get_current_key()
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index abdd506164..287ea2fd06 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -20,7 +20,16 @@ import math
import random
import string
from collections import OrderedDict
-from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Awaitable,
+ Collection,
+ Dict,
+ List,
+ Optional,
+ Tuple,
+)
from synapse.api.constants import (
EventContentFields,
@@ -47,6 +56,7 @@ from synapse.events import EventBase
from synapse.events.utils import copy_power_levels_contents
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter
+from synapse.streams import EventSource
from synapse.types import (
JsonDict,
MutableStateMap,
@@ -1173,7 +1183,7 @@ class RoomContextHandler:
return results
-class RoomEventSource:
+class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
@@ -1181,8 +1191,8 @@ class RoomEventSource:
self,
user: UserID,
from_key: RoomStreamToken,
- limit: int,
- room_ids: List[str],
+ limit: Optional[int],
+ room_ids: Collection[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
) -> Tuple[List[EventBase], RoomStreamToken]:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e93db4bdcc..2c7c6d63a9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -443,7 +443,7 @@ class SyncHandler:
room_ids = sync_result_builder.joined_room_ids
- typing_source = self.event_sources.sources["typing"]
+ typing_source = self.event_sources.sources.typing
typing, typing_key = await typing_source.get_new_events(
user=sync_config.user,
from_key=typing_key,
@@ -465,7 +465,7 @@ class SyncHandler:
receipt_key = since_token.receipt_key if since_token else 0
- receipt_source = self.event_sources.sources["receipt"]
+ receipt_source = self.event_sources.sources.receipt
receipts, receipt_key = await receipt_source.get_new_events(
user=sync_config.user,
from_key=receipt_key,
@@ -1415,7 +1415,7 @@ class SyncHandler:
sync_config = sync_result_builder.sync_config
user = sync_result_builder.sync_config.user
- presence_source = self.event_sources.sources["presence"]
+ presence_source = self.event_sources.sources.presence
since_token = sync_result_builder.since_token
presence_key = None
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 4492c8567b..9326330c90 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -14,7 +14,7 @@
import logging
import random
from collections import namedtuple
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
@@ -23,6 +23,7 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process,
)
from synapse.replication.tcp.streams import TypingStream
+from synapse.streams import EventSource
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
@@ -439,7 +440,7 @@ class TypingWriterHandler(FollowerTypingHandler):
raise Exception("Typing writer instance got typing info over replication")
-class TypingNotificationEventSource:
+class TypingNotificationEventSource(EventSource[int, JsonDict]):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.clock = hs.get_clock()
@@ -485,7 +486,13 @@ class TypingNotificationEventSource:
return (events, handler._latest_room_serial)
async def get_new_events(
- self, from_key: int, room_ids: Iterable[str], **kwargs: Any
+ self,
+ user: UserID,
+ from_key: int,
+ limit: Optional[int],
+ room_ids: Iterable[str],
+ is_guest: bool,
+ explicit_room_id: Optional[str] = None,
) -> Tuple[List[JsonDict], int]:
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
|