summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-05-24 15:02:05 +0100
committerErik Johnston <erik@matrix.org>2016-05-24 16:28:51 +0100
commit4902770e3222c80828a3d8027324687ffd631fb5 (patch)
treef6db4365b4a32a195e579c013a34bd013e7799da
parentAdd POST /sync API endpoint (diff)
parentChange short circuit path (diff)
downloadsynapse-4902770e3222c80828a3d8027324687ffd631fb5.tar.xz
Merge branch 'erikj/sync_refactor' of github.com:matrix-org/synapse into erikj/paginate_sync
-rw-r--r--synapse/handlers/sync.py1353
-rw-r--r--synapse/push/push_tools.py4
-rw-r--r--synapse/rest/client/v1/room.py5
-rw-r--r--synapse/types.py33
4 files changed, 709 insertions, 686 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index 394fe8154f..4ca9ff4dbc 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute from synapse.util.logcontext import LoggingContext @@ -21,7 +20,6 @@ from synapse.util.metrics import Measure from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client - from synapse.types import SyncNextBatchToken from twisted.internet import defer @@ -196,254 +194,7 @@ class SyncHandler(object): Returns: A Deferred SyncResult. """ - if batch_token is None or full_state: - return self.full_state_sync(sync_config, batch_token) - else: - return self.incremental_sync_with_gap(sync_config, batch_token) - - @defer.inlineCallbacks - def _get_room_timestamps_at_token(self, room_ids, token, sync_config, - limit): - room_to_entries = {} - - @defer.inlineCallbacks - def _get_last_ts(room_id): - entry = yield self.store.get_last_event_id_ts_for_room( - room_id, token.room_key - ) - - # TODO: Is this ever possible? - room_to_entries[room_id] = entry if entry else { - "origin_server_ts": 0, - } - - yield concurrently_execute(_get_last_ts, room_ids, 10) - - if len(room_to_entries) <= limit: - defer.returnValue({ - room_id: entry["origin_server_ts"] - for room_id, entry in room_to_entries.items() - }) - - queued_events = sorted( - room_to_entries.items(), - key=lambda e: -e[1]["origin_server_ts"] - ) - - to_return = {} - - while len(to_return) < limit and len(queued_events) > 0: - to_fetch = queued_events[:limit - len(to_return)] - event_to_q = { - e["event_id"]: (room_id, e) for room_id, e in to_fetch - if "event_id" in e - } - - # Now we fetch each event to check if its been filtered out - event_map = yield self.store.get_events(event_to_q.keys()) - - recents = sync_config.filter_collection.filter_room_timeline( - event_map.values() - ) - recents = yield filter_events_for_client( - self.store, - sync_config.user.to_string(), - recents, - ) - - to_return.update({r.room_id: r.origin_server_ts for r in recents}) - - for ev_id in set(event_map.keys()) - set(r.event_id for r in recents): - queued_events.append(event_to_q[ev_id]) - - # FIXME: Need to refetch TS - queued_events.sort(key=lambda e: -e[1]["origin_server_ts"]) - - defer.returnValue(to_return) - - @defer.inlineCallbacks - def full_state_sync(self, sync_config, batch_token): - """Get a sync for a client which is starting without any state. - - If a 'message_since_token' is given, only timeline events which have - happened since that token will be returned. - - Returns: - A Deferred SyncResult. - """ - if batch_token: - timeline_since_token = batch_token.stream_token - else: - timeline_since_token = None - - now_token = yield self.event_sources.get_current_token() - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) - - presence_stream = self.event_sources.sources["presence"] - # TODO (mjark): This looks wrong, shouldn't we be getting the presence - # UP to the present rather than after the present? - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = yield presence_stream.get_pagination_rows( - user=sync_config.user, - pagination_config=pagination_config.get_source_config("presence"), - key=None - ) - - membership_list = ( - Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN - ) - - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=membership_list - ) - - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user( - sync_config.user.to_string() - ) - ) - - account_data['m.push_rules'] = yield self.push_rules_for_user( - sync_config.user - ) - - tags_by_room = yield self.store.get_tags_for_user( - sync_config.user.to_string() - ) - - ignored_users = account_data.get( - "m.ignored_user_list", {} - ).get("ignored_users", {}).keys() - - joined = [] - invited = [] - archived = [] - - user_id = sync_config.user.to_string() - - pagination_limit = 10 - room_pagination_config = { - "l": pagination_limit, - "o": 0, - "t": now_token.to_string(), - } - - room_to_last_ts = yield self._get_room_timestamps_at_token( - room_ids=[ - e.room_id for e in room_list if e.membership == Membership.JOIN - ], - token=now_token, - sync_config=sync_config, - limit=pagination_limit, - ) - - if room_to_last_ts: - sorted_list = sorted( - room_to_last_ts.items(), - key=lambda item: -item[1] - )[:pagination_limit] - - _, bottom_ts = sorted_list[-1] - room_pagination_config["ts"] = bottom_ts - - joined_rooms_list = frozenset(room_id for room_id, _f in sorted_list) - else: - room_pagination_config = {} - joined_rooms_list = frozenset() - - @defer.inlineCallbacks - def _generate_room_entry(event): - if event.membership == Membership.JOIN: - if event.room_id in joined_rooms_list: - room_result = yield self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - joined.append(room_result) - elif event.membership == Membership.INVITE: - if event.sender in ignored_users: - return - invite = yield self.store.get_event(event.event_id) - invited.append(InvitedSyncResult( - room_id=event.room_id, - invite=invite, - )) - elif event.membership in (Membership.LEAVE, Membership.BAN): - # Always send down rooms we were banned or kicked from. - if not sync_config.filter_collection.include_leave: - if event.membership == Membership.LEAVE: - if user_id == event.sender: - return - - leave_token = now_token.copy_and_replace( - "room_key", "s%d" % (event.stream_ordering,) - ) - room_result = yield self.full_state_sync_for_archived_room( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - archived.append(room_result) - - yield concurrently_execute(_generate_room_entry, room_list, 10) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence(presence) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=SyncNextBatchToken( - stream_token=now_token, - pagination_config=room_pagination_config, - ), - )) - - @defer.inlineCallbacks - def full_state_sync_for_joined_room(self, room_id, sync_config, - now_token, timeline_since_token, - ephemeral_by_room, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred JoinedSyncResult. - """ - - batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, since_token=timeline_since_token - ) - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id, sync_config, - now_token=now_token, - since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=True, - ) - - defer.returnValue(room_sync) + return self.generate_sync_result(sync_config, batch_token, full_state) @defer.inlineCallbacks def push_rules_for_user(self, user): @@ -453,35 +204,6 @@ class SyncHandler(object): rules = format_push_rules_for_user(user, rawrules, enabled_map) defer.returnValue(rules) - def account_data_for_user(self, account_data): - account_data_events = [] - - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - - def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): - account_data_events = [] - tags = tags_by_room.get(room_id) - if tags is not None: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) - - account_data = account_data_by_room.get(room_id, {}) - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): """Get the ephemeral events for each room the user is in @@ -544,305 +266,22 @@ class SyncHandler(object): defer.returnValue((now_token, ephemeral_by_room)) - def full_state_sync_for_archived_room(self, room_id, sync_config, - leave_event_id, leave_token, - timeline_since_token, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred ArchivedSyncResult. - """ - - return self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room, - account_data_by_room, full_state=True, leave_token=leave_token, - ) - @defer.inlineCallbacks - def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts, - sync_config, pagination_limit): - start_ts = yield self._get_room_timestamps_at_token( - room_ids, since_token, - sync_config=sync_config, - limit=pagination_limit, - ) - - missing_list = frozenset( - room_id for room_id, ts in - sorted(start_ts.items(), key=lambda item: -item[1]) - if ts < pa_ts - ) - - defer.returnValue(missing_list) - - @defer.inlineCallbacks - def incremental_sync_with_gap(self, sync_config, batch_token): - """ Get the incremental delta needed to bring the client up to - date with the server. - Returns: - A Deferred SyncResult. - """ - since_token = batch_token.stream_token - room_pagination_config = batch_token.pagination_config - - now_token = yield self.event_sources.get_current_token() - - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] - - presence_source = self.event_sources.sources["presence"] - presence, presence_key = yield presence_source.get_new_events( - user=sync_config.user, - from_key=since_token.presence_key, - limit=sync_config.filter_collection.presence_limit(), - room_ids=room_ids, - is_guest=sync_config.is_guest, - ) - now_token = now_token.copy_and_replace("presence_key", presence_key) - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token, since_token - ) - - app_service = yield self.store.get_app_service_by_user_id( - sync_config.user.to_string() - ) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - joined_room_ids = set(r.room_id for r in rooms) - else: - rooms = yield self.store.get_rooms_for_user( - sync_config.user.to_string() - ) - joined_room_ids = set(r.room_id for r in rooms) - - user_id = sync_config.user.to_string() - - timeline_limit = sync_config.filter_collection.timeline_limit() - - tags_by_room = yield self.store.get_updated_tags( - user_id, - since_token.account_data_key, - ) - - account_data, account_data_by_room = ( - yield self.store.get_updated_account_data_for_user( - user_id, - since_token.account_data_key, - ) - ) - - push_rules_changed = yield self.store.have_push_rules_changed_for_user( - user_id, int(since_token.push_rules_key) - ) - - if push_rules_changed: - account_data["m.push_rules"] = yield self.push_rules_for_user( - sync_config.user - ) - - ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( - "m.ignored_user_list", user_id=user_id, - ) - - if ignored_account_data: - ignored_users = ignored_account_data.get("ignored_users", {}).keys() - else: - ignored_users = frozenset() - - # Get a list of membership change events that have happened. - rooms_changed = yield self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key - ) - - mem_change_events_by_room_id = {} - for event in rooms_changed: - mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - - newly_joined_rooms = [] - archived = [] - invited = [] - for room_id, events in mem_change_events_by_room_id.items(): - non_joins = [e for e in events if e.membership != Membership.JOIN] - has_join = len(non_joins) != len(events) - - # We want to figure out if we joined the room at some point since - # the last sync (even if we have since left). This is to make sure - # we do send down the room, and with full state, where necessary - if room_id in joined_room_ids or has_join: - old_state = yield self.get_state_at(room_id, since_token) - old_mem_ev = old_state.get((EventTypes.Member, user_id), None) - if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: - newly_joined_rooms.append(room_id) - - if room_id in joined_room_ids: - continue - - if not non_joins: - continue - - # Only bother if we're still currently invited - should_invite = non_joins[-1].membership == Membership.INVITE - if should_invite: - if event.sender not in ignored_users: - room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) - if room_sync: - invited.append(room_sync) - - # Always include leave/ban events. Just take the last one. - # TODO: How do we handle ban -> leave in same batch? - leave_events = [ - e for e in non_joins - if e.membership in (Membership.LEAVE, Membership.BAN) - ] - - if leave_events: - leave_event = leave_events[-1] - room_sync = yield self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event.event_id, since_token, - tags_by_room, account_data_by_room, - full_state=room_id in newly_joined_rooms - ) - if room_sync: - archived.append(room_sync) - - # Get all events for rooms we're currently joined to. - room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=joined_room_ids, - from_key=since_token.room_key, - to_key=now_token.room_key, - limit=timeline_limit + 1, - ) - - joined = [] - # We loop through all room ids, even if there are no new events, in case - # there are non room events taht we need to notify about. - for room_id in joined_room_ids: - room_entry = room_to_events.get(room_id, None) - - if room_entry: - events, start_key = room_entry - - prev_batch_token = now_token.copy_and_replace("room_key", start_key) - - p_room_token = room_pagination_config.get("t", None) - if p_room_token: - pa_limit = room_pagination_config["l"] - needing_full_state = yield self._get_rooms_that_need_full_state( - [room_id], - since_token, - room_pagination_config.get("ts", 0), - sync_config=sync_config, - pagination_limit=pa_limit, - ) - need_full_state = room_id in needing_full_state - else: - need_full_state = False - - newly_joined_room = (room_id in newly_joined_rooms) or need_full_state - full_state = newly_joined_room - - batch = yield self.load_filtered_recents( - room_id, sync_config, prev_batch_token, - since_token=since_token, - recents=events, - newly_joined_room=newly_joined_room, - ) - else: - batch = TimelineBatch( - events=[], - prev_batch=since_token, - limited=False, - ) - full_state = False - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id=room_id, - sync_config=sync_config, - since_token=since_token, - now_token=now_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=full_state, - ) - if room_sync: - if not room_sync.timeline: - p_room_token = room_pagination_config.get("t", None) - if p_room_token: - pa_limit = room_pagination_config["l"] - needing_full_state = yield self._get_rooms_that_need_full_state( - [room_id], - since_token, - room_pagination_config.get("ts", 0), - sync_config=sync_config, - pagination_limit=pa_limit, - ) - if room_id in needing_full_state: - continue - joined.append(room_sync) - - # For each newly joined room, we want to send down presence of - # existing users. - presence_handler = self.presence_handler - extra_presence_users = set() - for room_id in newly_joined_rooms: - users = yield self.store.get_users_in_room(event.room_id) - extra_presence_users.update(users) - - # For each new member, send down presence. - for joined_sync in joined: - it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) - for event in it: - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - extra_presence_users.add(event.state_key) - - states = yield presence_handler.get_states( - [u for u in extra_presence_users if u != user_id], - as_event=True, - ) - presence.extend(states) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence( - presence - ) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=batch_token.replace(stream_token=now_token), - )) - - @defer.inlineCallbacks - def load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None, recents=None, newly_joined_room=False): + def _load_filtered_recents(self, room_id, sync_config, now_token, + since_token=None, recents=None, newly_joined_room=False): """ Returns: a Deferred TimelineBatch """ with Measure(self.clock, "load_filtered_recents"): - filtering_factor = 2 timeline_limit = sync_config.filter_collection.timeline_limit() - load_limit = max(timeline_limit * filtering_factor, 10) - max_repeat = 5 # Only try a few times per room, otherwise - room_key = now_token.room_key - end_key = room_key if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True else: limited = False - if recents is not None: + if recents: recents = sync_config.filter_collection.filter_room_timeline(recents) recents = yield filter_events_for_client( self.store, @@ -852,6 +291,19 @@ class SyncHandler(object): else: recents = [] + if not limited: + defer.returnValue(TimelineBatch( + events=recents, + prev_batch=now_token, + limited=False + )) + + filtering_factor = 2 + load_limit = max(timeline_limit * filtering_factor, 10) + max_repeat = 5 # Only try a few times per room, otherwise + room_key = now_token.room_key + end_key = room_key + since_key = None if since_token and not newly_joined_room: since_key = since_token.room_key @@ -895,103 +347,6 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def incremental_sync_with_gap_for_room(self, room_id, sync_config, - since_token, now_token, - ephemeral_by_room, tags_by_room, - account_data_by_room, - batch, full_state=False): - state = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) - - unread_notifications = {} - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - ephemeral=ephemeral, - account_data=account_data, - unread_notifications=unread_notifications, - ) - - if room_sync: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config - ) - - if notifs is not None: - unread_notifications["notification_count"] = notifs["notify_count"] - unread_notifications["highlight_count"] = notifs["highlight_count"] - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - - @defer.inlineCallbacks - def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id, - since_token, tags_by_room, - account_data_by_room, full_state, - leave_token=None): - """ Get the incremental delta needed to bring the client up to date for - the archived room. - Returns: - A Deferred ArchivedSyncResult - """ - - if not leave_token: - stream_token = yield self.store.get_stream_token_for_event( - leave_event_id - ) - - leave_token = since_token.copy_and_replace("room_key", stream_token) - - if since_token and since_token.is_after(leave_token): - defer.returnValue(None) - - batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, since_token, - ) - - logger.debug("Recents %r", batch) - - state_events_delta = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, leave_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - room_sync = ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=state_events_delta, - account_data=account_data, - ) - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - - @defer.inlineCallbacks def get_state_after_event(self, event): """ Get the room state after the given event @@ -1116,26 +471,6 @@ class SyncHandler(object): for e in sync_config.filter_collection.filter_room_state(state.values()) }) - def check_joined_room(self, sync_config, state_delta): - """ - Check if the user has just joined the given room (so should - be given the full state) - - Args: - sync_config(synapse.handlers.sync.SyncConfig): - state_delta(dict[(str,str), synapse.events.FrozenEvent]): the - difference in state since the last sync - - Returns: - A deferred Tuple (state_delta, limited) - """ - join_event = state_delta.get(( - EventTypes.Member, sync_config.user.to_string()), None) - if join_event is not None: - if join_event.content["membership"] == Membership.JOIN: - return True - return False - @defer.inlineCallbacks def unread_notifs_for_room_id(self, room_id, sync_config): with Measure(self.clock, "unread_notifs_for_room_id"): @@ -1156,6 +491,611 @@ class SyncHandler(object): # count is whatever it was last time. defer.returnValue(None) + @defer.inlineCallbacks + def generate_sync_result(self, sync_config, batch_token=None, full_state=False): + """Generates a sync result. + + Args: + sync_config (SyncConfig) + since_token (StreamToken) + full_state (bool) + + Returns: + Deferred(SyncResult) + """ + + # NB: The now_token gets changed by some of the generate_sync_* methods, + # this is due to some of the underlying streams not supporting the ability + # to query up to a given point. + # Always use the `now_token` in `SyncResultBuilder` + now_token = yield self.event_sources.get_current_token() + + sync_result_builder = SyncResultBuilder( + sync_config, full_state, + batch_token=batch_token, + now_token=now_token, + ) + + account_data_by_room = yield self._generate_sync_entry_for_account_data( + sync_result_builder + ) + + res = yield self._generate_sync_entry_for_rooms( + sync_result_builder, account_data_by_room + ) + newly_joined_rooms, newly_joined_users = res + + yield self._generate_sync_entry_for_presence( + sync_result_builder, newly_joined_rooms, newly_joined_users + ) + + defer.returnValue(SyncResult( + presence=sync_result_builder.presence, + account_data=sync_result_builder.account_data, + joined=sync_result_builder.joined, + invited=sync_result_builder.invited, + archived=sync_result_builder.archived, + next_batch=SyncNextBatchToken( + stream_token=sync_result_builder.now_token, + pagination_config=batch_token.pagination_config if batch_token else None, + ) + )) + + @defer.inlineCallbacks + def _generate_sync_entry_for_account_data(self, sync_result_builder): + """Generates the account data portion of the sync response. Populates + `sync_result_builder` with the result. + + Args: + sync_result_builder(SyncResultBuilder) + + Returns: + Deferred(dict): A dictionary containing the per room account data. + """ + sync_config = sync_result_builder.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + + if since_token and not sync_result_builder.full_state: + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + user_id, + since_token.account_data_key, + ) + ) + + push_rules_changed = yield self.store.have_push_rules_changed_for_user( + user_id, int(since_token.push_rules_key) + ) + + if push_rules_changed: + account_data["m.push_rules"] = yield self.push_rules_for_user( + sync_config.user + ) + else: + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() + ) + ) + + account_data['m.push_rules'] = yield self.push_rules_for_user( + sync_config.user + ) + + account_data_for_user = sync_config.filter_collection.filter_account_data([ + {"type": account_data_type, "content": content} + for account_data_type, content in account_data.items() + ]) + + sync_result_builder.account_data = account_data_for_user + + defer.returnValue(account_data_by_room) + + @defer.inlineCallbacks + def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms, + newly_joined_users): + """Generates the presence portion of the sync response. Populates the + `sync_result_builder` with the result. + + Args: + sync_result_builder(SyncResultBuilder) + newly_joined_rooms(list): List of rooms that the user has joined + since the last sync (or empty if an initial sync) + newly_joined_users(list): List of users that have joined rooms + since the last sync (or empty if an initial sync) + """ + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + user = sync_result_builder.sync_config.user + + presence_source = self.event_sources.sources["presence"] + + since_token = sync_result_builder.since_token + if since_token and not sync_result_builder.full_state: + presence_key = since_token.presence_key + include_offline = True + else: + presence_key = None + include_offline = False + + presence, presence_key = yield presence_source.get_new_events( + user=user, + from_key=presence_key, + is_guest=sync_config.is_guest, + include_offline=include_offline, + ) + sync_result_builder.now_token = now_token.copy_and_replace( + "presence_key", presence_key + ) + + extra_users_ids = set(newly_joined_users) + for room_id in newly_joined_rooms: + users = yield self.store.get_users_in_room(room_id) + extra_users_ids.update(users) + extra_users_ids.discard(user.to_string()) + + states = yield self.presence_handler.get_states( + extra_users_ids, + as_event=True, + ) + presence.extend(states) + + presence = sync_config.filter_collection.filter_presence( + presence + ) + + sync_result_builder.presence = presence + + @defer.inlineCallbacks + def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): + """Generates the rooms portion of the sync response. Populates the + `sync_result_builder` with the result. + + Args: + sync_result_builder(SyncResultBuilder) + account_data_by_room(dict): Dictionary of per room account data + + Returns: + Deferred(tuple): Returns a 2-tuple of + `(newly_joined_rooms, newly_joined_users)` + """ + user_id = sync_result_builder.sync_config.user.to_string() + + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_result_builder.sync_config, + now_token=sync_result_builder.now_token, + since_token=sync_result_builder.since_token, + ) + sync_result_builder.now_token = now_token + + ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( + "m.ignored_user_list", user_id=user_id, + ) + + if ignored_account_data: + ignored_users = ignored_account_data.get("ignored_users", {}).keys() + else: + ignored_users = frozenset() + + if sync_result_builder.since_token: + res = yield self._get_rooms_changed(sync_result_builder, ignored_users) + room_entries, invited, newly_joined_rooms = res + + tags_by_room = yield self.store.get_updated_tags( + user_id, + sync_result_builder.since_token.account_data_key, + ) + else: + res = yield self._get_all_rooms(sync_result_builder, ignored_users) + room_entries, invited, newly_joined_rooms = res + + tags_by_room = yield self.store.get_tags_for_user(user_id) + + def handle_room_entries(room_entry): + return self._generate_room_entry( + sync_result_builder, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + always_include=sync_result_builder.full_state, + ) + + yield concurrently_execute(handle_room_entries, room_entries, 10) + + sync_result_builder.invited.extend(invited) + + # Now we want to get any newly joined users + newly_joined_users = set() + if sync_result_builder.since_token: + for joined_sync in sync_result_builder.joined: + it = itertools.chain( + joined_sync.timeline.events, joined_sync.state.values() + ) + for event in it: + if event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + newly_joined_users.add(event.state_key) + + defer.returnValue((newly_joined_rooms, newly_joined_users)) + + @defer.inlineCallbacks + def _get_rooms_changed(self, sync_result_builder, ignored_users): + """Gets the the changes that have happened since the last sync. + + Args: + sync_result_builder(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` + """ + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + + assert since_token + + app_service = yield self.store.get_app_service_by_user_id(user_id) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + joined_room_ids = set(r.room_id for r in rooms) + else: + rooms = yield self.store.get_rooms_for_user(user_id) + joined_room_ids = set(r.room_id for r in rooms) + + # Get a list of membership change events that have happened. + rooms_changed = yield self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key + ) + + mem_change_events_by_room_id = {} + for event in rooms_changed: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + + newly_joined_rooms = [] + room_entries = [] + invited = [] + for room_id, events in mem_change_events_by_room_id.items(): + non_joins = [e for e in events if e.membership != Membership.JOIN] + has_join = len(non_joins) != len(events) + + # We want to figure out if we joined the room at some point since + # the last sync (even if we have since left). This is to make sure + # we do send down the room, and with full state, where necessary + if room_id in joined_room_ids or has_join: + old_state = yield self.get_state_at(room_id, since_token) + old_mem_ev = old_state.get((EventTypes.Member, user_id), None) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + newly_joined_rooms.append(room_id) + + if room_id in joined_room_ids: + continue + + if not non_joins: + continue + + # Only bother if we're still currently invited + should_invite = non_joins[-1].membership == Membership.INVITE + if should_invite: + if event.sender not in ignored_users: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) + + # Always include leave/ban events. Just take the last one. + # TODO: How do we handle ban -> leave in same batch? + leave_events = [ + e for e in non_joins + if e.membership in (Membership.LEAVE, Membership.BAN) + ] + + if leave_events: + leave_event = leave_events[-1] + leave_stream_token = yield self.store.get_stream_token_for_event( + leave_event.event_id + ) + leave_token = since_token.copy_and_replace( + "room_key", leave_stream_token + ) + + if since_token and since_token.is_after(leave_token): + continue + + room_entries.append(RoomSyncResultBuilder( + room_id=room_id, + rtype="archived", + events=None, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=leave_token, + )) + + timeline_limit = sync_config.filter_collection.timeline_limit() + + # Get all events for rooms we're currently joined to. + room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_ids=joined_room_ids, + from_key=since_token.room_key, + to_key=now_token.room_key, + limit=timeline_limit + 1, + ) + + # We loop through all room ids, even if there are no new events, in case + # there are non room events taht we need to notify about. + for room_id in joined_room_ids: + room_entry = room_to_events.get(room_id, None) + + if room_entry: + events, start_key = room_entry + + prev_batch_token = now_token.copy_and_replace("room_key", start_key) + + room_entries.append(RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=events, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=None if room_id in newly_joined_rooms else since_token, + upto_token=prev_batch_token, + )) + else: + room_entries.append(RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=[], + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=since_token, + )) + + defer.returnValue((room_entries, invited, newly_joined_rooms)) + + @defer.inlineCallbacks + def _get_all_rooms(self, sync_result_builder, ignored_users): + """Returns entries for all rooms for the user. + + Args: + sync_result_builder(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], [])` + """ + + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + + membership_list = ( + Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN + ) + + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user_id, + membership_list=membership_list + ) + + room_entries = [] + invited = [] + + for event in room_list: + if event.membership == Membership.JOIN: + room_entries.append(RoomSyncResultBuilder( + room_id=event.room_id, + rtype="joined", + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=now_token, + )) + elif event.membership == Membership.INVITE: + if event.sender in ignored_users: + continue + invite = yield self.store.get_event(event.event_id) + invited.append(InvitedSyncResult( + room_id=event.room_id, + invite=invite, + )) + elif event.membership in (Membership.LEAVE, Membership.BAN): + # Always send down rooms we were banned or kicked from. + if not sync_config.filter_collection.include_leave: + if event.membership == Membership.LEAVE: + if user_id == event.sender: + continue + + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + room_entries.append(RoomSyncResultBuilder( + room_id=event.room_id, + rtype="archived", + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=leave_token, + )) + + defer.returnValue((room_entries, invited, [])) + + @defer.inlineCallbacks + def _generate_room_entry(self, sync_result_builder, ignored_users, + room_builder, ephemeral, tags, account_data, + always_include=False): + """Populates the `joined` and `archived` section of `sync_result_builder` + based on the `room_builder`. + + Args: + sync_result_builder(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + room_builder(RoomSyncResultBuilder) + ephemeral(list): List of new ephemeral events for room + tags(list): List of *all* tags for room, or None if there has been + no change. + account_data(list): List of new account data for room + always_include(bool): Always include this room in the sync response, + even if empty. + """ + newly_joined = room_builder.newly_joined + full_state = ( + room_builder.full_state + or newly_joined + or sync_result_builder.full_state + ) + events = room_builder.events + + # We want to shortcut out as early as possible. + if not (always_include or account_data or ephemeral or full_state): + if events == [] and tags is None: + return + + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + + room_id = room_builder.room_id + since_token = room_builder.since_token + upto_token = room_builder.upto_token + + batch = yield self._load_filtered_recents( + room_id, sync_config, + now_token=upto_token, + since_token=since_token, + recents=events, + newly_joined_room=newly_joined, + ) + + account_data_events = [] + if tags is not None: + account_data_events.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data_events + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) + + if not (always_include or batch or account_data or ephemeral or full_state): + return + + state = yield self.compute_state_delta( + room_id, batch, sync_config, since_token, now_token, + full_state=full_state + ) + + if room_builder.rtype == "joined": + unread_notifications = {} + room_sync = JoinedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + ephemeral=ephemeral, + account_data=account_data_events, + unread_notifications=unread_notifications, + ) + + if room_sync or always_include: + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config + ) + + if notifs is not None: + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] + + sync_result_builder.joined.append(room_sync) + elif room_builder.rtype == "archived": + room_sync = ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + account_data=account_data, + ) + if room_sync or always_include: + sync_result_builder.archived.append(room_sync) + else: + raise Exception("Unrecognized rtype: %r", room_builder.rtype) + + @defer.inlineCallbacks + def _get_room_timestamps_at_token(self, room_ids, token, sync_config, + limit): + room_to_entries = {} + + @defer.inlineCallbacks + def _get_last_ts(room_id): + entry = yield self.store.get_last_ts_for_room( + room_id, token.room_key + ) + + # TODO: Is this ever possible? + room_to_entries[room_id] = entry if entry else { + "origin_server_ts": 0, + } + + yield concurrently_execute(_get_last_ts, room_ids, 10) + + if len(room_to_entries) <= limit: + defer.returnValue({ + room_id: entry["origin_server_ts"] + for room_id, entry in room_to_entries.items() + }) + + queued_events = sorted( + room_to_entries.items(), + key=lambda e: -e[1]["origin_server_ts"] + ) + + to_return = {} + + while len(to_return) < limit and len(queued_events) > 0: + to_fetch = queued_events[:limit - len(to_return)] + event_to_q = { + e["event_id"]: (room_id, e) for room_id, e in to_fetch + if "event_id" in e + } + + # Now we fetch each event to check if its been filtered out + event_map = yield self.store.get_events(event_to_q.keys()) + + recents = sync_config.filter_collection.filter_room_timeline( + event_map.values() + ) + recents = yield filter_events_for_client( + self.store, + sync_config.user.to_string(), + recents, + ) + + to_return.update({r.room_id: r.origin_server_ts for r in recents}) + + for ev_id in set(event_map.keys()) - set(r.event_id for r in recents): + queued_events.append(event_to_q[ev_id]) + + # FIXME: Need to refetch TS + queued_events.sort(key=lambda e: -e[1]["origin_server_ts"]) + + defer.returnValue(to_return) + def _action_has_highlight(actions): for action in actions: @@ -1203,3 +1143,52 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): (e.type, e.state_key): e for e in evs } + + +class SyncResultBuilder(object): + "Used to help build up a new SyncResult for a user" + def __init__(self, sync_config, full_state, batch_token, now_token): + """ + Args: + sync_config(SyncConfig) + full_state(bool): The full_state flag as specified by user + batch_token(SyncNextBatchToken): The token supplied by user, or None. + now_token(StreamToken): The token to sync up to. + """ + self.sync_config = sync_config + self.full_state = full_state + self.batch_token = batch_token + self.since_token = batch_token.stream_token if batch_token else None + self.now_token = now_token + + self.presence = [] + self.account_data = [] + self.joined = [] + self.invited = [] + self.archived = [] + + +class RoomSyncResultBuilder(object): + """Stores information needed to create either a `JoinedSyncResult` or + `ArchivedSyncResult`. + """ + def __init__(self, room_id, rtype, events, newly_joined, full_state, + since_token, upto_token): + """ + Args: + room_id(str) + rtype(str): One of `"joined"` or `"archived"` + events(list): List of events to include in the room, (more events + may be added when generating result). + newly_joined(bool): If the user has newly joined the room + full_state(bool): Whether the full state should be sent in result + since_token(StreamToken): Earliest point to return events from, or None + upto_token(StreamToken): Latest point to return events from. + """ + self.room_id = room_id + self.rtype = rtype + self.events = events + self.newly_joined = newly_joined + self.full_state = full_state + self.since_token = since_token + self.upto_token = upto_token diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index e71d01e77d..89a3b5e90a 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py
@@ -38,7 +38,9 @@ def get_badge_count(store, user_id): r.room_id, user_id, last_unread_event_id ) ) - badge += notifs["notify_count"] + # return one badge count per conversation, as count per + # message is so noisy as to be almost useless + badge += 1 if notifs["notify_count"] else 0 defer.returnValue(badge) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index cf478c6f79..644aa4e513 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py
@@ -232,7 +232,10 @@ class JoinRoomAliasServlet(ClientV1RestServlet): if RoomID.is_valid(room_identifier): room_id = room_identifier - remote_room_hosts = None + try: + remote_room_hosts = request.args["server_name"] + except: + remote_room_hosts = None elif RoomAlias.is_valid(room_identifier): handler = self.handlers.room_member_handler room_alias = RoomAlias.from_string(room_identifier) diff --git a/synapse/types.py b/synapse/types.py
index 160bd45fe4..0c9efdfd00 100644 --- a/synapse/types.py +++ b/synapse/types.py
@@ -125,20 +125,49 @@ class SyncNextBatchToken( def from_string(cls, string): try: d = json.loads(decode_base64(string)) - return cls(StreamToken.from_string(d["t"]), d.get("pa", {})) + pa = d.get("pa", None) + if pa: + pa = SyncPaginationConfig.from_dict(pa) + return cls( + stream_token=StreamToken.from_string(d["t"]), + pagination_config=pa, + ) except: raise SynapseError(400, "Invalid Token") def to_string(self): return encode_base64(json.dumps({ "t": self.stream_token.to_string(), - "pa": self.pagination_config, + "pa": self.pagination_config.to_dict() if self.pagination_config else None, })) def replace(self, **kwargs): return self._replace(**kwargs) +class SyncPaginationConfig( + namedtuple("SyncPaginationConfig", ( + "order", + "value", + )) +): + @classmethod + def from_dict(cls, d): + try: + return cls(d["o"], d["v"]) + except: + raise SynapseError(400, "Invalid Token") + + def to_dict(self): + return { + "o": self.order, + "v": self.value, + } + + def replace(self, **kwargs): + return self._replace(**kwargs) + + class StreamToken( namedtuple("Token", ( "room_key",