diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 1b007d4945..c22f65ce5d 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -248,8 +248,7 @@ class DeviceHandler(BaseHandler):
user_id, device_ids, list(hosts)
)
- rooms = yield self.store.get_rooms_for_user(user_id)
- room_ids = [r.room_id for r in rooms]
+ room_ids = yield self.store.get_rooms_for_user(user_id)
yield self.notifier.on_new_event(
"device_list_key", position, rooms=room_ids,
@@ -270,8 +269,7 @@ class DeviceHandler(BaseHandler):
user_id (str)
from_token (StreamToken)
"""
- rooms = yield self.store.get_rooms_for_user(user_id)
- room_ids = set(r.room_id for r in rooms)
+ room_ids = yield self.store.get_rooms_for_user(user_id)
# First we check if any devices have changed
changed = yield self.store.get_user_whose_devices_changed(
@@ -347,8 +345,8 @@ class DeviceHandler(BaseHandler):
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
user_id = user.to_string()
- rooms = yield self.store.get_rooms_for_user(user_id)
- if not rooms:
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+ if not room_ids:
# We no longer share rooms with this user, so we'll no longer
# receive device updates. Mark this in DB.
yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
@@ -404,8 +402,8 @@ class DeviceListEduUpdater(object):
logger.warning("Got device list update edu for %r from %r", user_id, origin)
return
- rooms = yield self.store.get_rooms_for_user(user_id)
- if not rooms:
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+ if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
return
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 1b5317edf5..943554ce98 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -175,6 +175,7 @@ class DirectoryHandler(BaseHandler):
"room_alias": room_alias.to_string(),
},
retry_on_dns_fail=False,
+ ignore_backoff=True,
)
except CodeMessageException as e:
logging.warn("Error retrieving alias")
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index e40495d1ab..c2b38d72a9 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, CodeMessageException
from synapse.types import get_domain_from_id
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@@ -121,15 +121,11 @@ class E2eKeysHandler(object):
def do_remote_query(destination):
destination_query = remote_queries_not_in_cache[destination]
try:
- limiter = yield get_retry_limiter(
- destination, self.clock, self.store
+ remote_result = yield self.federation.query_client_keys(
+ destination,
+ {"device_keys": destination_query},
+ timeout=timeout
)
- with limiter:
- remote_result = yield self.federation.query_client_keys(
- destination,
- {"device_keys": destination_query},
- timeout=timeout
- )
for user_id, keys in remote_result["device_keys"].items():
if user_id in destination_query:
@@ -239,18 +235,14 @@ class E2eKeysHandler(object):
def claim_client_keys(destination):
device_keys = remote_queries[destination]
try:
- limiter = yield get_retry_limiter(
- destination, self.clock, self.store
+ remote_result = yield self.federation.claim_client_keys(
+ destination,
+ {"one_time_keys": device_keys},
+ timeout=timeout
)
- with limiter:
- remote_result = yield self.federation.claim_client_keys(
- destination,
- {"one_time_keys": device_keys},
- timeout=timeout
- )
- for user_id, keys in remote_result["one_time_keys"].items():
- if user_id in device_keys:
- json_result[user_id] = keys
+ for user_id, keys in remote_result["one_time_keys"].items():
+ if user_id in device_keys:
+ json_result[user_id] = keys
except CodeMessageException as e:
failures[destination] = {
"status": e.code, "message": e.message
@@ -316,7 +308,7 @@ class E2eKeysHandler(object):
# old access_token without an associated device_id. Either way, we
# need to double-check the device is registered to avoid ending up with
# keys without a corresponding device.
- self.device_handler.check_device_registered(user_id, device_id)
+ yield self.device_handler.check_device_registered(user_id, device_id)
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d0c2b4d6ed..888dd01240 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,6 +14,7 @@
# limitations under the License.
"""Contains handlers for federation events."""
+import synapse.util.logcontext
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
@@ -114,6 +115,14 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id)
return
+ # If we are currently in the process of joining this room, then we
+ # queue up events for later processing.
+ if pdu.room_id in self.room_queues:
+ logger.info("Ignoring PDU %s for room %s from %s for now; join "
+ "in progress", pdu.event_id, pdu.room_id, origin)
+ self.room_queues[pdu.room_id].append((pdu, origin))
+ return
+
state = None
auth_chain = []
@@ -274,26 +283,13 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None):
+ def _process_received_pdu(self, origin, pdu, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
-
- auth_chain and state are None if we already have the necessary state
- and prev_events in the db
"""
event = pdu
- logger.debug("Got event: %s", event.event_id)
-
- # If we are currently in the process of joining this room, then we
- # queue up events for later processing.
- if event.room_id in self.room_queues:
- self.room_queues[event.room_id].append((pdu, origin))
- return
-
- logger.debug("Processing event: %s", event.event_id)
-
- logger.debug("Event: %s", event)
+ logger.debug("Processing event: %s", event)
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@@ -862,8 +858,6 @@ class FederationHandler(BaseHandler):
"""
logger.debug("Joining %s to %s", joinee, room_id)
- yield self.store.clean_room_for_join(room_id)
-
origin, event = yield self._make_and_verify_event(
target_hosts,
room_id,
@@ -872,7 +866,15 @@ class FederationHandler(BaseHandler):
content,
)
+ # This shouldn't happen, because the RoomMemberHandler has a
+ # linearizer lock which only allows one operation per user per room
+ # at a time - so this is just paranoia.
+ assert (room_id not in self.room_queues)
+
self.room_queues[room_id] = []
+
+ yield self.store.clean_room_for_join(room_id)
+
handled_events = set()
try:
@@ -925,18 +927,37 @@ class FederationHandler(BaseHandler):
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
- for p, origin in room_queue:
- if p.event_id in handled_events:
- continue
+ # we don't need to wait for the queued events to be processed -
+ # it's just a best-effort thing at this point. We do want to do
+ # them roughly in order, though, otherwise we'll end up making
+ # lots of requests for missing prev_events which we do actually
+ # have. Hence we fire off the deferred, but don't wait for it.
- try:
- self._process_received_pdu(origin, p)
- except:
- logger.exception("Couldn't handle pdu")
+ synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
+ room_queue
+ )
defer.returnValue(True)
@defer.inlineCallbacks
+ def _handle_queued_pdus(self, room_queue):
+ """Process PDUs which got queued up while we were busy send_joining.
+
+ Args:
+ room_queue (list[FrozenEvent, str]): list of PDUs to be processed
+ and the servers that sent them
+ """
+ for p, origin in room_queue:
+ try:
+ logger.info("Processing queued PDU %s which was received "
+ "while we were joining %s", p.event_id, p.room_id)
+ yield self.on_receive_pdu(origin, p)
+ except Exception as e:
+ logger.warn(
+ "Error handling queued PDU %s from %s: %s",
+ p.event_id, origin, e)
+
+ @defer.inlineCallbacks
@log_function
def on_make_join_request(self, room_id, user_id):
""" We've received a /make_join/ request, so we create a partial
@@ -1517,7 +1538,17 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
+ """
+ Args:
+ origin:
+ event:
+ state:
+ auth_events:
+
+ Returns:
+ Deferred, which resolves to synapse.events.snapshot.EventContext
+ """
context = yield self.state_handler.compute_event_context(
event, old_state=state,
)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e0ade4c164..10f5f35a69 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -19,6 +19,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
+from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import (
UserID, StreamToken,
@@ -225,9 +226,17 @@ class InitialSyncHandler(BaseHandler):
"content": content,
})
+ now = self.clock.time_msec()
+
ret = {
"rooms": rooms_ret,
- "presence": presence,
+ "presence": [
+ {
+ "type": "m.presence",
+ "content": format_user_presence_state(event, now),
+ }
+ for event in presence
+ ],
"account_data": account_data_events,
"receipts": receipt,
"end": now_token.to_string(),
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da610e430f..1ede117c79 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -29,6 +29,7 @@ from synapse.api.errors import SynapseError
from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState
+from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -556,9 +557,9 @@ class PresenceHandler(object):
room_ids_to_states = {}
users_to_states = {}
for state in states:
- events = yield self.store.get_rooms_for_user(state.user_id)
- for e in events:
- room_ids_to_states.setdefault(e.room_id, []).append(state)
+ room_ids = yield self.store.get_rooms_for_user(state.user_id)
+ for room_id in room_ids:
+ room_ids_to_states.setdefault(room_id, []).append(state)
plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
for u in plist:
@@ -574,8 +575,7 @@ class PresenceHandler(object):
if not local_states:
continue
- users = yield self.store.get_users_in_room(room_id)
- hosts = set(get_domain_from_id(u) for u in users)
+ hosts = yield self.store.get_hosts_in_room(room_id)
for host in hosts:
hosts_to_states.setdefault(host, []).extend(local_states)
@@ -719,9 +719,7 @@ class PresenceHandler(object):
for state in updates
])
else:
- defer.returnValue([
- format_user_presence_state(state, now) for state in updates
- ])
+ defer.returnValue(updates)
@defer.inlineCallbacks
def set_state(self, target_user, state, ignore_status_msg=False):
@@ -795,6 +793,9 @@ class PresenceHandler(object):
as_event=False,
)
+ now = self.clock.time_msec()
+ results[:] = [format_user_presence_state(r, now) for r in results]
+
is_accepted = {
row["observed_user_id"]: row["accepted"] for row in presence_list
}
@@ -847,6 +848,7 @@ class PresenceHandler(object):
)
state_dict = yield self.get_state(observed_user, as_event=False)
+ state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
self.federation.send_edu(
destination=observer_user.domain,
@@ -910,11 +912,12 @@ class PresenceHandler(object):
def is_visible(self, observed_user, observer_user):
"""Returns whether a user can see another user's presence.
"""
- observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
- observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())
-
- observer_room_ids = set(r.room_id for r in observer_rooms)
- observed_room_ids = set(r.room_id for r in observed_rooms)
+ observer_room_ids = yield self.store.get_rooms_for_user(
+ observer_user.to_string()
+ )
+ observed_room_ids = yield self.store.get_rooms_for_user(
+ observed_user.to_string()
+ )
if observer_room_ids & observed_room_ids:
defer.returnValue(True)
@@ -979,14 +982,18 @@ def should_notify(old_state, new_state):
return False
-def format_user_presence_state(state, now):
+def format_user_presence_state(state, now, include_user_id=True):
"""Convert UserPresenceState to a format that can be sent down to clients
and to other servers.
+
+ The "user_id" is optional so that this function can be used to format presence
+ updates for client /sync responses and for federation /send requests.
"""
content = {
"presence": state.state,
- "user_id": state.user_id,
}
+ if include_user_id:
+ content["user_id"] = state.user_id
if state.last_active_ts:
content["last_active_ago"] = now - state.last_active_ts
if state.status_msg and state.state != PresenceState.OFFLINE:
@@ -1025,7 +1032,6 @@ class PresenceEventSource(object):
# sending down the rare duplicate is not a concern.
with Measure(self.clock, "presence.get_new_events"):
- user_id = user.to_string()
if from_key is not None:
from_key = int(from_key)
@@ -1034,18 +1040,7 @@ class PresenceEventSource(object):
max_token = self.store.get_current_presence_token()
- plist = yield self.store.get_presence_list_accepted(user.localpart)
- users_interested_in = set(row["observed_user_id"] for row in plist)
- users_interested_in.add(user_id) # So that we receive our own presence
-
- users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id
- )
- users_interested_in.update(users_who_share_room)
-
- if explicit_room_id:
- user_ids = yield self.store.get_users_in_room(explicit_room_id)
- users_interested_in.update(user_ids)
+ users_interested_in = yield self._get_interested_in(user, explicit_room_id)
user_ids_changed = set()
changed = None
@@ -1073,16 +1068,13 @@ class PresenceEventSource(object):
updates = yield presence.current_state_for_users(user_ids_changed)
- now = self.clock.time_msec()
-
- defer.returnValue(([
- {
- "type": "m.presence",
- "content": format_user_presence_state(s, now),
- }
- for s in updates.values()
- if include_offline or s.state != PresenceState.OFFLINE
- ], max_token))
+ if include_offline:
+ defer.returnValue((updates.values(), max_token))
+ else:
+ defer.returnValue(([
+ s for s in updates.itervalues()
+ if s.state != PresenceState.OFFLINE
+ ], max_token))
def get_current_key(self):
return self.store.get_current_presence_token()
@@ -1090,6 +1082,31 @@ class PresenceEventSource(object):
def get_pagination_rows(self, user, pagination_config, key):
return self.get_new_events(user, from_key=None, include_offline=False)
+ @cachedInlineCallbacks(num_args=2, cache_context=True)
+ def _get_interested_in(self, user, explicit_room_id, cache_context):
+ """Returns the set of users that the given user should see presence
+ updates for
+ """
+ user_id = user.to_string()
+ plist = yield self.store.get_presence_list_accepted(
+ user.localpart, on_invalidate=cache_context.invalidate,
+ )
+ users_interested_in = set(row["observed_user_id"] for row in plist)
+ users_interested_in.add(user_id) # So that we receive our own presence
+
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id, on_invalidate=cache_context.invalidate,
+ )
+ users_interested_in.update(users_who_share_room)
+
+ if explicit_room_id:
+ user_ids = yield self.store.get_users_in_room(
+ explicit_room_id, on_invalidate=cache_context.invalidate,
+ )
+ users_interested_in.update(user_ids)
+
+ defer.returnValue(users_interested_in)
+
def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as
@@ -1157,7 +1174,10 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
# If there are have been no sync for a while (and none ongoing),
# set presence to offline
if user_id not in syncing_user_ids:
- if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+ # If the user has done something recently but hasn't synced,
+ # don't set them as offline.
+ sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
+ if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(
state=PresenceState.OFFLINE,
status_msg=None,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 87f74dfb8e..9bf638f818 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -52,7 +52,8 @@ class ProfileHandler(BaseHandler):
args={
"user_id": target_user.to_string(),
"field": "displayname",
- }
+ },
+ ignore_backoff=True,
)
except CodeMessageException as e:
if e.code != 404:
@@ -99,7 +100,8 @@ class ProfileHandler(BaseHandler):
args={
"user_id": target_user.to_string(),
"field": "avatar_url",
- }
+ },
+ ignore_backoff=True,
)
except CodeMessageException as e:
if e.code != 404:
@@ -156,11 +158,11 @@ class ProfileHandler(BaseHandler):
self.ratelimit(requester)
- joins = yield self.store.get_rooms_for_user(
+ room_ids = yield self.store.get_rooms_for_user(
user.to_string(),
)
- for j in joins:
+ for room_id in room_ids:
handler = self.hs.get_handlers().room_member_handler
try:
# Assume the user isn't a guest because we don't let guests set
@@ -171,12 +173,12 @@ class ProfileHandler(BaseHandler):
yield handler.update_membership(
requester,
user,
- j.room_id,
+ room_id,
"join", # We treat a profile update like a join.
ratelimit=False, # Try to hide that these events aren't atomic.
)
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
- j.room_id, str(e.message)
+ room_id, str(e.message)
)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 50aa513935..e1cd3a48e9 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -210,10 +210,9 @@ class ReceiptEventSource(object):
else:
from_key = None
- rooms = yield self.store.get_rooms_for_user(user.to_string())
- rooms = [room.room_id for room in rooms]
+ room_ids = yield self.store.get_rooms_for_user(user.to_string())
events = yield self.store.get_linearized_receipts_for_rooms(
- rooms,
+ room_ids,
from_key=from_key,
to_key=to_key,
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5572cb883f..c0205da1a9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.util.metrics import Measure, measure_func
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
+from synapse.types import RoomStreamToken
from twisted.internet import defer
@@ -225,8 +226,7 @@ class SyncHandler(object):
with Measure(self.clock, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else "0"
- rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
- room_ids = [room.room_id for room in rooms]
+ room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())
typing_source = self.event_sources.sources["typing"]
typing, typing_key = yield typing_source.get_new_events(
@@ -568,16 +568,15 @@ class SyncHandler(object):
since_token = sync_result_builder.since_token
if since_token and since_token.device_list_key:
- rooms = yield self.store.get_rooms_for_user(user_id)
- room_ids = set(r.room_id for r in rooms)
+ room_ids = yield self.store.get_rooms_for_user(user_id)
user_ids_changed = set()
changed = yield self.store.get_user_whose_devices_changed(
since_token.device_list_key
)
for other_user_id in changed:
- other_rooms = yield self.store.get_rooms_for_user(other_user_id)
- if room_ids.intersection(e.room_id for e in other_rooms):
+ other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
+ if room_ids.intersection(other_room_ids):
user_ids_changed.add(other_user_id)
defer.returnValue(user_ids_changed)
@@ -721,14 +720,14 @@ class SyncHandler(object):
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())
- states = yield self.presence_handler.get_states(
- extra_users_ids,
- as_event=True,
- )
- presence.extend(states)
+ if extra_users_ids:
+ states = yield self.presence_handler.get_states(
+ extra_users_ids,
+ )
+ presence.extend(states)
- # Deduplicate the presence entries so that there's at most one per user
- presence = {p["content"]["user_id"]: p for p in presence}.values()
+ # Deduplicate the presence entries so that there's at most one per user
+ presence = {p.user_id: p for p in presence}.values()
presence = sync_config.filter_collection.filter_presence(
presence
@@ -765,6 +764,21 @@ class SyncHandler(object):
)
sync_result_builder.now_token = now_token
+ # We check up front if anything has changed, if it hasn't then there is
+ # no point in going futher.
+ since_token = sync_result_builder.since_token
+ if not sync_result_builder.full_state:
+ if since_token and not ephemeral_by_room and not account_data_by_room:
+ have_changed = yield self._have_rooms_changed(sync_result_builder)
+ if not have_changed:
+ tags_by_room = yield self.store.get_updated_tags(
+ user_id,
+ since_token.account_data_key,
+ )
+ if not tags_by_room:
+ logger.debug("no-oping sync")
+ defer.returnValue(([], []))
+
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id,
)
@@ -774,13 +788,12 @@ class SyncHandler(object):
else:
ignored_users = frozenset()
- if sync_result_builder.since_token:
+ if since_token:
res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms = res
tags_by_room = yield self.store.get_updated_tags(
- user_id,
- sync_result_builder.since_token.account_data_key,
+ user_id, since_token.account_data_key,
)
else:
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
@@ -805,7 +818,7 @@ class SyncHandler(object):
# Now we want to get any newly joined users
newly_joined_users = set()
- if sync_result_builder.since_token:
+ if since_token:
for joined_sync in sync_result_builder.joined:
it = itertools.chain(
joined_sync.timeline.events, joined_sync.state.values()
@@ -818,6 +831,38 @@ class SyncHandler(object):
defer.returnValue((newly_joined_rooms, newly_joined_users))
@defer.inlineCallbacks
+ def _have_rooms_changed(self, sync_result_builder):
+ """Returns whether there may be any new events that should be sent down
+ the sync. Returns True if there are.
+ """
+ user_id = sync_result_builder.sync_config.user.to_string()
+ since_token = sync_result_builder.since_token
+ now_token = sync_result_builder.now_token
+
+ assert since_token
+
+ # Get a list of membership change events that have happened.
+ rooms_changed = yield self.store.get_membership_changes_for_user(
+ user_id, since_token.room_key, now_token.room_key
+ )
+
+ if rooms_changed:
+ defer.returnValue(True)
+
+ app_service = self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ rooms = yield self.store.get_app_service_rooms(app_service)
+ joined_room_ids = set(r.room_id for r in rooms)
+ else:
+ joined_room_ids = yield self.store.get_rooms_for_user(user_id)
+
+ stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
+ for room_id in joined_room_ids:
+ if self.store.has_room_changed_since(room_id, stream_id):
+ defer.returnValue(True)
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
def _get_rooms_changed(self, sync_result_builder, ignored_users):
"""Gets the the changes that have happened since the last sync.
@@ -841,8 +886,7 @@ class SyncHandler(object):
rooms = yield self.store.get_app_service_rooms(app_service)
joined_room_ids = set(r.room_id for r in rooms)
else:
- rooms = yield self.store.get_rooms_for_user(user_id)
- joined_room_ids = set(r.room_id for r in rooms)
+ joined_room_ids = yield self.store.get_rooms_for_user(user_id)
# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
|