summary refs log tree commit diff
path: root/synapse/handlers/initial_sync.py
diff options
context:
space:
mode:
authorAndrew Morgan <1342360+anoadragon453@users.noreply.github.com>2020-05-28 15:12:35 +0100
committerGitHub <noreply@github.com>2020-05-28 15:12:35 +0100
commit0aa9449cd412a19550a74d8d6d4b1714746ebba1 (patch)
treee5915221b57a9dc9c53aa293a373cd8fc46412c6 /synapse/handlers/initial_sync.py
parentFixes an attribute error when using the default display name during registrat... (diff)
parentMove sql schema delta files to their new location (diff)
downloadsynapse-0aa9449cd412a19550a74d8d6d4b1714746ebba1.tar.xz
Merge pull request #39 from matrix-org/dinsic-release-v1.12.x
Merge Synapse release v1.12.0 into 'dinsic'

Accompanying Sytest PR: https://github.com/matrix-org/sytest/issues/843
Diffstat (limited to 'synapse/handlers/initial_sync.py')
-rw-r--r--synapse/handlers/initial_sync.py276
1 files changed, 120 insertions, 156 deletions
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py

index aaee5db0b7..b116500c7d 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py
@@ -18,15 +18,15 @@ import logging from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError, Codes, SynapseError +from synapse.api.errors import SynapseError from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.streams.config import PaginationConfig from synapse.types import StreamToken, UserID from synapse.util import unwrapFirstError from synapse.util.async_helpers import concurrently_execute -from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import make_deferred_yieldable, run_in_background +from synapse.util.caches.response_cache import ResponseCache from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -41,11 +41,18 @@ class InitialSyncHandler(BaseHandler): self.state = hs.get_state_handler() self.clock = hs.get_clock() self.validator = EventValidator() - self.snapshot_cache = SnapshotCache() + self.snapshot_cache = ResponseCache(hs, "initial_sync_cache") self._event_serializer = hs.get_event_client_serializer() - - def snapshot_all_rooms(self, user_id=None, pagin_config=None, - as_client_event=True, include_archived=False): + self.storage = hs.get_storage() + self.state_store = self.storage.state + + def snapshot_all_rooms( + self, + user_id=None, + pagin_config=None, + as_client_event=True, + include_archived=False, + ): """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is @@ -72,24 +79,29 @@ class InitialSyncHandler(BaseHandler): as_client_event, include_archived, ) - now_ms = self.clock.time_msec() - result = self.snapshot_cache.get(now_ms, key) - if result is not None: - return result - return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms( - user_id, pagin_config, as_client_event, include_archived - )) + return self.snapshot_cache.wrap( + key, + self._snapshot_all_rooms, + user_id, + pagin_config, + as_client_event, + include_archived, + ) - @defer.inlineCallbacks - def _snapshot_all_rooms(self, user_id=None, pagin_config=None, - as_client_event=True, include_archived=False): + async def _snapshot_all_rooms( + self, + user_id=None, + pagin_config=None, + as_client_event=True, + include_archived=False, + ): memberships = [Membership.INVITE, Membership.JOIN] if include_archived: memberships.append(Membership.LEAVE) - room_list = yield self.store.get_rooms_for_user_where_membership_is( + room_list = await self.store.get_rooms_for_local_user_where_membership_is( user_id=user_id, membership_list=memberships ) @@ -97,39 +109,37 @@ class InitialSyncHandler(BaseHandler): rooms_ret = [] - now_token = yield self.hs.get_event_sources().get_current_token() + now_token = await self.hs.get_event_sources().get_current_token() presence_stream = self.hs.get_event_sources().sources["presence"] pagination_config = PaginationConfig(from_token=now_token) - presence, _ = yield presence_stream.get_pagination_rows( + presence, _ = await presence_stream.get_pagination_rows( user, pagination_config.get_source_config("presence"), None ) receipt_stream = self.hs.get_event_sources().sources["receipt"] - receipt, _ = yield receipt_stream.get_pagination_rows( + receipt, _ = await receipt_stream.get_pagination_rows( user, pagination_config.get_source_config("receipt"), None ) - tags_by_room = yield self.store.get_tags_for_user(user_id) + tags_by_room = await self.store.get_tags_for_user(user_id) - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user(user_id) + account_data, account_data_by_room = await self.store.get_account_data_for_user( + user_id ) - public_room_ids = yield self.store.get_public_room_ids() + public_room_ids = await self.store.get_public_room_ids() limit = pagin_config.limit if limit is None: limit = 10 - @defer.inlineCallbacks - def handle_room(event): + async def handle_room(event): d = { "room_id": event.room_id, "membership": event.membership, "visibility": ( - "public" if event.room_id in public_room_ids - else "private" + "public" if event.room_id in public_room_ids else "private" ), } @@ -137,9 +147,9 @@ class InitialSyncHandler(BaseHandler): time_now = self.clock.time_msec() d["inviter"] = event.sender - invite_event = yield self.store.get_event(event.event_id) - d["invite"] = yield self._event_serializer.serialize_event( - invite_event, time_now, as_client_event, + invite_event = await self.store.get_event(event.event_id) + d["invite"] = await self._event_serializer.serialize_event( + invite_event, time_now, as_client_event ) rooms_ret.append(d) @@ -151,20 +161,18 @@ class InitialSyncHandler(BaseHandler): if event.membership == Membership.JOIN: room_end_token = now_token.room_key deferred_room_state = run_in_background( - self.state_handler.get_current_state, - event.room_id, + self.state_handler.get_current_state, event.room_id ) elif event.membership == Membership.LEAVE: room_end_token = "s%d" % (event.stream_ordering,) deferred_room_state = run_in_background( - self.store.get_state_for_events, - [event.event_id], + self.state_store.get_state_for_events, [event.event_id] ) deferred_room_state.addCallback( lambda states: states[event.event_id] ) - (messages, token), current_state = yield make_deferred_yieldable( + (messages, token), current_state = await make_deferred_yieldable( defer.gatherResults( [ run_in_background( @@ -178,8 +186,8 @@ class InitialSyncHandler(BaseHandler): ) ).addErrback(unwrapFirstError) - messages = yield filter_events_for_client( - self.store, user_id, messages + messages = await filter_events_for_client( + self.storage, user_id, messages ) start_token = now_token.copy_and_replace("room_key", token) @@ -188,48 +196,42 @@ class InitialSyncHandler(BaseHandler): d["messages"] = { "chunk": ( - yield self._event_serializer.serialize_events( - messages, time_now=time_now, - as_client_event=as_client_event, + await self._event_serializer.serialize_events( + messages, time_now=time_now, as_client_event=as_client_event ) ), "start": start_token.to_string(), "end": end_token.to_string(), } - d["state"] = yield self._event_serializer.serialize_events( + d["state"] = await self._event_serializer.serialize_events( current_state.values(), time_now=time_now, - as_client_event=as_client_event + as_client_event=as_client_event, ) account_data_events = [] tags = tags_by_room.get(event.room_id) if tags: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) + account_data_events.append( + {"type": "m.tag", "content": {"tags": tags}} + ) account_data = account_data_by_room.get(event.room_id, {}) for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) + account_data_events.append( + {"type": account_data_type, "content": content} + ) d["account_data"] = account_data_events except Exception: logger.exception("Failed to get snapshot") - yield concurrently_execute(handle_room, room_list, 10) + await concurrently_execute(handle_room, room_list, 10) account_data_events = [] for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) + account_data_events.append({"type": account_data_type, "content": content}) now = self.clock.time_msec() @@ -247,10 +249,9 @@ class InitialSyncHandler(BaseHandler): "end": now_token.to_string(), } - defer.returnValue(ret) + return ret - @defer.inlineCallbacks - def room_initial_sync(self, requester, room_id, pagin_config=None): + async def room_initial_sync(self, requester, room_id, pagin_config=None): """Capture the a snapshot of a room. If user is currently a member of the room this will be what is currently in the room. If the user left the room this will be what was in the room when they left. @@ -267,51 +268,46 @@ class InitialSyncHandler(BaseHandler): A JSON serialisable dict with the snapshot of the room. """ - blocked = yield self.store.is_room_blocked(room_id) + blocked = await self.store.is_room_blocked(room_id) if blocked: raise SynapseError(403, "This room has been blocked on this server") user_id = requester.user.to_string() - membership, member_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id, + ( + membership, + member_event_id, + ) = await self.auth.check_user_in_room_or_world_readable( + room_id, user_id, allow_departed_users=True, ) is_peeking = member_event_id is None if membership == Membership.JOIN: - result = yield self._room_initial_sync_joined( + result = await self._room_initial_sync_joined( user_id, room_id, pagin_config, membership, is_peeking ) elif membership == Membership.LEAVE: - result = yield self._room_initial_sync_parted( + result = await self._room_initial_sync_parted( user_id, room_id, pagin_config, membership, member_event_id, is_peeking ) account_data_events = [] - tags = yield self.store.get_tags_for_room(user_id, room_id) + tags = await self.store.get_tags_for_room(user_id, room_id) if tags: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) + account_data_events.append({"type": "m.tag", "content": {"tags": tags}}) - account_data = yield self.store.get_account_data_for_room(user_id, room_id) + account_data = await self.store.get_account_data_for_room(user_id, room_id) for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) + account_data_events.append({"type": account_data_type, "content": content}) result["account_data"] = account_data_events - defer.returnValue(result) + return result - @defer.inlineCallbacks - def _room_initial_sync_parted(self, user_id, room_id, pagin_config, - membership, member_event_id, is_peeking): - room_state = yield self.store.get_state_for_events( - [member_event_id], - ) + async def _room_initial_sync_parted( + self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking + ): + room_state = await self.state_store.get_state_for_events([member_event_id]) room_state = room_state[member_event_id] @@ -319,18 +315,14 @@ class InitialSyncHandler(BaseHandler): if limit is None: limit = 10 - stream_token = yield self.store.get_stream_token_for_event( - member_event_id - ) + stream_token = await self.store.get_stream_token_for_event(member_event_id) - messages, token = yield self.store.get_recent_events_for_room( - room_id, - limit=limit, - end_token=stream_token + messages, token = await self.store.get_recent_events_for_room( + room_id, limit=limit, end_token=stream_token ) - messages = yield filter_events_for_client( - self.store, user_id, messages, is_peeking=is_peeking + messages = await filter_events_for_client( + self.storage, user_id, messages, is_peeking=is_peeking ) start_token = StreamToken.START.copy_and_replace("room_key", token) @@ -338,74 +330,71 @@ class InitialSyncHandler(BaseHandler): time_now = self.clock.time_msec() - defer.returnValue({ + return { "membership": membership, "room_id": room_id, "messages": { - "chunk": (yield self._event_serializer.serialize_events( - messages, time_now, - )), + "chunk": ( + await self._event_serializer.serialize_events(messages, time_now) + ), "start": start_token.to_string(), "end": end_token.to_string(), }, - "state": (yield self._event_serializer.serialize_events( - room_state.values(), time_now, - )), + "state": ( + await self._event_serializer.serialize_events( + room_state.values(), time_now + ) + ), "presence": [], "receipts": [], - }) + } - @defer.inlineCallbacks - def _room_initial_sync_joined(self, user_id, room_id, pagin_config, - membership, is_peeking): - current_state = yield self.state.get_current_state( - room_id=room_id, - ) + async def _room_initial_sync_joined( + self, user_id, room_id, pagin_config, membership, is_peeking + ): + current_state = await self.state.get_current_state(room_id=room_id) # TODO: These concurrently time_now = self.clock.time_msec() - state = yield self._event_serializer.serialize_events( - current_state.values(), time_now, + state = await self._event_serializer.serialize_events( + current_state.values(), time_now ) - now_token = yield self.hs.get_event_sources().get_current_token() + now_token = await self.hs.get_event_sources().get_current_token() limit = pagin_config.limit if pagin_config else None if limit is None: limit = 10 room_members = [ - m for m in current_state.values() + m + for m in current_state.values() if m.type == EventTypes.Member and m.content["membership"] == Membership.JOIN ] presence_handler = self.hs.get_presence_handler() - @defer.inlineCallbacks - def get_presence(): + async def get_presence(): # If presence is disabled, return an empty list if not self.hs.config.use_presence: - defer.returnValue([]) + return [] - states = yield presence_handler.get_states( - [m.user_id for m in room_members], - as_event=True, + states = await presence_handler.get_states( + [m.user_id for m in room_members], as_event=True ) - defer.returnValue(states) + return states - @defer.inlineCallbacks - def get_receipts(): - receipts = yield self.store.get_linearized_receipts_for_room( - room_id, - to_key=now_token.receipt_key, + async def get_receipts(): + receipts = await self.store.get_linearized_receipts_for_room( + room_id, to_key=now_token.receipt_key ) if not receipts: receipts = [] - defer.returnValue(receipts) + return receipts - presence, receipts, (messages, token) = yield make_deferred_yieldable( + presence, receipts, (messages, token) = await make_deferred_yieldable( defer.gatherResults( [ run_in_background(get_presence), @@ -415,14 +404,14 @@ class InitialSyncHandler(BaseHandler): room_id, limit=limit, end_token=now_token.room_key, - ) + ), ], consumeErrors=True, - ).addErrback(unwrapFirstError), + ).addErrback(unwrapFirstError) ) - messages = yield filter_events_for_client( - self.store, user_id, messages, is_peeking=is_peeking, + messages = await filter_events_for_client( + self.storage, user_id, messages, is_peeking=is_peeking ) start_token = now_token.copy_and_replace("room_key", token) @@ -433,9 +422,9 @@ class InitialSyncHandler(BaseHandler): ret = { "room_id": room_id, "messages": { - "chunk": (yield self._event_serializer.serialize_events( - messages, time_now, - )), + "chunk": ( + await self._event_serializer.serialize_events(messages, time_now) + ), "start": start_token.to_string(), "end": end_token.to_string(), }, @@ -446,29 +435,4 @@ class InitialSyncHandler(BaseHandler): if not is_peeking: ret["membership"] = membership - defer.returnValue(ret) - - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) + return ret