From 0f545e6b9670fd780579445ff68dba95a8e08545 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Sep 2020 15:00:17 +0100 Subject: Clean up types for PaginationConfig (#8250) This removes `SourcePaginationConfig` and `get_pagination_rows`. The reasoning behind this is that these generic classes/functions erased the types of the IDs it used (i.e. instead of passing around `StreamToken` it'd pass in e.g. `token.room_key`, which don't have uniform types). --- synapse/handlers/initial_sync.py | 11 +++++------ synapse/handlers/pagination.py | 42 +++++++++++++++++++++------------------- synapse/handlers/presence.py | 3 --- synapse/handlers/receipts.py | 15 -------------- 4 files changed, 27 insertions(+), 44 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index d5ddc583ad..ddb8f0712b 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -116,14 +116,13 @@ class InitialSyncHandler(BaseHandler): now_token = self.hs.get_event_sources().get_current_token() presence_stream = self.hs.get_event_sources().sources["presence"] - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = await presence_stream.get_pagination_rows( - user, pagination_config.get_source_config("presence"), None + presence, _ = await presence_stream.get_new_events( + user, from_key=None, include_offline=False ) - receipt_stream = self.hs.get_event_sources().sources["receipt"] - receipt, _ = await receipt_stream.get_pagination_rows( - user, pagination_config.get_source_config("receipt"), None + joined_rooms = [r.room_id for r in room_list if r.membership == Membership.JOIN] + receipt = await self.store.get_linearized_receipts_for_rooms( + joined_rooms, to_key=int(now_token.receipt_key), ) tags_by_room = await self.store.get_tags_for_user(user_id) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 34ed0e2921..195a1fd77e 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -335,20 +335,16 @@ class PaginationHandler: user_id = requester.user.to_string() if pagin_config.from_token: - room_token = pagin_config.from_token.room_key + from_token = pagin_config.from_token else: - pagin_config.from_token = ( - self.hs.get_event_sources().get_current_token_for_pagination() - ) - room_token = pagin_config.from_token.room_key - - room_token = RoomStreamToken.parse(room_token) + from_token = self.hs.get_event_sources().get_current_token_for_pagination() - pagin_config.from_token = pagin_config.from_token.copy_and_replace( - "room_key", str(room_token) - ) + if pagin_config.limit is None: + # This shouldn't happen as we've set a default limit before this + # gets called. + raise Exception("limit not set") - source_config = pagin_config.get_source_config("room") + room_token = RoomStreamToken.parse(from_token.room_key) with await self.pagination_lock.read(room_id): ( @@ -358,7 +354,7 @@ class PaginationHandler: room_id, user_id, allow_departed_users=True ) - if source_config.direction == "b": + if pagin_config.direction == "b": # if we're going backwards, we might need to backfill. This # requires that we have a topo token. if room_token.topological: @@ -381,22 +377,28 @@ class PaginationHandler: member_event_id ) if RoomStreamToken.parse(leave_token).topological < max_topo: - source_config.from_key = str(leave_token) + from_token = from_token.copy_and_replace( + "room_key", leave_token + ) await self.hs.get_handlers().federation_handler.maybe_backfill( room_id, max_topo ) + to_room_key = None + if pagin_config.to_token: + to_room_key = pagin_config.to_token.room_key + events, next_key = await self.store.paginate_room_events( room_id=room_id, - from_key=source_config.from_key, - to_key=source_config.to_key, - direction=source_config.direction, - limit=source_config.limit, + from_key=from_token.room_key, + to_key=to_room_key, + direction=pagin_config.direction, + limit=pagin_config.limit, event_filter=event_filter, ) - next_token = pagin_config.from_token.copy_and_replace("room_key", next_key) + next_token = from_token.copy_and_replace("room_key", next_key) if events: if event_filter: @@ -409,7 +411,7 @@ class PaginationHandler: if not events: return { "chunk": [], - "start": pagin_config.from_token.to_string(), + "start": from_token.to_string(), "end": next_token.to_string(), } @@ -438,7 +440,7 @@ class PaginationHandler: events, time_now, as_client_event=as_client_event ) ), - "start": pagin_config.from_token.to_string(), + "start": from_token.to_string(), "end": next_token.to_string(), } diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 91a3aec1cc..1000ac95ff 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1108,9 +1108,6 @@ class PresenceEventSource: def get_current_key(self): return self.store.get_current_presence_token() - async def get_pagination_rows(self, user, pagination_config, key): - return await self.get_new_events(user, from_key=None, include_offline=False) - @cached(num_args=2, cache_context=True) async def _get_interested_in(self, user, explicit_room_id, cache_context): """Returns the set of users that the given user should see presence diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 2cc6c2eb68..bdd8e52edd 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -142,18 +142,3 @@ class ReceiptEventSource: def get_current_key(self, direction="f"): return self.store.get_max_receipt_stream_id() - - async def get_pagination_rows(self, user, config, key): - to_key = int(config.from_key) - - if config.to_key: - from_key = int(config.to_key) - else: - from_key = None - - room_ids = await self.store.get_rooms_for_user(user.to_string()) - events = await self.store.get_linearized_receipts_for_rooms( - room_ids, from_key=from_key, to_key=to_key - ) - - return (events, to_key) -- cgit 1.5.1 From 63c0e9e1954fc7fc10a2575c54aecc8944de60f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Sep 2020 16:48:15 +0100 Subject: Add types to StreamToken and RoomStreamToken (#8279) The intention here is to change `StreamToken.room_key` to be a `RoomStreamToken` in a future PR, but that is a big enough change without this refactoring too. --- changelog.d/8279.misc | 1 + synapse/handlers/sync.py | 5 +- synapse/storage/databases/main/devices.py | 7 +- synapse/storage/databases/main/stream.py | 21 +++-- synapse/types.py | 152 +++++++++++++++--------------- 5 files changed, 95 insertions(+), 91 deletions(-) create mode 100644 changelog.d/8279.misc (limited to 'synapse/handlers') diff --git a/changelog.d/8279.misc b/changelog.d/8279.misc new file mode 100644 index 0000000000..99f669001f --- /dev/null +++ b/changelog.d/8279.misc @@ -0,0 +1 @@ +Add type hints to `StreamToken` and `RoomStreamToken` classes. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e2ddb628ff..cc47e8b62c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1310,12 +1310,11 @@ class SyncHandler: presence_source = self.event_sources.sources["presence"] since_token = sync_result_builder.since_token + presence_key = None + include_offline = False if since_token and not sync_result_builder.full_state: presence_key = since_token.presence_key include_offline = True - else: - presence_key = None - include_offline = False presence, presence_key = await presence_source.get_new_events( user=user, diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index add4e3ea0e..306fc6947c 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -481,7 +481,7 @@ class DeviceWorkerStore(SQLBaseStore): } async def get_users_whose_devices_changed( - self, from_key: str, user_ids: Iterable[str] + self, from_key: int, user_ids: Iterable[str] ) -> Set[str]: """Get set of users whose devices have changed since `from_key` that are in the given list of user_ids. @@ -493,7 +493,6 @@ class DeviceWorkerStore(SQLBaseStore): Returns: The set of user_ids whose devices have changed since `from_key` """ - from_key = int(from_key) # Get set of users who *may* have changed. Users not in the returned # list have definitely not changed. @@ -527,7 +526,7 @@ class DeviceWorkerStore(SQLBaseStore): ) async def get_users_whose_signatures_changed( - self, user_id: str, from_key: str + self, user_id: str, from_key: int ) -> Set[str]: """Get the users who have new cross-signing signatures made by `user_id` since `from_key`. @@ -539,7 +538,7 @@ class DeviceWorkerStore(SQLBaseStore): Returns: A set of user IDs with updated signatures. """ - from_key = int(from_key) + if self._user_signature_stream_cache.has_entity_changed(user_id, from_key): sql = """ SELECT DISTINCT user_ids FROM user_signature_stream diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index be6df8a6d1..08a13a8b47 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -79,8 +79,8 @@ _EventDictReturn = namedtuple( def generate_pagination_where_clause( direction: str, column_names: Tuple[str, str], - from_token: Optional[Tuple[int, int]], - to_token: Optional[Tuple[int, int]], + from_token: Optional[Tuple[Optional[int], int]], + to_token: Optional[Tuple[Optional[int], int]], engine: BaseDatabaseEngine, ) -> str: """Creates an SQL expression to bound the columns by the pagination @@ -535,13 +535,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if limit == 0: return [], end_token - end_token = RoomStreamToken.parse(end_token) + parsed_end_token = RoomStreamToken.parse(end_token) rows, token = await self.db_pool.runInteraction( "get_recent_event_ids_for_room", self._paginate_room_events_txn, room_id, - from_token=end_token, + from_token=parsed_end_token, limit=limit, ) @@ -989,8 +989,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): bounds = generate_pagination_where_clause( direction=direction, column_names=("topological_ordering", "stream_ordering"), - from_token=from_token, - to_token=to_token, + from_token=from_token.as_tuple(), + to_token=to_token.as_tuple() if to_token else None, engine=self.database_engine, ) @@ -1083,16 +1083,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): and `to_key`). """ - from_key = RoomStreamToken.parse(from_key) + parsed_from_key = RoomStreamToken.parse(from_key) + parsed_to_key = None if to_key: - to_key = RoomStreamToken.parse(to_key) + parsed_to_key = RoomStreamToken.parse(to_key) rows, token = await self.db_pool.runInteraction( "paginate_room_events", self._paginate_room_events_txn, room_id, - from_key, - to_key, + parsed_from_key, + parsed_to_key, direction, limit, event_filter, diff --git a/synapse/types.py b/synapse/types.py index f7de48f148..ba45335038 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,7 @@ import re import string import sys from collections import namedtuple -from typing import Any, Dict, Mapping, MutableMapping, Tuple, Type, TypeVar +from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Type, TypeVar import attr from signedjson.key import decode_verify_key_bytes @@ -362,22 +362,79 @@ def map_username_to_mxid_localpart(username, case_sensitive=False): return username.decode("ascii") -class StreamToken( - namedtuple( - "Token", - ( - "room_key", - "presence_key", - "typing_key", - "receipt_key", - "account_data_key", - "push_rules_key", - "to_device_key", - "device_list_key", - "groups_key", - ), +@attr.s(frozen=True, slots=True) +class RoomStreamToken: + """Tokens are positions between events. The token "s1" comes after event 1. + + s0 s1 + | | + [0] V [1] V [2] + + Tokens can either be a point in the live event stream or a cursor going + through historic events. + + When traversing the live event stream events are ordered by when they + arrived at the homeserver. + + When traversing historic events the events are ordered by their depth in + the event graph "topological_ordering" and then by when they arrived at the + homeserver "stream_ordering". + + Live tokens start with an "s" followed by the "stream_ordering" id of the + event it comes after. Historic tokens start with a "t" followed by the + "topological_ordering" id of the event it comes after, followed by "-", + followed by the "stream_ordering" id of the event it comes after. + """ + + topological = attr.ib( + type=Optional[int], + validator=attr.validators.optional(attr.validators.instance_of(int)), ) -): + stream = attr.ib(type=int, validator=attr.validators.instance_of(int)) + + @classmethod + def parse(cls, string: str) -> "RoomStreamToken": + try: + if string[0] == "s": + return cls(topological=None, stream=int(string[1:])) + if string[0] == "t": + parts = string[1:].split("-", 1) + return cls(topological=int(parts[0]), stream=int(parts[1])) + except Exception: + pass + raise SynapseError(400, "Invalid token %r" % (string,)) + + @classmethod + def parse_stream_token(cls, string: str) -> "RoomStreamToken": + try: + if string[0] == "s": + return cls(topological=None, stream=int(string[1:])) + except Exception: + pass + raise SynapseError(400, "Invalid token %r" % (string,)) + + def as_tuple(self) -> Tuple[Optional[int], int]: + return (self.topological, self.stream) + + def __str__(self) -> str: + if self.topological is not None: + return "t%d-%d" % (self.topological, self.stream) + else: + return "s%d" % (self.stream,) + + +@attr.s(slots=True, frozen=True) +class StreamToken: + room_key = attr.ib(type=str) + presence_key = attr.ib(type=int) + typing_key = attr.ib(type=int) + receipt_key = attr.ib(type=int) + account_data_key = attr.ib(type=int) + push_rules_key = attr.ib(type=int) + to_device_key = attr.ib(type=int) + device_list_key = attr.ib(type=int) + groups_key = attr.ib(type=int) + _SEPARATOR = "_" START = None # type: StreamToken @@ -385,15 +442,15 @@ class StreamToken( def from_string(cls, string): try: keys = string.split(cls._SEPARATOR) - while len(keys) < len(cls._fields): + while len(keys) < len(attr.fields(cls)): # i.e. old token from before receipt_key keys.append("0") - return cls(*keys) + return cls(keys[0], *(int(k) for k in keys[1:])) except Exception: raise SynapseError(400, "Invalid Token") def to_string(self): - return self._SEPARATOR.join([str(k) for k in self]) + return self._SEPARATOR.join([str(k) for k in attr.astuple(self)]) @property def room_stream_id(self): @@ -435,63 +492,10 @@ class StreamToken( return self def copy_and_replace(self, key, new_value): - return self._replace(**{key: new_value}) - - -StreamToken.START = StreamToken(*(["s0"] + ["0"] * (len(StreamToken._fields) - 1))) - - -class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): - """Tokens are positions between events. The token "s1" comes after event 1. - - s0 s1 - | | - [0] V [1] V [2] - - Tokens can either be a point in the live event stream or a cursor going - through historic events. - - When traversing the live event stream events are ordered by when they - arrived at the homeserver. - - When traversing historic events the events are ordered by their depth in - the event graph "topological_ordering" and then by when they arrived at the - homeserver "stream_ordering". - - Live tokens start with an "s" followed by the "stream_ordering" id of the - event it comes after. Historic tokens start with a "t" followed by the - "topological_ordering" id of the event it comes after, followed by "-", - followed by the "stream_ordering" id of the event it comes after. - """ + return attr.evolve(self, **{key: new_value}) - __slots__ = [] # type: list - - @classmethod - def parse(cls, string): - try: - if string[0] == "s": - return cls(topological=None, stream=int(string[1:])) - if string[0] == "t": - parts = string[1:].split("-", 1) - return cls(topological=int(parts[0]), stream=int(parts[1])) - except Exception: - pass - raise SynapseError(400, "Invalid token %r" % (string,)) - @classmethod - def parse_stream_token(cls, string): - try: - if string[0] == "s": - return cls(topological=None, stream=int(string[1:])) - except Exception: - pass - raise SynapseError(400, "Invalid token %r" % (string,)) - - def __str__(self): - if self.topological is not None: - return "t%d-%d" % (self.topological, self.stream) - else: - return "s%d" % (self.stream,) +StreamToken.START = StreamToken.from_string("s0_0") class ThirdPartyInstanceID( -- cgit 1.5.1 From 1553adc83122ac245f523524ae1583cd556ed121 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Sep 2020 17:43:31 +0100 Subject: Fix mypy error on develop (#8282) --- changelog.d/8282.misc | 1 + synapse/handlers/pagination.py | 9 ++++++--- 2 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 changelog.d/8282.misc (limited to 'synapse/handlers') diff --git a/changelog.d/8282.misc b/changelog.d/8282.misc new file mode 100644 index 0000000000..b6896a9300 --- /dev/null +++ b/changelog.d/8282.misc @@ -0,0 +1 @@ +Clean up type hints for `PaginationConfig`. diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 195a1fd77e..ec17d3d888 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -373,12 +373,15 @@ class PaginationHandler: # case "JOIN" would have been returned. assert member_event_id - leave_token = await self.store.get_topological_token_for_event( + leave_token_str = await self.store.get_topological_token_for_event( member_event_id ) - if RoomStreamToken.parse(leave_token).topological < max_topo: + leave_token = RoomStreamToken.parse(leave_token_str) + assert leave_token.topological is not None + + if leave_token.topological < max_topo: from_token = from_token.copy_and_replace( - "room_key", leave_token + "room_key", leave_token_str ) await self.hs.get_handlers().federation_handler.maybe_backfill( -- cgit 1.5.1