diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/sync.py | 189 |
1 files changed, 148 insertions, 41 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 53e1eb0508..9b5b4d2c9f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -17,6 +17,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes +from synapse.api.filtering import DEFAULT_FILTER_COLLECTION from synapse.util import unwrapFirstError from twisted.internet import defer @@ -29,7 +30,7 @@ logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", - "filter", + "filter_collection", "is_guest", ]) @@ -130,6 +131,11 @@ class SyncHandler(BaseHandler): self.clock = hs.get_clock() @defer.inlineCallbacks + def get_sync_for_user(self, sync_config, since_token=None, timeout=0, + filter_collection=DEFAULT_FILTER_COLLECTION): + pass + + @defer.inlineCallbacks def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise @@ -142,8 +148,9 @@ class SyncHandler(BaseHandler): if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. - result = yield self.current_sync_for_user(sync_config, since_token, - full_state=full_state) + result = yield self.current_sync_for_user( + sync_config, since_token, full_state=full_state, + ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): @@ -151,7 +158,7 @@ class SyncHandler(BaseHandler): result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, - from_token=since_token + from_token=since_token, ) defer.returnValue(result) @@ -205,7 +212,7 @@ class SyncHandler(BaseHandler): ) membership_list = (Membership.INVITE, Membership.JOIN) - if sync_config.filter.include_leave: + if sync_config.filter_collection.include_leave: membership_list += (Membership.LEAVE, Membership.BAN) room_list = yield self.store.get_rooms_for_user_where_membership_is( @@ -266,9 +273,17 @@ class SyncHandler(BaseHandler): deferreds, consumeErrors=True ).addErrback(unwrapFirstError) + account_data_for_user = sync_config.filter_collection.filter_account_data( + self.account_data_for_user(account_data) + ) + + presence = sync_config.filter_collection.filter_presence( + presence + ) + defer.returnValue(SyncResult( presence=presence, - account_data=self.account_data_for_user(account_data), + account_data=account_data_for_user, joined=joined, invited=invited, archived=archived, @@ -302,14 +317,31 @@ class SyncHandler(BaseHandler): current_state = yield self.get_state_at(room_id, now_token) + current_state = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + current_state.values() + ) + } + + account_data = self.account_data_for_room( + room_id, tags_by_room, account_data_by_room + ) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral( + ephemeral_by_room.get(room_id, []) + ) + defer.returnValue(JoinedSyncResult( room_id=room_id, timeline=batch, state=current_state, - ephemeral=ephemeral_by_room.get(room_id, []), - account_data=self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ), + ephemeral=ephemeral, + account_data=account_data, unread_notifications=unread_notifications, )) @@ -365,7 +397,7 @@ class SyncHandler(BaseHandler): typing, typing_key = yield typing_source.get_new_events( user=sync_config.user, from_key=typing_key, - limit=sync_config.filter.ephemeral_limit(), + limit=sync_config.filter_collection.ephemeral_limit(), room_ids=room_ids, is_guest=sync_config.is_guest, ) @@ -388,7 +420,7 @@ class SyncHandler(BaseHandler): receipts, receipt_key = yield receipt_source.get_new_events( user=sync_config.user, from_key=receipt_key, - limit=sync_config.filter.ephemeral_limit(), + limit=sync_config.filter_collection.ephemeral_limit(), room_ids=room_ids, is_guest=sync_config.is_guest, ) @@ -419,13 +451,26 @@ class SyncHandler(BaseHandler): leave_state = yield self.store.get_state_for_event(leave_event_id) + leave_state = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + leave_state.values() + ) + } + + account_data = self.account_data_for_room( + room_id, tags_by_room, account_data_by_room + ) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data + ) + defer.returnValue(ArchivedSyncResult( room_id=room_id, timeline=batch, state=leave_state, - account_data=self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ), + account_data=account_data, )) @defer.inlineCallbacks @@ -444,7 +489,7 @@ class SyncHandler(BaseHandler): presence, presence_key = yield presence_source.get_new_events( user=sync_config.user, from_key=since_token.presence_key, - limit=sync_config.filter.presence_limit(), + limit=sync_config.filter_collection.presence_limit(), room_ids=room_ids, is_guest=sync_config.is_guest, ) @@ -473,7 +518,7 @@ class SyncHandler(BaseHandler): sync_config.user ) - timeline_limit = sync_config.filter.timeline_limit() + timeline_limit = sync_config.filter_collection.timeline_limit() room_events, _ = yield self.store.get_room_events_stream( sync_config.user.to_string(), @@ -538,6 +583,27 @@ class SyncHandler(BaseHandler): # the timeline is inherently limited if we've just joined limited = True + recents = sync_config.filter_collection.filter_room_timeline(recents) + + state = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + state.values() + ) + } + + acc_data = self.account_data_for_room( + room_id, tags_by_room, account_data_by_room + ) + + acc_data = sync_config.filter_collection.filter_room_account_data( + acc_data + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral( + ephemeral_by_room.get(room_id, []) + ) + room_sync = JoinedSyncResult( room_id=room_id, timeline=TimelineBatch( @@ -546,10 +612,8 @@ class SyncHandler(BaseHandler): limited=limited, ), state=state, - ephemeral=ephemeral_by_room.get(room_id, []), - account_data=self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ), + ephemeral=ephemeral, + account_data=acc_data, unread_notifications={}, ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -603,9 +667,17 @@ class SyncHandler(BaseHandler): for event in invite_events ] + account_data_for_user = sync_config.filter_collection.filter_account_data( + self.account_data_for_user(account_data) + ) + + presence = sync_config.filter_collection.filter_presence( + presence + ) + defer.returnValue(SyncResult( presence=presence, - account_data=self.account_data_for_user(account_data), + account_data=account_data_for_user, joined=joined, invited=invited, archived=archived, @@ -621,7 +693,7 @@ class SyncHandler(BaseHandler): limited = True recents = [] filtering_factor = 2 - timeline_limit = sync_config.filter.timeline_limit() + timeline_limit = sync_config.filter_collection.timeline_limit() load_limit = max(timeline_limit * filtering_factor, 100) max_repeat = 3 # Only try a few times per room, otherwise room_key = now_token.room_key @@ -634,9 +706,9 @@ class SyncHandler(BaseHandler): from_token=since_token.room_key if since_token else None, end_token=end_key, ) - (room_key, _) = keys + room_key, _ = keys end_key = "s" + room_key.split('-')[-1] - loaded_recents = sync_config.filter.filter_room_timeline(events) + loaded_recents = sync_config.filter_collection.filter_room_timeline(events) loaded_recents = yield self._filter_events_for_client( sync_config.user.to_string(), loaded_recents, @@ -684,21 +756,28 @@ class SyncHandler(BaseHandler): logger.debug("Recents %r", batch) - current_state = yield self.get_state_at(room_id, now_token) + if batch.limited: + current_state = yield self.get_state_at(room_id, now_token) - state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token - ) + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token + ) - state = yield self.compute_state_delta( - since_token=since_token, - previous_state=state_at_previous_sync, - current_state=current_state, - ) + state = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=current_state, + ) + else: + state = { + (event.type, event.state_key): event + for event in batch.events if event.is_state() + } just_joined = yield self.check_joined_room(sync_config, state) if just_joined: state = yield self.get_state_at(room_id, now_token) + # batch.limited = True notifs = yield self.unread_notifs_for_room_id( room_id, sync_config, all_ephemeral_by_room @@ -711,14 +790,29 @@ class SyncHandler(BaseHandler): 1 for notif in notifs if _action_has_highlight(notif["actions"]) ]) + state = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state(state.values()) + } + + account_data = self.account_data_for_room( + room_id, tags_by_room, account_data_by_room + ) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral( + ephemeral_by_room.get(room_id, []) + ) + room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, state=state, - ephemeral=ephemeral_by_room.get(room_id, []), - account_data=self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ), + ephemeral=ephemeral, + account_data=account_data, unread_notifications=unread_notifications, ) @@ -765,13 +859,26 @@ class SyncHandler(BaseHandler): current_state=state_events_at_leave, ) + state_events_delta = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + state_events_delta.values() + ) + } + + account_data = self.account_data_for_room( + leave_event.room_id, tags_by_room, account_data_by_room + ) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data + ) + room_sync = ArchivedSyncResult( room_id=leave_event.room_id, timeline=batch, state=state_events_delta, - account_data=self.account_data_for_room( - leave_event.room_id, tags_by_room, account_data_by_room - ), + account_data=account_data, ) logger.debug("Room sync: %r", room_sync) |