Diffstat (limited to 'synapse/handlers')
-rw-r--r--   synapse/handlers/auth.py          |  32
-rw-r--r--   synapse/handlers/device.py        | 216
-rw-r--r--   synapse/handlers/directory.py     |   1
-rw-r--r--   synapse/handlers/e2e_keys.py      |  34
-rw-r--r--   synapse/handlers/federation.py    | 269
-rw-r--r--   synapse/handlers/identity.py      |  37
-rw-r--r--   synapse/handlers/initial_sync.py  |  11
-rw-r--r--   synapse/handlers/presence.py      |  98
-rw-r--r--   synapse/handlers/profile.py       |  14
-rw-r--r--   synapse/handlers/receipts.py      |   5
-rw-r--r--   synapse/handlers/room.py          |   7
-rw-r--r--   synapse/handlers/room_list.py     |  66
-rw-r--r--   synapse/handlers/sync.py          |  88
13 files changed, 679 insertions, 199 deletions
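
Before the per-file hunks, a minimal standalone sketch of the pattern the auth.py change below introduces: the email and MSISDN user-interactive-auth stages both funnel into a single three-pid check that validates the submitted credentials and then confirms the returned medium matches the stage being attempted. This is a synchronous, self-contained illustration only; LoginError, the identity-server lookup and the authdict shape are simplified stand-ins, not Synapse's actual (Twisted-based) implementation.

# Sketch of the shared threepid-check pattern from the auth.py hunk below.
# Synchronous and self-contained; the identity-server lookup is stubbed.

class LoginError(Exception):
    def __init__(self, code, msg, errcode="M_UNAUTHORIZED"):
        super().__init__(msg)
        self.code = code
        self.errcode = errcode


def threepid_from_creds(creds):
    # Stand-in for the identity handler: ask the identity server whether
    # these credentials correspond to a validated 3pid. Here we simply
    # trust a pre-validated dict for illustration.
    return creds.get("validated")


def check_threepid(medium, authdict):
    """Shared implementation behind both the 'email' and 'msisdn' stages."""
    if "threepid_creds" not in authdict:
        raise LoginError(400, "Missing threepid_creds", errcode="M_MISSING_PARAM")

    threepid = threepid_from_creds(authdict["threepid_creds"])
    if not threepid:
        raise LoginError(401, "")

    # The credentials may be valid but for the wrong kind of 3pid, e.g. an
    # email validation presented to the msisdn stage - reject that explicitly.
    if threepid["medium"] != medium:
        raise LoginError(
            401,
            "Expecting threepid of type '%s', got '%s'"
            % (medium, threepid["medium"]),
        )
    return threepid


def check_email_identity(authdict):
    return check_threepid("email", authdict)


def check_msisdn(authdict):
    return check_threepid("msisdn", authdict)


if __name__ == "__main__":
    creds = {"validated": {"medium": "msisdn", "address": "447700900000"}}
    print(check_msisdn({"threepid_creds": creds}))   # passes: medium matches
    try:
        check_email_identity({"threepid_creds": creds})
    except LoginError as e:
        print(e.code, e)                             # 401: wrong medium
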
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index fffba34383..e7a1bb7246 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,6 +48,7 @@ class AuthHandler(BaseHandler): LoginType.PASSWORD: self._check_password_auth, LoginType.RECAPTCHA: self._check_recaptcha, LoginType.EMAIL_IDENTITY: self._check_email_identity, + LoginType.MSISDN: self._check_msisdn, LoginType.DUMMY: self._check_dummy_auth, } self.bcrypt_rounds = hs.config.bcrypt_rounds @@ -307,31 +309,47 @@ class AuthHandler(BaseHandler): defer.returnValue(True) raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) - @defer.inlineCallbacks def _check_email_identity(self, authdict, _): + return self._check_threepid('email', authdict) + + def _check_msisdn(self, authdict, _): + return self._check_threepid('msisdn', authdict) + + @defer.inlineCallbacks + def _check_dummy_auth(self, authdict, _): + yield run_on_reactor() + defer.returnValue(True) + + @defer.inlineCallbacks + def _check_threepid(self, medium, authdict): yield run_on_reactor() if 'threepid_creds' not in authdict: raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) threepid_creds = authdict['threepid_creds'] + identity_handler = self.hs.get_handlers().identity_handler - logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,)) + logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,)) threepid = yield identity_handler.threepid_from_creds(threepid_creds) if not threepid: raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) + if threepid['medium'] != medium: + raise LoginError( + 401, + "Expecting threepid of type '%s', got '%s'" % ( + medium, threepid['medium'], + ), + errcode=Codes.UNAUTHORIZED + ) + threepid['threepid_creds'] = authdict['threepid_creds'] defer.returnValue(threepid) - @defer.inlineCallbacks - def _check_dummy_auth(self, authdict, _): - yield run_on_reactor() - defer.returnValue(True) - def _get_params_recaptcha(self): return {"public_key": self.hs.config.recaptcha_public_key} diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ca7137f315..c22f65ce5d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -16,6 +16,7 @@ from synapse.api import errors from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer @@ -34,10 +35,11 @@ class DeviceHandler(BaseHandler): self.state = hs.get_state_handler() self.federation_sender = hs.get_federation_sender() self.federation = hs.get_replication_layer() - self._remote_edue_linearizer = Linearizer(name="remote_device_list") + + self._edu_updater = DeviceListEduUpdater(hs, self) self.federation.register_edu_handler( - "m.device_list_update", self._incoming_device_list_update, + "m.device_list_update", self._edu_updater.incoming_device_list_update, ) self.federation.register_query_handler( "user_devices", self.on_federation_query_user_devices, @@ -168,6 +170,40 @@ class DeviceHandler(BaseHandler): yield self.notify_device_update(user_id, [device_id]) 
@defer.inlineCallbacks + def delete_devices(self, user_id, device_ids): + """ Delete several devices + + Args: + user_id (str): + device_ids (str): The list of device IDs to delete + + Returns: + defer.Deferred: + """ + + try: + yield self.store.delete_devices(user_id, device_ids) + except errors.StoreError, e: + if e.code == 404: + # no match + pass + else: + raise + + # Delete access tokens and e2e keys for each device. Not optimised as it is not + # considered as part of a critical path. + for device_id in device_ids: + yield self.store.user_delete_access_tokens( + user_id, device_id=device_id, + delete_refresh_tokens=True, + ) + yield self.store.delete_e2e_keys_by_device( + user_id=user_id, device_id=device_id + ) + + yield self.notify_device_update(user_id, device_ids) + + @defer.inlineCallbacks def update_device(self, user_id, device_id, content): """ Update the given device @@ -212,8 +248,7 @@ class DeviceHandler(BaseHandler): user_id, device_ids, list(hosts) ) - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = [r.room_id for r in rooms] + room_ids = yield self.store.get_rooms_for_user(user_id) yield self.notifier.on_new_event( "device_list_key", position, rooms=room_ids, @@ -234,8 +269,7 @@ class DeviceHandler(BaseHandler): user_id (str) from_token (StreamToken) """ - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = set(r.room_id for r in rooms) + room_ids = yield self.store.get_rooms_for_user(user_id) # First we check if any devices have changed changed = yield self.store.get_user_whose_devices_changed( @@ -260,7 +294,7 @@ class DeviceHandler(BaseHandler): # ordering: treat it the same as a new room event_ids = [] - current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = yield self.store.get_current_state_ids(room_id) # special-case for an empty prev state: include all members # in the changed list @@ -299,39 +333,103 @@ class DeviceHandler(BaseHandler): # and those that actually still share a room with the user defer.returnValue(users_who_share_room & possibly_changed) - @measure_func("_incoming_device_list_update") @defer.inlineCallbacks - def _incoming_device_list_update(self, origin, edu_content): - user_id = edu_content["user_id"] - device_id = edu_content["device_id"] - stream_id = edu_content["stream_id"] - prev_ids = edu_content.get("prev_id", []) + def on_federation_query_user_devices(self, user_id): + stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) + defer.returnValue({ + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + }) + + @defer.inlineCallbacks + def user_left_room(self, user, room_id): + user_id = user.to_string() + room_ids = yield self.store.get_rooms_for_user(user_id) + if not room_ids: + # We no longer share rooms with this user, so we'll no longer + # receive device updates. Mark this in DB. 
+ yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id) + + +def _update_device_from_client_ips(device, client_ips): + ip = client_ips.get((device["user_id"], device["device_id"]), {}) + device.update({ + "last_seen_ts": ip.get("last_seen"), + "last_seen_ip": ip.get("ip"), + }) + + +class DeviceListEduUpdater(object): + "Handles incoming device list updates from federation and updates the DB" + + def __init__(self, hs, device_handler): + self.store = hs.get_datastore() + self.federation = hs.get_replication_layer() + self.clock = hs.get_clock() + self.device_handler = device_handler + + self._remote_edu_linearizer = Linearizer(name="remote_device_list") + + # user_id -> list of updates waiting to be handled. + self._pending_updates = {} + + # Recently seen stream ids. We don't bother keeping these in the DB, + # but they're useful to have them about to reduce the number of spurious + # resyncs. + self._seen_updates = ExpiringCache( + cache_name="device_update_edu", + clock=self.clock, + max_len=10000, + expiry_ms=30 * 60 * 1000, + iterable=True, + ) + + @defer.inlineCallbacks + def incoming_device_list_update(self, origin, edu_content): + """Called on incoming device list update from federation. Responsible + for parsing the EDU and adding to pending updates list. + """ + + user_id = edu_content.pop("user_id") + device_id = edu_content.pop("device_id") + stream_id = str(edu_content.pop("stream_id")) # They may come as ints + prev_ids = edu_content.pop("prev_id", []) + prev_ids = [str(p) for p in prev_ids] # They may come as ints if get_domain_from_id(user_id) != origin: # TODO: Raise? logger.warning("Got device list update edu for %r from %r", user_id, origin) return - rooms = yield self.store.get_rooms_for_user(user_id) - if not rooms: + room_ids = yield self.store.get_rooms_for_user(user_id) + if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. return - with (yield self._remote_edue_linearizer.queue(user_id)): - # If the prev id matches whats in our cache table, then we don't need - # to resync the users device list, otherwise we do. - resync = True - if len(prev_ids) == 1: - extremity = yield self.store.get_device_list_last_stream_id_for_remote( - user_id - ) - logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids) - if str(extremity) == str(prev_ids[0]): - resync = False + self._pending_updates.setdefault(user_id, []).append( + (device_id, stream_id, prev_ids, edu_content) + ) + + yield self._handle_device_updates(user_id) + + @measure_func("_incoming_device_list_update") + @defer.inlineCallbacks + def _handle_device_updates(self, user_id): + "Actually handle pending updates." + + with (yield self._remote_edu_linearizer.queue(user_id)): + pending_updates = self._pending_updates.pop(user_id, []) + if not pending_updates: + # This can happen since we batch updates + return + + resync = yield self._need_to_do_resync(user_id, pending_updates) if resync: # Fetch all devices for the user. 
+ origin = get_domain_from_id(user_id) result = yield self.federation.query_user_devices(origin, user_id) stream_id = result["stream_id"] devices = result["devices"] @@ -339,40 +437,50 @@ class DeviceHandler(BaseHandler): user_id, devices, stream_id, ) device_ids = [device["device_id"] for device in devices] - yield self.notify_device_update(user_id, device_ids) + yield self.device_handler.notify_device_update(user_id, device_ids) else: # Simply update the single device, since we know that is the only # change (becuase of the single prev_id matching the current cache) - content = dict(edu_content) - for key in ("user_id", "device_id", "stream_id", "prev_ids"): - content.pop(key, None) - yield self.store.update_remote_device_list_cache_entry( - user_id, device_id, content, stream_id, + for device_id, stream_id, prev_ids, content in pending_updates: + yield self.store.update_remote_device_list_cache_entry( + user_id, device_id, content, stream_id, + ) + + yield self.device_handler.notify_device_update( + user_id, [device_id for device_id, _, _, _ in pending_updates] ) - yield self.notify_device_update(user_id, [device_id]) - @defer.inlineCallbacks - def on_federation_query_user_devices(self, user_id): - stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) - defer.returnValue({ - "user_id": user_id, - "stream_id": stream_id, - "devices": devices, - }) + self._seen_updates.setdefault(user_id, set()).update( + stream_id for _, stream_id, _, _ in pending_updates + ) @defer.inlineCallbacks - def user_left_room(self, user, room_id): - user_id = user.to_string() - rooms = yield self.store.get_rooms_for_user(user_id) - if not rooms: - # We no longer share rooms with this user, so we'll no longer - # receive device updates. Mark this in DB. - yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id) + def _need_to_do_resync(self, user_id, updates): + """Given a list of updates for a user figure out if we need to do a full + resync, or whether we have enough data that we can just apply the delta. 
+ """ + seen_updates = self._seen_updates.get(user_id, set()) + extremity = yield self.store.get_device_list_last_stream_id_for_remote( + user_id + ) -def _update_device_from_client_ips(device, client_ips): - ip = client_ips.get((device["user_id"], device["device_id"]), {}) - device.update({ - "last_seen_ts": ip.get("last_seen"), - "last_seen_ip": ip.get("ip"), - }) + stream_id_in_updates = set() # stream_ids in updates list + for _, stream_id, prev_ids, _ in updates: + if not prev_ids: + # We always do a resync if there are no previous IDs + defer.returnValue(True) + + for prev_id in prev_ids: + if prev_id == extremity: + continue + elif prev_id in seen_updates: + continue + elif prev_id in stream_id_in_updates: + continue + else: + defer.returnValue(True) + + stream_id_in_updates.add(stream_id) + + defer.returnValue(False) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 1b5317edf5..943554ce98 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -175,6 +175,7 @@ class DirectoryHandler(BaseHandler): "room_alias": room_alias.to_string(), }, retry_on_dns_fail=False, + ignore_backoff=True, ) except CodeMessageException as e: logging.warn("Error retrieving alias") diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index e40495d1ab..c2b38d72a9 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -22,7 +22,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException from synapse.types import get_domain_from_id from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred -from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination +from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -121,15 +121,11 @@ class E2eKeysHandler(object): def do_remote_query(destination): destination_query = remote_queries_not_in_cache[destination] try: - limiter = yield get_retry_limiter( - destination, self.clock, self.store + remote_result = yield self.federation.query_client_keys( + destination, + {"device_keys": destination_query}, + timeout=timeout ) - with limiter: - remote_result = yield self.federation.query_client_keys( - destination, - {"device_keys": destination_query}, - timeout=timeout - ) for user_id, keys in remote_result["device_keys"].items(): if user_id in destination_query: @@ -239,18 +235,14 @@ class E2eKeysHandler(object): def claim_client_keys(destination): device_keys = remote_queries[destination] try: - limiter = yield get_retry_limiter( - destination, self.clock, self.store + remote_result = yield self.federation.claim_client_keys( + destination, + {"one_time_keys": device_keys}, + timeout=timeout ) - with limiter: - remote_result = yield self.federation.claim_client_keys( - destination, - {"one_time_keys": device_keys}, - timeout=timeout - ) - for user_id, keys in remote_result["one_time_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys + for user_id, keys in remote_result["one_time_keys"].items(): + if user_id in device_keys: + json_result[user_id] = keys except CodeMessageException as e: failures[destination] = { "status": e.code, "message": e.message @@ -316,7 +308,7 @@ class E2eKeysHandler(object): # old access_token without an associated device_id. Either way, we # need to double-check the device is registered to avoid ending up with # keys without a corresponding device. 
- self.device_handler.check_device_registered(user_id, device_id) + yield self.device_handler.check_device_registered(user_id, device_id) result = yield self.store.count_e2e_one_time_keys(user_id, device_id) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 996bfd0e23..888dd01240 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -14,6 +14,7 @@ # limitations under the License. """Contains handlers for federation events.""" +import synapse.util.logcontext from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -31,7 +32,7 @@ from synapse.util.logcontext import ( ) from synapse.util.metrics import measure_func from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor +from synapse.util.async import run_on_reactor, Linearizer from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, @@ -79,29 +80,216 @@ class FederationHandler(BaseHandler): # When joining a room we need to queue any events for that room up self.room_queues = {} + self._room_pdu_linearizer = Linearizer("fed_room_pdu") - @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None): - """ Called by the ReplicationLayer when we have a new pdu. We need to - do auth checks and put it through the StateHandler. + @log_function + def on_receive_pdu(self, origin, pdu, get_missing=True): + """ Process a PDU received via a federation /send/ transaction, or + via backfill of missing prev_events + + Args: + origin (str): server which initiated the /send/ transaction. Will + be used to fetch missing events or state. + pdu (FrozenEvent): received PDU + get_missing (bool): True if we should fetch missing prev_events - auth_chain and state are None if we already have the necessary state - and prev_events in the db + Returns (Deferred): completes with None """ - event = pdu - logger.debug("Got event: %s", event.event_id) + # We reprocess pdus when we have seen them only as outliers + existing = yield self.get_persisted_pdu( + origin, pdu.event_id, do_auth=False + ) + + # FIXME: Currently we fetch an event again when we already have it + # if it has been marked as an outlier. + + already_seen = ( + existing and ( + not existing.internal_metadata.is_outlier() + or pdu.internal_metadata.is_outlier() + ) + ) + if already_seen: + logger.debug("Already seen pdu %s", pdu.event_id) + return # If we are currently in the process of joining this room, then we # queue up events for later processing. - if event.room_id in self.room_queues: - self.room_queues[event.room_id].append((pdu, origin)) + if pdu.room_id in self.room_queues: + logger.info("Ignoring PDU %s for room %s from %s for now; join " + "in progress", pdu.event_id, pdu.room_id, origin) + self.room_queues[pdu.room_id].append((pdu, origin)) return - logger.debug("Processing event: %s", event.event_id) + state = None + + auth_chain = [] + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + + fetch_state = False + + # Get missing pdus if necessary. + if not pdu.internal_metadata.is_outlier(): + # We only backfill backwards to the min depth. 
+ min_depth = yield self.get_min_depth_for_context( + pdu.room_id + ) + + logger.debug( + "_handle_new_pdu min_depth for %s: %d", + pdu.room_id, min_depth + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + + if min_depth and pdu.depth < min_depth: + # This is so that we don't notify the user about this + # message, to work around the fact that some events will + # reference really really old events we really don't want to + # send to the clients. + pdu.internal_metadata.outlier = True + elif min_depth and pdu.depth > min_depth: + if get_missing and prevs - seen: + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + logger.info( + "Acquiring lock for room %r to fetch %d missing events: %r...", + pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], + ) + with (yield self._room_pdu_linearizer.queue(pdu.room_id)): + logger.info( + "Acquired lock for room %r to fetch %d missing events", + pdu.room_id, len(prevs - seen), + ) + + yield self._get_missing_events_for_pdu( + origin, pdu, prevs, min_depth + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + logger.info( + "Still missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + fetch_state = True + + if fetch_state: + # We need to get the state at this event, since we haven't + # processed all the prev events. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) + try: + state, auth_chain = yield self.replication_layer.get_state_for_room( + origin, pdu.room_id, pdu.event_id, + ) + except: + logger.exception("Failed to get state for event: %s", pdu.event_id) + + yield self._process_received_pdu( + origin, + pdu, + state=state, + auth_chain=auth_chain, + ) + + @defer.inlineCallbacks + def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): + """ + Args: + origin (str): Origin of the pdu. Will be called to get the missing events + pdu: received pdu + prevs (str[]): List of event ids which we are missing + min_depth (int): Minimum depth of events to return. + + Returns: + Deferred<dict(str, str?)>: updated have_seen dictionary + """ + # We recalculate seen, since it may have changed. + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.keys()) - logger.debug("Event: %s", event) + if not prevs - seen: + # nothing left to do + defer.returnValue(have_seen) + + latest = yield self.store.get_latest_event_ids_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(latest) + latest |= seen + + logger.info( + "Missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + + # XXX: we set timeout to 10s to help workaround + # https://github.com/matrix-org/synapse/issues/1733. + # The reason is to avoid holding the linearizer lock + # whilst processing inbound /send transactions, causing + # FDs to stack up and block other inbound transactions + # which empirically can currently take up to 30 minutes. + # + # N.B. this explicitly disables retry attempts. + # + # N.B. this also increases our chances of falling back to + # fetching fresh state for the room if the missing event + # can't be found, which slightly reduces our security. + # it may also increase our DAG extremity count for the room, + # causing additional state resolution? See #1760. 
+ # However, fetching state doesn't hold the linearizer lock + # apparently. + # + # see https://github.com/matrix-org/synapse/pull/1744 + + missing_events = yield self.replication_layer.get_missing_events( + origin, + pdu.room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + timeout=10000, + ) + + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + + for e in missing_events: + yield self.on_receive_pdu( + origin, + e, + get_missing=False + ) + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + defer.returnValue(have_seen) + + @log_function + @defer.inlineCallbacks + def _process_received_pdu(self, origin, pdu, state, auth_chain): + """ Called when we have a new pdu. We need to do auth checks and put it + through the StateHandler. + """ + event = pdu + + logger.debug("Processing event: %s", event) # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work @@ -670,8 +858,6 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - yield self.store.clean_room_for_join(room_id) - origin, event = yield self._make_and_verify_event( target_hosts, room_id, @@ -680,7 +866,15 @@ class FederationHandler(BaseHandler): content, ) + # This shouldn't happen, because the RoomMemberHandler has a + # linearizer lock which only allows one operation per user per room + # at a time - so this is just paranoia. + assert (room_id not in self.room_queues) + self.room_queues[room_id] = [] + + yield self.store.clean_room_for_join(room_id) + handled_events = set() try: @@ -733,18 +927,37 @@ class FederationHandler(BaseHandler): room_queue = self.room_queues[room_id] del self.room_queues[room_id] - for p, origin in room_queue: - if p.event_id in handled_events: - continue + # we don't need to wait for the queued events to be processed - + # it's just a best-effort thing at this point. We do want to do + # them roughly in order, though, otherwise we'll end up making + # lots of requests for missing prev_events which we do actually + # have. Hence we fire off the deferred, but don't wait for it. - try: - self.on_receive_pdu(origin, p) - except: - logger.exception("Couldn't handle pdu") + synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( + room_queue + ) defer.returnValue(True) @defer.inlineCallbacks + def _handle_queued_pdus(self, room_queue): + """Process PDUs which got queued up while we were busy send_joining. 
+ + Args: + room_queue (list[FrozenEvent, str]): list of PDUs to be processed + and the servers that sent them + """ + for p, origin in room_queue: + try: + logger.info("Processing queued PDU %s which was received " + "while we were joining %s", p.event_id, p.room_id) + yield self.on_receive_pdu(origin, p) + except Exception as e: + logger.warn( + "Error handling queued PDU %s from %s: %s", + p.event_id, origin, e) + + @defer.inlineCallbacks @log_function def on_make_join_request(self, room_id, user_id): """ We've received a /make_join/ request, so we create a partial @@ -1096,7 +1309,7 @@ class FederationHandler(BaseHandler): if prev_id != event.event_id: results[(event.type, event.state_key)] = prev_id else: - del results[(event.type, event.state_key)] + results.pop((event.type, event.state_key), None) defer.returnValue(results.values()) else: @@ -1325,7 +1538,17 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, auth_events=None): + """ + + Args: + origin: + event: + state: + auth_events: + Returns: + Deferred, which resolves to synapse.events.snapshot.EventContext + """ context = yield self.state_handler.compute_event_context( event, old_state=state, ) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 559e5d5a71..6a53c5eb47 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -150,7 +151,7 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield self.http_client.post_urlencoded_get_json( + data = yield self.http_client.post_json_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/email/requestToken" @@ -161,3 +162,37 @@ class IdentityHandler(BaseHandler): except CodeMessageException as e: logger.info("Proxied requestToken failed: %r", e) raise e + + @defer.inlineCallbacks + def requestMsisdnToken( + self, id_server, country, phone_number, + client_secret, send_attempt, **kwargs + ): + yield run_on_reactor() + + if not self._should_trust_id_server(id_server): + raise SynapseError( + 400, "Untrusted ID server '%s'" % id_server, + Codes.SERVER_NOT_TRUSTED + ) + + params = { + 'country': country, + 'phone_number': phone_number, + 'client_secret': client_secret, + 'send_attempt': send_attempt, + } + params.update(kwargs) + + try: + data = yield self.http_client.post_json_get_json( + "https://%s%s" % ( + id_server, + "/_matrix/identity/api/v1/validate/msisdn/requestToken" + ), + params + ) + defer.returnValue(data) + except CodeMessageException as e: + logger.info("Proxied requestToken failed: %r", e) + raise e diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index e0ade4c164..10f5f35a69 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -19,6 +19,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.handlers.presence import format_user_presence_state from synapse.streams.config import PaginationConfig from synapse.types import ( UserID, StreamToken, @@ -225,9 +226,17 @@ class InitialSyncHandler(BaseHandler): "content": content, }) + now = 
self.clock.time_msec() + ret = { "rooms": rooms_ret, - "presence": presence, + "presence": [ + { + "type": "m.presence", + "content": format_user_presence_state(event, now), + } + for event in presence + ], "account_data": account_data_events, "receipts": receipt, "end": now_token.to_string(), diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index da610e430f..1ede117c79 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -29,6 +29,7 @@ from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState +from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -556,9 +557,9 @@ class PresenceHandler(object): room_ids_to_states = {} users_to_states = {} for state in states: - events = yield self.store.get_rooms_for_user(state.user_id) - for e in events: - room_ids_to_states.setdefault(e.room_id, []).append(state) + room_ids = yield self.store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) plist = yield self.store.get_presence_list_observers_accepted(state.user_id) for u in plist: @@ -574,8 +575,7 @@ class PresenceHandler(object): if not local_states: continue - users = yield self.store.get_users_in_room(room_id) - hosts = set(get_domain_from_id(u) for u in users) + hosts = yield self.store.get_hosts_in_room(room_id) for host in hosts: hosts_to_states.setdefault(host, []).extend(local_states) @@ -719,9 +719,7 @@ class PresenceHandler(object): for state in updates ]) else: - defer.returnValue([ - format_user_presence_state(state, now) for state in updates - ]) + defer.returnValue(updates) @defer.inlineCallbacks def set_state(self, target_user, state, ignore_status_msg=False): @@ -795,6 +793,9 @@ class PresenceHandler(object): as_event=False, ) + now = self.clock.time_msec() + results[:] = [format_user_presence_state(r, now) for r in results] + is_accepted = { row["observed_user_id"]: row["accepted"] for row in presence_list } @@ -847,6 +848,7 @@ class PresenceHandler(object): ) state_dict = yield self.get_state(observed_user, as_event=False) + state_dict = format_user_presence_state(state_dict, self.clock.time_msec()) self.federation.send_edu( destination=observer_user.domain, @@ -910,11 +912,12 @@ class PresenceHandler(object): def is_visible(self, observed_user, observer_user): """Returns whether a user can see another user's presence. """ - observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string()) - observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string()) - - observer_room_ids = set(r.room_id for r in observer_rooms) - observed_room_ids = set(r.room_id for r in observed_rooms) + observer_room_ids = yield self.store.get_rooms_for_user( + observer_user.to_string() + ) + observed_room_ids = yield self.store.get_rooms_for_user( + observed_user.to_string() + ) if observer_room_ids & observed_room_ids: defer.returnValue(True) @@ -979,14 +982,18 @@ def should_notify(old_state, new_state): return False -def format_user_presence_state(state, now): +def format_user_presence_state(state, now, include_user_id=True): """Convert UserPresenceState to a format that can be sent down to clients and to other servers. 
+ + The "user_id" is optional so that this function can be used to format presence + updates for client /sync responses and for federation /send requests. """ content = { "presence": state.state, - "user_id": state.user_id, } + if include_user_id: + content["user_id"] = state.user_id if state.last_active_ts: content["last_active_ago"] = now - state.last_active_ts if state.status_msg and state.state != PresenceState.OFFLINE: @@ -1025,7 +1032,6 @@ class PresenceEventSource(object): # sending down the rare duplicate is not a concern. with Measure(self.clock, "presence.get_new_events"): - user_id = user.to_string() if from_key is not None: from_key = int(from_key) @@ -1034,18 +1040,7 @@ class PresenceEventSource(object): max_token = self.store.get_current_presence_token() - plist = yield self.store.get_presence_list_accepted(user.localpart) - users_interested_in = set(row["observed_user_id"] for row in plist) - users_interested_in.add(user_id) # So that we receive our own presence - - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) - users_interested_in.update(users_who_share_room) - - if explicit_room_id: - user_ids = yield self.store.get_users_in_room(explicit_room_id) - users_interested_in.update(user_ids) + users_interested_in = yield self._get_interested_in(user, explicit_room_id) user_ids_changed = set() changed = None @@ -1073,16 +1068,13 @@ class PresenceEventSource(object): updates = yield presence.current_state_for_users(user_ids_changed) - now = self.clock.time_msec() - - defer.returnValue(([ - { - "type": "m.presence", - "content": format_user_presence_state(s, now), - } - for s in updates.values() - if include_offline or s.state != PresenceState.OFFLINE - ], max_token)) + if include_offline: + defer.returnValue((updates.values(), max_token)) + else: + defer.returnValue(([ + s for s in updates.itervalues() + if s.state != PresenceState.OFFLINE + ], max_token)) def get_current_key(self): return self.store.get_current_presence_token() @@ -1090,6 +1082,31 @@ class PresenceEventSource(object): def get_pagination_rows(self, user, pagination_config, key): return self.get_new_events(user, from_key=None, include_offline=False) + @cachedInlineCallbacks(num_args=2, cache_context=True) + def _get_interested_in(self, user, explicit_room_id, cache_context): + """Returns the set of users that the given user should see presence + updates for + """ + user_id = user.to_string() + plist = yield self.store.get_presence_list_accepted( + user.localpart, on_invalidate=cache_context.invalidate, + ) + users_interested_in = set(row["observed_user_id"] for row in plist) + users_interested_in.add(user_id) # So that we receive our own presence + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id, on_invalidate=cache_context.invalidate, + ) + users_interested_in.update(users_who_share_room) + + if explicit_room_id: + user_ids = yield self.store.get_users_in_room( + explicit_room_id, on_invalidate=cache_context.invalidate, + ) + users_interested_in.update(user_ids) + + defer.returnValue(users_interested_in) + def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): """Checks the presence of users that have timed out and updates as @@ -1157,7 +1174,10 @@ def handle_timeout(state, is_mine, syncing_user_ids, now): # If there are have been no sync for a while (and none ongoing), # set presence to offline if user_id not in syncing_user_ids: - if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: + # If the user has done 
something recently but hasn't synced, + # don't set them as offline. + sync_or_active = max(state.last_user_sync_ts, state.last_active_ts) + if now - sync_or_active > SYNC_ONLINE_TIMEOUT: state = state.copy_and_replace( state=PresenceState.OFFLINE, status_msg=None, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 87f74dfb8e..9bf638f818 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -52,7 +52,8 @@ class ProfileHandler(BaseHandler): args={ "user_id": target_user.to_string(), "field": "displayname", - } + }, + ignore_backoff=True, ) except CodeMessageException as e: if e.code != 404: @@ -99,7 +100,8 @@ class ProfileHandler(BaseHandler): args={ "user_id": target_user.to_string(), "field": "avatar_url", - } + }, + ignore_backoff=True, ) except CodeMessageException as e: if e.code != 404: @@ -156,11 +158,11 @@ class ProfileHandler(BaseHandler): self.ratelimit(requester) - joins = yield self.store.get_rooms_for_user( + room_ids = yield self.store.get_rooms_for_user( user.to_string(), ) - for j in joins: + for room_id in room_ids: handler = self.hs.get_handlers().room_member_handler try: # Assume the user isn't a guest because we don't let guests set @@ -171,12 +173,12 @@ class ProfileHandler(BaseHandler): yield handler.update_membership( requester, user, - j.room_id, + room_id, "join", # We treat a profile update like a join. ratelimit=False, # Try to hide that these events aren't atomic. ) except Exception as e: logger.warn( "Failed to update join event for room %s - %s", - j.room_id, str(e.message) + room_id, str(e.message) ) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 50aa513935..e1cd3a48e9 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -210,10 +210,9 @@ class ReceiptEventSource(object): else: from_key = None - rooms = yield self.store.get_rooms_for_user(user.to_string()) - rooms = [room.room_id for room in rooms] + room_ids = yield self.store.get_rooms_for_user(user.to_string()) events = yield self.store.get_linearized_receipts_for_rooms( - rooms, + room_ids, from_key=from_key, to_key=to_key, ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7e7671c9a2..99cb7db0db 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -356,7 +356,7 @@ class RoomCreationHandler(BaseHandler): class RoomContextHandler(BaseHandler): @defer.inlineCallbacks - def get_event_context(self, user, room_id, event_id, limit, is_guest): + def get_event_context(self, user, room_id, event_id, limit): """Retrieves events, pagination tokens and state around a given event in a room. 
@@ -375,12 +375,15 @@ class RoomContextHandler(BaseHandler): now_token = yield self.hs.get_event_sources().get_current_token() + users = yield self.store.get_users_in_room(room_id) + is_peeking = user.to_string() not in users + def filter_evts(events): return filter_events_for_client( self.store, user.to_string(), events, - is_peeking=is_guest + is_peeking=is_peeking ) event = yield self.store.get_event(event_id, get_prev_content=True, diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 19eebbd43f..516cd9a6ac 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -21,6 +21,7 @@ from synapse.api.constants import ( EventTypes, JoinRules, ) from synapse.util.async import concurrently_execute +from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache from synapse.types import ThirdPartyInstanceID @@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler): appservice and network id to use an appservice specific one. Setting to None returns all public rooms across all lists. """ + logger.info( + "Getting public room list: limit=%r, since=%r, search=%r, network=%r", + limit, since_token, bool(search_filter), network_tuple, + ) if search_filter: # We explicitly don't bother caching searches or requests for # appservice specific lists. @@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler): rooms_to_order_value = {} rooms_to_num_joined = {} - rooms_to_latest_event_ids = {} newly_visible = [] newly_unpublished = [] @@ -116,19 +120,26 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_order_for_room(room_id): - latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) - if not latest_event_ids: + # Most of the rooms won't have changed between the since token and + # now (especially if the since token is "now"). So, we can ask what + # the current users are in a room (that will hit a cache) and then + # check if the room has changed since the since token. (We have to + # do it in that order to avoid races). + # If things have changed then fall back to getting the current state + # at the since token. + joined_users = yield self.store.get_users_in_room(room_id) + if self.store.has_room_changed_since(room_id, stream_token): latest_event_ids = yield self.store.get_forward_extremeties_for_room( room_id, stream_token ) - rooms_to_latest_event_ids[room_id] = latest_event_ids - if not latest_event_ids: - return + if not latest_event_ids: + return + + joined_users = yield self.state_handler.get_current_user_in_room( + room_id, latest_event_ids, + ) - joined_users = yield self.state_handler.get_current_user_in_room( - room_id, latest_event_ids, - ) num_joined_users = len(joined_users) rooms_to_num_joined[room_id] = num_joined_users @@ -165,19 +176,19 @@ class RoomListHandler(BaseHandler): rooms_to_scan = rooms_to_scan[:since_token.current_limit] rooms_to_scan.reverse() - # Actually generate the entries. _generate_room_entry will append to + # Actually generate the entries. _append_room_entry_to_chunk will append to # chunk but will stop if len(chunk) > limit chunk = [] if limit and not search_filter: step = limit + 1 for i in xrange(0, len(rooms_to_scan), step): # We iterate here because the vast majority of cases we'll stop - # at first iteration, but occaisonally _generate_room_entry + # at first iteration, but occaisonally _append_room_entry_to_chunk # won't append to the chunk and so we need to loop again. 
# We don't want to scan over the entire range either as that # would potentially waste a lot of work. yield concurrently_execute( - lambda r: self._generate_room_entry( + lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], chunk, limit, search_filter ), @@ -187,7 +198,7 @@ class RoomListHandler(BaseHandler): break else: yield concurrently_execute( - lambda r: self._generate_room_entry( + lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], chunk, limit, search_filter ), @@ -256,21 +267,35 @@ class RoomListHandler(BaseHandler): defer.returnValue(results) @defer.inlineCallbacks - def _generate_room_entry(self, room_id, num_joined_users, chunk, limit, - search_filter): + def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit, + search_filter): + """Generate the entry for a room in the public room list and append it + to the `chunk` if it matches the search filter + """ if limit and len(chunk) > limit + 1: # We've already got enough, so lets just drop it. return + result = yield self._generate_room_entry(room_id, num_joined_users) + + if result and _matches_room_entry(result, search_filter): + chunk.append(result) + + @cachedInlineCallbacks(num_args=1, cache_context=True) + def _generate_room_entry(self, room_id, num_joined_users, cache_context): + """Returns the entry for a room + """ result = { "room_id": room_id, "num_joined_members": num_joined_users, } - current_state_ids = yield self.state_handler.get_current_state_ids(room_id) + current_state_ids = yield self.store.get_current_state_ids( + room_id, on_invalidate=cache_context.invalidate, + ) event_map = yield self.store.get_events([ - event_id for key, event_id in current_state_ids.items() + event_id for key, event_id in current_state_ids.iteritems() if key[0] in ( EventTypes.JoinRules, EventTypes.Name, @@ -294,7 +319,9 @@ class RoomListHandler(BaseHandler): if join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) - aliases = yield self.store.get_aliases_for_room(room_id) + aliases = yield self.store.get_aliases_for_room( + room_id, on_invalidate=cache_context.invalidate + ) if aliases: result["aliases"] = aliases @@ -334,8 +361,7 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - if _matches_room_entry(result, search_filter): - chunk.append(result) + defer.returnValue(result) @defer.inlineCallbacks def get_remote_public_room_list(self, server_name, limit=None, since_token=None, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d7dcd1ce5b..c0205da1a9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,6 +20,7 @@ from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client +from synapse.types import RoomStreamToken from twisted.internet import defer @@ -225,8 +226,7 @@ class SyncHandler(object): with Measure(self.clock, "ephemeral_by_room"): typing_key = since_token.typing_key if since_token else "0" - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] + room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string()) typing_source = self.event_sources.sources["typing"] typing, typing_key = yield typing_source.get_new_events( @@ -568,16 +568,15 @@ class SyncHandler(object): since_token = sync_result_builder.since_token if 
since_token and since_token.device_list_key: - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = set(r.room_id for r in rooms) + room_ids = yield self.store.get_rooms_for_user(user_id) user_ids_changed = set() changed = yield self.store.get_user_whose_devices_changed( since_token.device_list_key ) for other_user_id in changed: - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): + other_room_ids = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(other_room_ids): user_ids_changed.add(other_user_id) defer.returnValue(user_ids_changed) @@ -609,14 +608,14 @@ class SyncHandler(object): deleted = yield self.store.delete_messages_for_device( user_id, device_id, since_stream_id ) - logger.info("Deleted %d to-device messages up to %d", - deleted, since_stream_id) + logger.debug("Deleted %d to-device messages up to %d", + deleted, since_stream_id) messages, stream_id = yield self.store.get_new_messages_for_device( user_id, device_id, since_stream_id, now_token.to_device_key ) - logger.info( + logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", len(messages), since_stream_id, stream_id, now_token.to_device_key ) @@ -721,14 +720,14 @@ class SyncHandler(object): extra_users_ids.update(users) extra_users_ids.discard(user.to_string()) - states = yield self.presence_handler.get_states( - extra_users_ids, - as_event=True, - ) - presence.extend(states) + if extra_users_ids: + states = yield self.presence_handler.get_states( + extra_users_ids, + ) + presence.extend(states) - # Deduplicate the presence entries so that there's at most one per user - presence = {p["content"]["user_id"]: p for p in presence}.values() + # Deduplicate the presence entries so that there's at most one per user + presence = {p.user_id: p for p in presence}.values() presence = sync_config.filter_collection.filter_presence( presence @@ -765,6 +764,21 @@ class SyncHandler(object): ) sync_result_builder.now_token = now_token + # We check up front if anything has changed, if it hasn't then there is + # no point in going futher. 
+ since_token = sync_result_builder.since_token + if not sync_result_builder.full_state: + if since_token and not ephemeral_by_room and not account_data_by_room: + have_changed = yield self._have_rooms_changed(sync_result_builder) + if not have_changed: + tags_by_room = yield self.store.get_updated_tags( + user_id, + since_token.account_data_key, + ) + if not tags_by_room: + logger.debug("no-oping sync") + defer.returnValue(([], [])) + ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id, ) @@ -774,13 +788,12 @@ class SyncHandler(object): else: ignored_users = frozenset() - if sync_result_builder.since_token: + if since_token: res = yield self._get_rooms_changed(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_updated_tags( - user_id, - sync_result_builder.since_token.account_data_key, + user_id, since_token.account_data_key, ) else: res = yield self._get_all_rooms(sync_result_builder, ignored_users) @@ -805,7 +818,7 @@ class SyncHandler(object): # Now we want to get any newly joined users newly_joined_users = set() - if sync_result_builder.since_token: + if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( joined_sync.timeline.events, joined_sync.state.values() @@ -818,6 +831,38 @@ class SyncHandler(object): defer.returnValue((newly_joined_rooms, newly_joined_users)) @defer.inlineCallbacks + def _have_rooms_changed(self, sync_result_builder): + """Returns whether there may be any new events that should be sent down + the sync. Returns True if there are. + """ + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + + assert since_token + + # Get a list of membership change events that have happened. + rooms_changed = yield self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key + ) + + if rooms_changed: + defer.returnValue(True) + + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + joined_room_ids = set(r.room_id for r in rooms) + else: + joined_room_ids = yield self.store.get_rooms_for_user(user_id) + + stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream + for room_id in joined_room_ids: + if self.store.has_room_changed_since(room_id, stream_id): + defer.returnValue(True) + defer.returnValue(False) + + @defer.inlineCallbacks def _get_rooms_changed(self, sync_result_builder, ignored_users): """Gets the the changes that have happened since the last sync. @@ -841,8 +886,7 @@ class SyncHandler(object): rooms = yield self.store.get_app_service_rooms(app_service) joined_room_ids = set(r.room_id for r in rooms) else: - rooms = yield self.store.get_rooms_for_user(user_id) - joined_room_ids = set(r.room_id for r in rooms) + joined_room_ids = yield self.store.get_rooms_for_user(user_id) # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_membership_changes_for_user( |
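
A small self-contained sketch of the short-circuit the sync.py hunks above introduce: before building per-room sync results, check whether any membership changes or per-room stream advances have happened since the client's since-token, and no-op the sync if nothing has. The Store class and stream values here are toy stand-ins and the function is synchronous; only the shape of the check mirrors the real _have_rooms_changed change.

# Sketch of the "has anything changed since the since-token" check.
# Toy store; not Synapse's storage API.

class Store:
    def __init__(self, membership_changes, room_stream_positions):
        self._membership_changes = membership_changes        # list of events
        self._room_stream_positions = room_stream_positions  # room_id -> stream pos

    def get_membership_changes_for_user(self, user_id, from_key, to_key):
        return [
            e for e in self._membership_changes
            if e["user_id"] == user_id and from_key < e["stream"] <= to_key
        ]

    def get_rooms_for_user(self, user_id):
        return set(self._room_stream_positions)

    def has_room_changed_since(self, room_id, stream_id):
        return self._room_stream_positions.get(room_id, 0) > stream_id


def have_rooms_changed(store, user_id, since_stream, now_stream):
    # Any membership change for this user means the room list itself moved.
    if store.get_membership_changes_for_user(user_id, since_stream, now_stream):
        return True
    # Otherwise, only rooms whose streams advanced past the since-token matter.
    return any(
        store.has_room_changed_since(room_id, since_stream)
        for room_id in store.get_rooms_for_user(user_id)
    )


store = Store(
    membership_changes=[],
    room_stream_positions={"!a:hs": 10, "!b:hs": 25},
)
print(have_rooms_changed(store, "@u:hs", since_stream=30, now_stream=40))  # False: sync can no-op
print(have_rooms_changed(store, "@u:hs", since_stream=20, now_stream=40))  # True: !b:hs changed
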