diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 90f96209f8..e83adc8339 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -88,9 +88,13 @@ class BaseHandler(object):
current_state = yield self.store.get_events(
context.current_state_ids.values()
)
- current_state = current_state.values()
else:
- current_state = yield self.store.get_current_state(event.room_id)
+ current_state = yield self.state_handler.get_current_state(
+ event.room_id
+ )
+
+ current_state = current_state.values()
+
logger.info("maybe_kick_guest_users %r", current_state)
yield self.kick_guest_users(current_state)
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 084e33ca6a..f36b358b45 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -19,7 +19,6 @@ from ._base import BaseHandler
import logging
-
logger = logging.getLogger(__name__)
@@ -54,3 +53,46 @@ class AdminHandler(BaseHandler):
}
defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_users(self):
+ """Function to reterive a list of users in users table.
+
+ Args:
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]]
+ """
+ ret = yield self.store.get_users()
+
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_users_paginate(self, order, start, limit):
+ """Function to reterive a paginated list of users from
+ users list. This will return a json object, which contains
+ list of users and the total number of users in users table.
+
+ Args:
+ order (str): column name to order the select by this column
+ start (int): start number to begin the query from
+ limit (int): number of rows to reterive
+ Returns:
+ defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+ """
+ ret = yield self.store.get_users_paginate(order, start, limit)
+
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def search_users(self, term):
+ """Function to search users list for one or more users with
+ the matched term.
+
+ Args:
+ term (str): search term
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]]
+ """
+ ret = yield self.store.search_users(term)
+
+ defer.returnValue(ret)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 221d7ea7a2..e7a1bb7246 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -47,6 +48,7 @@ class AuthHandler(BaseHandler):
LoginType.PASSWORD: self._check_password_auth,
LoginType.RECAPTCHA: self._check_recaptcha,
LoginType.EMAIL_IDENTITY: self._check_email_identity,
+ LoginType.MSISDN: self._check_msisdn,
LoginType.DUMMY: self._check_dummy_auth,
}
self.bcrypt_rounds = hs.config.bcrypt_rounds
@@ -65,6 +67,7 @@ class AuthHandler(BaseHandler):
self.hs = hs # FIXME better possibility to access registrationHandler later?
self.device_handler = hs.get_device_handler()
+ self.macaroon_gen = hs.get_macaroon_generator()
@defer.inlineCallbacks
def check_auth(self, flows, clientdict, clientip):
@@ -306,31 +309,47 @@ class AuthHandler(BaseHandler):
defer.returnValue(True)
raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
- @defer.inlineCallbacks
def _check_email_identity(self, authdict, _):
+ return self._check_threepid('email', authdict)
+
+ def _check_msisdn(self, authdict, _):
+ return self._check_threepid('msisdn', authdict)
+
+ @defer.inlineCallbacks
+ def _check_dummy_auth(self, authdict, _):
+ yield run_on_reactor()
+ defer.returnValue(True)
+
+ @defer.inlineCallbacks
+ def _check_threepid(self, medium, authdict):
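+ """Handle a threepid (email or msisdn) auth stage: fetches the
+ validated threepid from the identity server using the supplied
+ threepid_creds and checks that its medium matches the one this
+ stage expects.
+ """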
yield run_on_reactor()
if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
threepid_creds = authdict['threepid_creds']
+
identity_handler = self.hs.get_handlers().identity_handler
- logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,))
+ logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
threepid = yield identity_handler.threepid_from_creds(threepid_creds)
if not threepid:
raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+ if threepid['medium'] != medium:
+ raise LoginError(
+ 401,
+ "Expecting threepid of type '%s', got '%s'" % (
+ medium, threepid['medium'],
+ ),
+ errcode=Codes.UNAUTHORIZED
+ )
+
threepid['threepid_creds'] = authdict['threepid_creds']
defer.returnValue(threepid)
- @defer.inlineCallbacks
- def _check_dummy_auth(self, authdict, _):
- yield run_on_reactor()
- defer.returnValue(True)
-
def _get_params_recaptcha(self):
return {"public_key": self.hs.config.recaptcha_public_key}
@@ -529,37 +548,11 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def issue_access_token(self, user_id, device_id=None):
- access_token = self.generate_access_token(user_id)
+ access_token = self.macaroon_gen.generate_access_token(user_id)
yield self.store.add_access_token_to_user(user_id, access_token,
device_id)
defer.returnValue(access_token)
- def generate_access_token(self, user_id, extra_caveats=None):
- extra_caveats = extra_caveats or []
- macaroon = self._generate_base_macaroon(user_id)
- macaroon.add_first_party_caveat("type = access")
- # Include a nonce, to make sure that each login gets a different
- # access token.
- macaroon.add_first_party_caveat("nonce = %s" % (
- stringutils.random_string_with_symbols(16),
- ))
- for caveat in extra_caveats:
- macaroon.add_first_party_caveat(caveat)
- return macaroon.serialize()
-
- def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
- macaroon = self._generate_base_macaroon(user_id)
- macaroon.add_first_party_caveat("type = login")
- now = self.hs.get_clock().time_msec()
- expiry = now + duration_in_ms
- macaroon.add_first_party_caveat("time < %d" % (expiry,))
- return macaroon.serialize()
-
- def generate_delete_pusher_token(self, user_id):
- macaroon = self._generate_base_macaroon(user_id)
- macaroon.add_first_party_caveat("type = delete_pusher")
- return macaroon.serialize()
-
def validate_short_term_login_token_and_get_user_id(self, login_token):
auth_api = self.hs.get_auth()
try:
@@ -570,15 +563,6 @@ class AuthHandler(BaseHandler):
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
- def _generate_base_macaroon(self, user_id):
- macaroon = pymacaroons.Macaroon(
- location=self.hs.config.server_name,
- identifier="key",
- key=self.hs.config.macaroon_secret_key)
- macaroon.add_first_party_caveat("gen = 1")
- macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
- return macaroon
-
@defer.inlineCallbacks
def set_password(self, user_id, newpassword, requester=None):
password_hash = self.hash(newpassword)
@@ -673,6 +657,48 @@ class AuthHandler(BaseHandler):
return False
+class MacaroonGenerator(object):
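+ """Generates the macaroons used by this homeserver: access tokens,
+ short-term login tokens and delete-pusher tokens.
+ """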
+ def __init__(self, hs):
+ self.clock = hs.get_clock()
+ self.server_name = hs.config.server_name
+ self.macaroon_secret_key = hs.config.macaroon_secret_key
+
+ def generate_access_token(self, user_id, extra_caveats=None):
+ extra_caveats = extra_caveats or []
+ macaroon = self._generate_base_macaroon(user_id)
+ macaroon.add_first_party_caveat("type = access")
+ # Include a nonce, to make sure that each login gets a different
+ # access token.
+ macaroon.add_first_party_caveat("nonce = %s" % (
+ stringutils.random_string_with_symbols(16),
+ ))
+ for caveat in extra_caveats:
+ macaroon.add_first_party_caveat(caveat)
+ return macaroon.serialize()
+
+ def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
+ macaroon = self._generate_base_macaroon(user_id)
+ macaroon.add_first_party_caveat("type = login")
+ now = self.clock.time_msec()
+ expiry = now + duration_in_ms
+ macaroon.add_first_party_caveat("time < %d" % (expiry,))
+ return macaroon.serialize()
+
+ def generate_delete_pusher_token(self, user_id):
+ macaroon = self._generate_base_macaroon(user_id)
+ macaroon.add_first_party_caveat("type = delete_pusher")
+ return macaroon.serialize()
+
+ def _generate_base_macaroon(self, user_id):
+ macaroon = pymacaroons.Macaroon(
+ location=self.server_name,
+ identifier="key",
+ key=self.macaroon_secret_key)
+ macaroon.add_first_party_caveat("gen = 1")
+ macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
+ return macaroon
+
+
class _AccountHandler(object):
"""A proxy object that gets passed to password auth providers so they
can register new users etc if necessary.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index aa68755936..c22f65ce5d 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -12,9 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
from synapse.api import errors
+from synapse.api.constants import EventTypes
from synapse.util import stringutils
+from synapse.util.async import Linearizer
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.metrics import measure_func
+from synapse.types import get_domain_from_id, RoomStreamToken
from twisted.internet import defer
from ._base import BaseHandler
@@ -27,6 +31,22 @@ class DeviceHandler(BaseHandler):
def __init__(self, hs):
super(DeviceHandler, self).__init__(hs)
+ self.hs = hs
+ self.state = hs.get_state_handler()
+ self.federation_sender = hs.get_federation_sender()
+ self.federation = hs.get_replication_layer()
+
+ self._edu_updater = DeviceListEduUpdater(hs, self)
+
+ self.federation.register_edu_handler(
+ "m.device_list_update", self._edu_updater.incoming_device_list_update,
+ )
+ self.federation.register_query_handler(
+ "user_devices", self.on_federation_query_user_devices,
+ )
+
+ hs.get_distributor().observe("user_left_room", self.user_left_room)
+
@defer.inlineCallbacks
def check_device_registered(self, user_id, device_id,
initial_device_display_name=None):
@@ -45,29 +65,29 @@ class DeviceHandler(BaseHandler):
str: device id (generated if none was supplied)
"""
if device_id is not None:
- yield self.store.store_device(
+ new_device = yield self.store.store_device(
user_id=user_id,
device_id=device_id,
initial_device_display_name=initial_device_display_name,
- ignore_if_known=True,
)
+ if new_device:
+ yield self.notify_device_update(user_id, [device_id])
defer.returnValue(device_id)
# if the device id is not specified, we'll autogen one, but loop a few
# times in case of a clash.
attempts = 0
while attempts < 5:
- try:
- device_id = stringutils.random_string(10).upper()
- yield self.store.store_device(
- user_id=user_id,
- device_id=device_id,
- initial_device_display_name=initial_device_display_name,
- ignore_if_known=False,
- )
+ device_id = stringutils.random_string(10).upper()
+ new_device = yield self.store.store_device(
+ user_id=user_id,
+ device_id=device_id,
+ initial_device_display_name=initial_device_display_name,
+ )
+ if new_device:
+ yield self.notify_device_update(user_id, [device_id])
defer.returnValue(device_id)
- except errors.StoreError:
- attempts += 1
+ attempts += 1
raise errors.StoreError(500, "Couldn't generate a device ID.")
@@ -147,6 +167,42 @@ class DeviceHandler(BaseHandler):
user_id=user_id, device_id=device_id
)
+ yield self.notify_device_update(user_id, [device_id])
+
+ @defer.inlineCallbacks
+ def delete_devices(self, user_id, device_ids):
+ """ Delete several devices
+
+ Args:
+ user_id (str):
+ device_ids (list[str]): The list of device IDs to delete
+
+ Returns:
+ defer.Deferred: resolves once the devices have been deleted
+ """
+
+ try:
+ yield self.store.delete_devices(user_id, device_ids)
+ except errors.StoreError as e:
+ if e.code == 404:
+ # no match
+ pass
+ else:
+ raise
+
+ # Delete access tokens and e2e keys for each device. Not optimised as it
+ # is not considered part of a critical path.
+ for device_id in device_ids:
+ yield self.store.user_delete_access_tokens(
+ user_id, device_id=device_id,
+ delete_refresh_tokens=True,
+ )
+ yield self.store.delete_e2e_keys_by_device(
+ user_id=user_id, device_id=device_id
+ )
+
+ yield self.notify_device_update(user_id, device_ids)
+
@defer.inlineCallbacks
def update_device(self, user_id, device_id, content):
""" Update the given device
@@ -166,12 +222,135 @@ class DeviceHandler(BaseHandler):
device_id,
new_display_name=content.get("display_name")
)
+ yield self.notify_device_update(user_id, [device_id])
except errors.StoreError, e:
if e.code == 404:
raise errors.NotFoundError()
else:
raise
+ @measure_func("notify_device_update")
+ @defer.inlineCallbacks
+ def notify_device_update(self, user_id, device_ids):
+ """Notify that a user's device(s) has changed. Pokes the notifier, and
+ remote servers if the user is local.
+ """
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
+
+ hosts = set()
+ if self.hs.is_mine_id(user_id):
+ hosts.update(get_domain_from_id(u) for u in users_who_share_room)
+ hosts.discard(self.server_name)
+
+ position = yield self.store.add_device_change_to_streams(
+ user_id, device_ids, list(hosts)
+ )
+
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+
+ yield self.notifier.on_new_event(
+ "device_list_key", position, rooms=room_ids,
+ )
+
+ if hosts:
+ logger.info("Sending device list update notif to: %r", hosts)
+ for host in hosts:
+ self.federation_sender.send_device_messages(host)
+
+ @measure_func("device.get_user_ids_changed")
+ @defer.inlineCallbacks
+ def get_user_ids_changed(self, user_id, from_token):
+ """Get list of users that have had the devices updated, or have newly
+ joined a room, that `user_id` may be interested in.
+
+ Args:
+ user_id (str)
+ from_token (StreamToken)
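+
+ Returns:
+ defer.Deferred: resolves to a set of user_ids whose devices may
+ have changed, limited to users who still share a room with `user_id`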
+ """
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+
+ # First we check if any devices have changed
+ changed = yield self.store.get_user_whose_devices_changed(
+ from_token.device_list_key
+ )
+
+ # Then work out if any users have since joined
+ rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+
+ stream_ordering = RoomStreamToken.parse_stream_token(
+ from_token.room_key).stream
+
+ possibly_changed = set(changed)
+ for room_id in rooms_changed:
+ # Fetch the current state at the time.
+ try:
+ event_ids = yield self.store.get_forward_extremeties_for_room(
+ room_id, stream_ordering=stream_ordering
+ )
+ except errors.StoreError:
+ # we have purged the stream_ordering index since the stream
+ # ordering: treat it the same as a new room
+ event_ids = []
+
+ current_state_ids = yield self.store.get_current_state_ids(room_id)
+
+ # special-case for an empty prev state: include all members
+ # in the changed list
+ if not event_ids:
+ for key, event_id in current_state_ids.iteritems():
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_changed.add(state_key)
+ continue
+
+ # mapping from event_id -> state_dict
+ prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+
+ # If there has been any change in membership, include them in the
+ # possibly changed list. We'll check if they are joined below,
+ # and we're not toooo worried about spuriously adding users.
+ for key, event_id in current_state_ids.iteritems():
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+
+ # check if this member has changed since any of the extremities
+ # at the stream_ordering, and add them to the list if so.
+ for state_dict in prev_state_ids.values():
+ prev_event_id = state_dict.get(key, None)
+ if not prev_event_id or prev_event_id != event_id:
+ possibly_changed.add(state_key)
+ break
+
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
+
+ # Take the intersection of the users whose devices may have changed
+ # and those that actually still share a room with the user
+ defer.returnValue(users_who_share_room & possibly_changed)
+
+ @defer.inlineCallbacks
+ def on_federation_query_user_devices(self, user_id):
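+ """Handle an incoming federation `user_devices` query: returns all of
+ the user's devices together with the current device list stream id.
+ """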
+ stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
+ defer.returnValue({
+ "user_id": user_id,
+ "stream_id": stream_id,
+ "devices": devices,
+ })
+
+ @defer.inlineCallbacks
+ def user_left_room(self, user, room_id):
+ user_id = user.to_string()
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+ if not room_ids:
+ # We no longer share rooms with this user, so we'll no longer
+ # receive device updates. Mark this in DB.
+ yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
+
def _update_device_from_client_ips(device, client_ips):
ip = client_ips.get((device["user_id"], device["device_id"]), {})
@@ -179,3 +358,129 @@ def _update_device_from_client_ips(device, client_ips):
"last_seen_ts": ip.get("last_seen"),
"last_seen_ip": ip.get("ip"),
})
+
+
+class DeviceListEduUpdater(object):
+ "Handles incoming device list updates from federation and updates the DB"
+
+ def __init__(self, hs, device_handler):
+ self.store = hs.get_datastore()
+ self.federation = hs.get_replication_layer()
+ self.clock = hs.get_clock()
+ self.device_handler = device_handler
+
+ self._remote_edu_linearizer = Linearizer(name="remote_device_list")
+
+ # user_id -> list of updates waiting to be handled.
+ self._pending_updates = {}
+
+ # Recently seen stream ids. We don't bother keeping these in the DB,
+ # but it's useful to have them around to reduce the number of spurious
+ # resyncs.
+ self._seen_updates = ExpiringCache(
+ cache_name="device_update_edu",
+ clock=self.clock,
+ max_len=10000,
+ expiry_ms=30 * 60 * 1000,
+ iterable=True,
+ )
+
+ @defer.inlineCallbacks
+ def incoming_device_list_update(self, origin, edu_content):
+ """Called on incoming device list update from federation. Responsible
+ for parsing the EDU and adding to pending updates list.
+ """
+
+ user_id = edu_content.pop("user_id")
+ device_id = edu_content.pop("device_id")
+ stream_id = str(edu_content.pop("stream_id")) # They may come as ints
+ prev_ids = edu_content.pop("prev_id", [])
+ prev_ids = [str(p) for p in prev_ids] # They may come as ints
+
+ if get_domain_from_id(user_id) != origin:
+ # TODO: Raise?
+ logger.warning("Got device list update edu for %r from %r", user_id, origin)
+ return
+
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+ if not room_ids:
+ # We don't share any rooms with this user. Ignore update, as we
+ # probably won't get any further updates.
+ return
+
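+ # Batch the update for this user; _handle_device_updates drains the
+ # batch under a per-user linearizer, so updates get applied in order.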
+ self._pending_updates.setdefault(user_id, []).append(
+ (device_id, stream_id, prev_ids, edu_content)
+ )
+
+ yield self._handle_device_updates(user_id)
+
+ @measure_func("_incoming_device_list_update")
+ @defer.inlineCallbacks
+ def _handle_device_updates(self, user_id):
+ "Actually handle pending updates."
+
+ with (yield self._remote_edu_linearizer.queue(user_id)):
+ pending_updates = self._pending_updates.pop(user_id, [])
+ if not pending_updates:
+ # This can happen since we batch updates
+ return
+
+ resync = yield self._need_to_do_resync(user_id, pending_updates)
+
+ if resync:
+ # Fetch all devices for the user.
+ origin = get_domain_from_id(user_id)
+ result = yield self.federation.query_user_devices(origin, user_id)
+ stream_id = result["stream_id"]
+ devices = result["devices"]
+ yield self.store.update_remote_device_list_cache(
+ user_id, devices, stream_id,
+ )
+ device_ids = [device["device_id"] for device in devices]
+ yield self.device_handler.notify_device_update(user_id, device_ids)
+ else:
+ # Simply update the single device, since we know that is the only
+ # change (because of the single prev_id matching the current cache)
+ for device_id, stream_id, prev_ids, content in pending_updates:
+ yield self.store.update_remote_device_list_cache_entry(
+ user_id, device_id, content, stream_id,
+ )
+
+ yield self.device_handler.notify_device_update(
+ user_id, [device_id for device_id, _, _, _ in pending_updates]
+ )
+
+ self._seen_updates.setdefault(user_id, set()).update(
+ stream_id for _, stream_id, _, _ in pending_updates
+ )
+
+ @defer.inlineCallbacks
+ def _need_to_do_resync(self, user_id, updates):
+ """Given a list of updates for a user figure out if we need to do a full
+ resync, or whether we have enough data that we can just apply the delta.
+ """
+ seen_updates = self._seen_updates.get(user_id, set())
+
+ extremity = yield self.store.get_device_list_last_stream_id_for_remote(
+ user_id
+ )
+
+ stream_id_in_updates = set() # stream_ids in updates list
+ for _, stream_id, prev_ids, _ in updates:
+ if not prev_ids:
+ # We always do a resync if there are no previous IDs
+ defer.returnValue(True)
+
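+ # A prev_id counts as "known" if it matches the stream id we have
+ # stored for this remote user, an update we've recently processed,
+ # or an earlier update in this batch. Any other prev_id means we
+ # have a gap and must do a full resync.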
+ for prev_id in prev_ids:
+ if prev_id == extremity:
+ continue
+ elif prev_id in seen_updates:
+ continue
+ elif prev_id in stream_id_in_updates:
+ continue
+ else:
+ defer.returnValue(True)
+
+ stream_id_in_updates.add(stream_id)
+
+ defer.returnValue(False)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 1b5317edf5..943554ce98 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -175,6 +175,7 @@ class DirectoryHandler(BaseHandler):
"room_alias": room_alias.to_string(),
},
retry_on_dns_fail=False,
+ ignore_backoff=True,
)
except CodeMessageException as e:
logging.warn("Error retrieving alias")
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index b63a660c06..c2b38d72a9 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, CodeMessageException
from synapse.types import get_domain_from_id
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@@ -73,10 +73,9 @@ class E2eKeysHandler(object):
if self.is_mine_id(user_id):
local_query[user_id] = device_ids
else:
- domain = get_domain_from_id(user_id)
- remote_queries.setdefault(domain, {})[user_id] = device_ids
+ remote_queries[user_id] = device_ids
- # do the queries
+ # First get the local devices.
failures = {}
results = {}
if local_query:
@@ -85,19 +84,48 @@ class E2eKeysHandler(object):
if user_id in local_query:
results[user_id] = keys
+ # Now attempt to get any remote devices from our local cache.
+ remote_queries_not_in_cache = {}
+ if remote_queries:
+ query_list = []
+ for user_id, device_ids in remote_queries.iteritems():
+ if device_ids:
+ query_list.extend((user_id, device_id) for device_id in device_ids)
+ else:
+ query_list.append((user_id, None))
+
+ user_ids_not_in_cache, remote_results = (
+ yield self.store.get_user_devices_from_cache(
+ query_list
+ )
+ )
+ for user_id, devices in remote_results.iteritems():
+ user_devices = results.setdefault(user_id, {})
+ for device_id, device in devices.iteritems():
+ keys = device.get("keys", None)
+ device_display_name = device.get("device_display_name", None)
+ if keys:
+ result = dict(keys)
+ unsigned = result.setdefault("unsigned", {})
+ if device_display_name:
+ unsigned["device_display_name"] = device_display_name
+ user_devices[device_id] = result
+
+ for user_id in user_ids_not_in_cache:
+ domain = get_domain_from_id(user_id)
+ r = remote_queries_not_in_cache.setdefault(domain, {})
+ r[user_id] = remote_queries[user_id]
+
+ # Now fetch any devices that we don't have in our cache
@defer.inlineCallbacks
def do_remote_query(destination):
- destination_query = remote_queries[destination]
+ destination_query = remote_queries_not_in_cache[destination]
try:
- limiter = yield get_retry_limiter(
- destination, self.clock, self.store
+ remote_result = yield self.federation.query_client_keys(
+ destination,
+ {"device_keys": destination_query},
+ timeout=timeout
)
- with limiter:
- remote_result = yield self.federation.query_client_keys(
- destination,
- {"device_keys": destination_query},
- timeout=timeout
- )
for user_id, keys in remote_result["device_keys"].items():
if user_id in destination_query:
@@ -119,7 +147,7 @@ class E2eKeysHandler(object):
yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(do_remote_query)(destination)
- for destination in remote_queries
+ for destination in remote_queries_not_in_cache
]))
defer.returnValue({
@@ -162,7 +190,7 @@ class E2eKeysHandler(object):
# "unsigned" section
for user_id, device_keys in results.items():
for device_id, device_info in device_keys.items():
- r = json.loads(device_info["key_json"])
+ r = dict(device_info["keys"])
r["unsigned"] = {}
display_name = device_info["device_display_name"]
if display_name is not None:
@@ -207,18 +235,14 @@ class E2eKeysHandler(object):
def claim_client_keys(destination):
device_keys = remote_queries[destination]
try:
- limiter = yield get_retry_limiter(
- destination, self.clock, self.store
+ remote_result = yield self.federation.claim_client_keys(
+ destination,
+ {"one_time_keys": device_keys},
+ timeout=timeout
)
- with limiter:
- remote_result = yield self.federation.claim_client_keys(
- destination,
- {"one_time_keys": device_keys},
- timeout=timeout
- )
- for user_id, keys in remote_result["one_time_keys"].items():
- if user_id in device_keys:
- json_result[user_id] = keys
+ for user_id, keys in remote_result["one_time_keys"].items():
+ if user_id in device_keys:
+ json_result[user_id] = keys
except CodeMessageException as e:
failures[destination] = {
"status": e.code, "message": e.message
@@ -255,10 +279,12 @@ class E2eKeysHandler(object):
device_id, user_id, time_now
)
# TODO: Sign the JSON with the server key
- yield self.store.set_e2e_device_keys(
- user_id, device_id, time_now,
- encode_canonical_json(device_keys)
+ changed = yield self.store.set_e2e_device_keys(
+ user_id, device_id, time_now, device_keys,
)
+ if changed:
+ # Only notify about device updates *if* the keys actually changed
+ yield self.device_handler.notify_device_update(user_id, [device_id])
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
@@ -282,7 +308,7 @@ class E2eKeysHandler(object):
# old access_token without an associated device_id. Either way, we
# need to double-check the device is registered to avoid ending up with
# keys without a corresponding device.
- self.device_handler.check_device_registered(user_id, device_id)
+ yield self.device_handler.check_device_registered(user_id, device_id)
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 996bfd0e23..888dd01240 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,6 +14,7 @@
# limitations under the License.
"""Contains handlers for federation events."""
+import synapse.util.logcontext
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
@@ -31,7 +32,7 @@ from synapse.util.logcontext import (
)
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
@@ -79,29 +80,216 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up
self.room_queues = {}
+ self._room_pdu_linearizer = Linearizer("fed_room_pdu")
- @log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
- """ Called by the ReplicationLayer when we have a new pdu. We need to
- do auth checks and put it through the StateHandler.
+ @log_function
+ def on_receive_pdu(self, origin, pdu, get_missing=True):
+ """ Process a PDU received via a federation /send/ transaction, or
+ via backfill of missing prev_events
+
+ Args:
+ origin (str): server which initiated the /send/ transaction. Will
+ be used to fetch missing events or state.
+ pdu (FrozenEvent): received PDU
+ get_missing (bool): True if we should fetch missing prev_events
- auth_chain and state are None if we already have the necessary state
- and prev_events in the db
+ Returns (Deferred): completes with None
"""
- event = pdu
- logger.debug("Got event: %s", event.event_id)
+ # We reprocess pdus when we have seen them only as outliers
+ existing = yield self.get_persisted_pdu(
+ origin, pdu.event_id, do_auth=False
+ )
+
+ # FIXME: Currently we fetch an event again when we already have it
+ # if it has been marked as an outlier.
+
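+ # We treat the PDU as already seen unless our stored copy is an
+ # outlier and the incoming one is not, in which case we want to
+ # process it properly this time.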
+ already_seen = (
+ existing and (
+ not existing.internal_metadata.is_outlier()
+ or pdu.internal_metadata.is_outlier()
+ )
+ )
+ if already_seen:
+ logger.debug("Already seen pdu %s", pdu.event_id)
+ return
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
- if event.room_id in self.room_queues:
- self.room_queues[event.room_id].append((pdu, origin))
+ if pdu.room_id in self.room_queues:
+ logger.info("Ignoring PDU %s for room %s from %s for now; join "
+ "in progress", pdu.event_id, pdu.room_id, origin)
+ self.room_queues[pdu.room_id].append((pdu, origin))
return
- logger.debug("Processing event: %s", event.event_id)
+ state = None
+
+ auth_chain = []
+
+ have_seen = yield self.store.have_events(
+ [ev for ev, _ in pdu.prev_events]
+ )
+
+ fetch_state = False
+
+ # Get missing pdus if necessary.
+ if not pdu.internal_metadata.is_outlier():
+ # We only backfill backwards to the min depth.
+ min_depth = yield self.get_min_depth_for_context(
+ pdu.room_id
+ )
+
+ logger.debug(
+ "_handle_new_pdu min_depth for %s: %d",
+ pdu.room_id, min_depth
+ )
+
+ prevs = {e_id for e_id, _ in pdu.prev_events}
+ seen = set(have_seen.keys())
+
+ if min_depth and pdu.depth < min_depth:
+ # This is so that we don't notify the user about this
+ # message, to work around the fact that some events will
+ # reference really really old events we really don't want to
+ # send to the clients.
+ pdu.internal_metadata.outlier = True
+ elif min_depth and pdu.depth > min_depth:
+ if get_missing and prevs - seen:
+ # If we're missing stuff, ensure we only fetch stuff one
+ # at a time.
+ logger.info(
+ "Acquiring lock for room %r to fetch %d missing events: %r...",
+ pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+ )
+ with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+ logger.info(
+ "Acquired lock for room %r to fetch %d missing events",
+ pdu.room_id, len(prevs - seen),
+ )
+
+ yield self._get_missing_events_for_pdu(
+ origin, pdu, prevs, min_depth
+ )
+
+ prevs = {e_id for e_id, _ in pdu.prev_events}
+ seen = set(have_seen.keys())
+ if prevs - seen:
+ logger.info(
+ "Still missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
+ fetch_state = True
+
+ if fetch_state:
+ # We need to get the state at this event, since we haven't
+ # processed all the prev events.
+ logger.debug(
+ "_handle_new_pdu getting state for %s",
+ pdu.room_id
+ )
+ try:
+ state, auth_chain = yield self.replication_layer.get_state_for_room(
+ origin, pdu.room_id, pdu.event_id,
+ )
+ except Exception:
+ logger.exception("Failed to get state for event: %s", pdu.event_id)
+
+ yield self._process_received_pdu(
+ origin,
+ pdu,
+ state=state,
+ auth_chain=auth_chain,
+ )
+
+ @defer.inlineCallbacks
+ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
+ """
+ Args:
+ origin (str): Origin of the pdu. Will be queried for the missing events
+ pdu: received pdu
+ prevs (set(str)): Set of event ids which we are missing
+ min_depth (int): Minimum depth of events to return.
+
+ Returns:
+ Deferred<dict(str, str?)>: updated have_seen dictionary
+ """
+ # We recalculate seen, since it may have changed.
+ have_seen = yield self.store.have_events(prevs)
+ seen = set(have_seen.keys())
- logger.debug("Event: %s", event)
+ if not prevs - seen:
+ # nothing left to do
+ defer.returnValue(have_seen)
+
+ latest = yield self.store.get_latest_event_ids_in_room(
+ pdu.room_id
+ )
+
+ # We add the prev events that we have seen to the latest
+ # list to ensure the remote server doesn't give them to us
+ latest = set(latest)
+ latest |= seen
+
+ logger.info(
+ "Missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
+
+ # XXX: we set timeout to 10s to help workaround
+ # https://github.com/matrix-org/synapse/issues/1733.
+ # The reason is to avoid holding the linearizer lock
+ # whilst processing inbound /send transactions, causing
+ # FDs to stack up and block other inbound transactions
+ # which empirically can currently take up to 30 minutes.
+ #
+ # N.B. this explicitly disables retry attempts.
+ #
+ # N.B. this also increases our chances of falling back to
+ # fetching fresh state for the room if the missing event
+ # can't be found, which slightly reduces our security.
+ # it may also increase our DAG extremity count for the room,
+ # causing additional state resolution? See #1760.
+ # However, fetching state doesn't hold the linearizer lock
+ # apparently.
+ #
+ # see https://github.com/matrix-org/synapse/pull/1744
+
+ missing_events = yield self.replication_layer.get_missing_events(
+ origin,
+ pdu.room_id,
+ earliest_events_ids=list(latest),
+ latest_events=[pdu],
+ limit=10,
+ min_depth=min_depth,
+ timeout=10000,
+ )
+
+ # We want to sort these by depth so we process them and
+ # tell clients about them in order.
+ missing_events.sort(key=lambda x: x.depth)
+
+ for e in missing_events:
+ yield self.on_receive_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+
+ have_seen = yield self.store.have_events(
+ [ev for ev, _ in pdu.prev_events]
+ )
+ defer.returnValue(have_seen)
+
+ @log_function
+ @defer.inlineCallbacks
+ def _process_received_pdu(self, origin, pdu, state, auth_chain):
+ """ Called when we have a new pdu. We need to do auth checks and put it
+ through the StateHandler.
+ """
+ event = pdu
+
+ logger.debug("Processing event: %s", event)
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@@ -670,8 +858,6 @@ class FederationHandler(BaseHandler):
"""
logger.debug("Joining %s to %s", joinee, room_id)
- yield self.store.clean_room_for_join(room_id)
-
origin, event = yield self._make_and_verify_event(
target_hosts,
room_id,
@@ -680,7 +866,15 @@ class FederationHandler(BaseHandler):
content,
)
+ # This shouldn't happen, because the RoomMemberHandler has a
+ # linearizer lock which only allows one operation per user per room
+ # at a time - so this is just paranoia.
+ assert (room_id not in self.room_queues)
+
self.room_queues[room_id] = []
+
+ yield self.store.clean_room_for_join(room_id)
+
handled_events = set()
try:
@@ -733,18 +927,37 @@ class FederationHandler(BaseHandler):
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
- for p, origin in room_queue:
- if p.event_id in handled_events:
- continue
+ # we don't need to wait for the queued events to be processed -
+ # it's just a best-effort thing at this point. We do want to do
+ # them roughly in order, though, otherwise we'll end up making
+ # lots of requests for missing prev_events which we do actually
+ # have. Hence we fire off the deferred, but don't wait for it.
- try:
- self.on_receive_pdu(origin, p)
- except:
- logger.exception("Couldn't handle pdu")
+ synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
+ room_queue
+ )
defer.returnValue(True)
@defer.inlineCallbacks
+ def _handle_queued_pdus(self, room_queue):
+ """Process PDUs which got queued up while we were busy send_joining.
+
+ Args:
+ room_queue (list[(FrozenEvent, str)]): list of (PDU, origin server)
+ pairs to be processed
+ """
+ for p, origin in room_queue:
+ try:
+ logger.info("Processing queued PDU %s which was received "
+ "while we were joining %s", p.event_id, p.room_id)
+ yield self.on_receive_pdu(origin, p)
+ except Exception as e:
+ logger.warn(
+ "Error handling queued PDU %s from %s: %s",
+ p.event_id, origin, e)
+
+ @defer.inlineCallbacks
@log_function
def on_make_join_request(self, room_id, user_id):
""" We've received a /make_join/ request, so we create a partial
@@ -1096,7 +1309,7 @@ class FederationHandler(BaseHandler):
if prev_id != event.event_id:
results[(event.type, event.state_key)] = prev_id
else:
- del results[(event.type, event.state_key)]
+ results.pop((event.type, event.state_key), None)
defer.returnValue(results.values())
else:
@@ -1325,7 +1538,17 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
+ """
+
+ Args:
+ origin:
+ event:
+ state:
+ auth_events:
+ Returns:
+ Deferred, which resolves to synapse.events.snapshot.EventContext
+ """
context = yield self.state_handler.compute_event_context(
event, old_state=state,
)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 559e5d5a71..6a53c5eb47 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -150,7 +151,7 @@ class IdentityHandler(BaseHandler):
params.update(kwargs)
try:
- data = yield self.http_client.post_urlencoded_get_json(
+ data = yield self.http_client.post_json_get_json(
"https://%s%s" % (
id_server,
"/_matrix/identity/api/v1/validate/email/requestToken"
@@ -161,3 +162,37 @@ class IdentityHandler(BaseHandler):
except CodeMessageException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e
+
+ @defer.inlineCallbacks
+ def requestMsisdnToken(
+ self, id_server, country, phone_number,
+ client_secret, send_attempt, **kwargs
+ ):
+ yield run_on_reactor()
+
+ if not self._should_trust_id_server(id_server):
+ raise SynapseError(
+ 400, "Untrusted ID server '%s'" % id_server,
+ Codes.SERVER_NOT_TRUSTED
+ )
+
+ params = {
+ 'country': country,
+ 'phone_number': phone_number,
+ 'client_secret': client_secret,
+ 'send_attempt': send_attempt,
+ }
+ params.update(kwargs)
+
+ try:
+ data = yield self.http_client.post_json_get_json(
+ "https://%s%s" % (
+ id_server,
+ "/_matrix/identity/api/v1/validate/msisdn/requestToken"
+ ),
+ params
+ )
+ defer.returnValue(data)
+ except CodeMessageException as e:
+ logger.info("Proxied requestToken failed: %r", e)
+ raise e
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e0ade4c164..10f5f35a69 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -19,6 +19,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
+from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import (
UserID, StreamToken,
@@ -225,9 +226,17 @@ class InitialSyncHandler(BaseHandler):
"content": content,
})
+ now = self.clock.time_msec()
+
ret = {
"rooms": rooms_ret,
- "presence": presence,
+ "presence": [
+ {
+ "type": "m.presence",
+ "content": format_user_presence_state(event, now),
+ }
+ for event in presence
+ ],
"account_data": account_data_events,
"receipts": receipt,
"end": now_token.to_string(),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 88bd2d572e..7a498af5a2 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -208,8 +208,10 @@ class MessageHandler(BaseHandler):
content = builder.content
try:
- content["displayname"] = yield profile.get_displayname(target)
- content["avatar_url"] = yield profile.get_avatar_url(target)
+ if "displayname" not in content:
+ content["displayname"] = yield profile.get_displayname(target)
+ if "avatar_url" not in content:
+ content["avatar_url"] = yield profile.get_avatar_url(target)
except Exception as e:
logger.info(
"Failed to get profile information for %r: %s",
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1b89dc6274..1ede117c79 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -29,6 +29,7 @@ from synapse.api.errors import SynapseError
from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState
+from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -531,7 +532,7 @@ class PresenceHandler(object):
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = yield self.store.get_presence_for_users(missing)
- states.update({state.user_id: state for state in res})
+ states.update(res)
missing = [user_id for user_id, state in states.items() if not state]
if missing:
@@ -556,9 +557,9 @@ class PresenceHandler(object):
room_ids_to_states = {}
users_to_states = {}
for state in states:
- events = yield self.store.get_rooms_for_user(state.user_id)
- for e in events:
- room_ids_to_states.setdefault(e.room_id, []).append(state)
+ room_ids = yield self.store.get_rooms_for_user(state.user_id)
+ for room_id in room_ids:
+ room_ids_to_states.setdefault(room_id, []).append(state)
plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
for u in plist:
@@ -574,8 +575,7 @@ class PresenceHandler(object):
if not local_states:
continue
- users = yield self.state.get_current_user_in_room(room_id)
- hosts = set(get_domain_from_id(u) for u in users)
+ hosts = yield self.store.get_hosts_in_room(room_id)
for host in hosts:
hosts_to_states.setdefault(host, []).extend(local_states)
@@ -719,9 +719,7 @@ class PresenceHandler(object):
for state in updates
])
else:
- defer.returnValue([
- format_user_presence_state(state, now) for state in updates
- ])
+ defer.returnValue(updates)
@defer.inlineCallbacks
def set_state(self, target_user, state, ignore_status_msg=False):
@@ -766,7 +764,7 @@ class PresenceHandler(object):
# don't need to send to local clients here, as that is done as part
# of the event stream/sync.
# TODO: Only send to servers not already in the room.
- user_ids = yield self.state.get_current_user_in_room(room_id)
+ user_ids = yield self.store.get_users_in_room(room_id)
if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
@@ -795,6 +793,9 @@ class PresenceHandler(object):
as_event=False,
)
+ now = self.clock.time_msec()
+ results[:] = [format_user_presence_state(r, now) for r in results]
+
is_accepted = {
row["observed_user_id"]: row["accepted"] for row in presence_list
}
@@ -847,6 +848,7 @@ class PresenceHandler(object):
)
state_dict = yield self.get_state(observed_user, as_event=False)
+ state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
self.federation.send_edu(
destination=observer_user.domain,
@@ -910,11 +912,12 @@ class PresenceHandler(object):
def is_visible(self, observed_user, observer_user):
"""Returns whether a user can see another user's presence.
"""
- observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
- observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())
-
- observer_room_ids = set(r.room_id for r in observer_rooms)
- observed_room_ids = set(r.room_id for r in observed_rooms)
+ observer_room_ids = yield self.store.get_rooms_for_user(
+ observer_user.to_string()
+ )
+ observed_room_ids = yield self.store.get_rooms_for_user(
+ observed_user.to_string()
+ )
if observer_room_ids & observed_room_ids:
defer.returnValue(True)
@@ -979,14 +982,18 @@ def should_notify(old_state, new_state):
return False
-def format_user_presence_state(state, now):
+def format_user_presence_state(state, now, include_user_id=True):
"""Convert UserPresenceState to a format that can be sent down to clients
and to other servers.
+
+ The "user_id" is optional so that this function can be used to format presence
+ updates for client /sync responses and for federation /send requests.
"""
content = {
"presence": state.state,
- "user_id": state.user_id,
}
+ if include_user_id:
+ content["user_id"] = state.user_id
if state.last_active_ts:
content["last_active_ago"] = now - state.last_active_ts
if state.status_msg and state.state != PresenceState.OFFLINE:
@@ -1011,7 +1018,7 @@ class PresenceEventSource(object):
@defer.inlineCallbacks
@log_function
def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
- **kwargs):
+ explicit_room_id=None, **kwargs):
# The process for getting presence events are:
# 1. Get the rooms the user is in.
# 2. Get the list of user in the rooms.
@@ -1025,25 +1032,15 @@ class PresenceEventSource(object):
# sending down the rare duplicate is not a concern.
with Measure(self.clock, "presence.get_new_events"):
- user_id = user.to_string()
if from_key is not None:
from_key = int(from_key)
- room_ids = room_ids or []
presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
- if not room_ids:
- rooms = yield self.store.get_rooms_for_user(user_id)
- room_ids = set(e.room_id for e in rooms)
- else:
- room_ids = set(room_ids)
-
max_token = self.store.get_current_presence_token()
- plist = yield self.store.get_presence_list_accepted(user.localpart)
- friends = set(row["observed_user_id"] for row in plist)
- friends.add(user_id) # So that we receive our own presence
+ users_interested_in = yield self._get_interested_in(user, explicit_room_id)
user_ids_changed = set()
changed = None
@@ -1055,48 +1052,29 @@ class PresenceEventSource(object):
# work out if we share a room or they're in our presence list
get_updates_counter.inc("stream")
for other_user_id in changed:
- if other_user_id in friends:
+ if other_user_id in users_interested_in:
user_ids_changed.add(other_user_id)
- continue
- other_rooms = yield self.store.get_rooms_for_user(other_user_id)
- if room_ids.intersection(e.room_id for e in other_rooms):
- user_ids_changed.add(other_user_id)
- continue
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.inc("full")
- user_ids_to_check = set()
- for room_id in room_ids:
- users = yield self.state.get_current_user_in_room(room_id)
- user_ids_to_check.update(users)
-
- user_ids_to_check.update(friends)
-
- # Always include yourself. Only really matters for when the user is
- # not in any rooms, but still.
- user_ids_to_check.add(user_id)
-
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
- user_ids_to_check, from_key,
+ users_interested_in, from_key,
)
else:
- user_ids_changed = user_ids_to_check
+ user_ids_changed = users_interested_in
updates = yield presence.current_state_for_users(user_ids_changed)
- now = self.clock.time_msec()
-
- defer.returnValue(([
- {
- "type": "m.presence",
- "content": format_user_presence_state(s, now),
- }
- for s in updates.values()
- if include_offline or s.state != PresenceState.OFFLINE
- ], max_token))
+ if include_offline:
+ defer.returnValue((updates.values(), max_token))
+ else:
+ defer.returnValue(([
+ s for s in updates.itervalues()
+ if s.state != PresenceState.OFFLINE
+ ], max_token))
def get_current_key(self):
return self.store.get_current_presence_token()
@@ -1104,6 +1082,31 @@ class PresenceEventSource(object):
def get_pagination_rows(self, user, pagination_config, key):
return self.get_new_events(user, from_key=None, include_offline=False)
+ @cachedInlineCallbacks(num_args=2, cache_context=True)
+ def _get_interested_in(self, user, explicit_room_id, cache_context):
+ """Returns the set of users that the given user should see presence
+ updates for
+ """
+ user_id = user.to_string()
+ plist = yield self.store.get_presence_list_accepted(
+ user.localpart, on_invalidate=cache_context.invalidate,
+ )
+ users_interested_in = set(row["observed_user_id"] for row in plist)
+ users_interested_in.add(user_id) # So that we receive our own presence
+
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id, on_invalidate=cache_context.invalidate,
+ )
+ users_interested_in.update(users_who_share_room)
+
+ if explicit_room_id:
+ user_ids = yield self.store.get_users_in_room(
+ explicit_room_id, on_invalidate=cache_context.invalidate,
+ )
+ users_interested_in.update(user_ids)
+
+ defer.returnValue(users_interested_in)
+
def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as
@@ -1171,7 +1174,10 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
# If there have been no syncs for a while (and none ongoing),
# set presence to offline
if user_id not in syncing_user_ids:
- if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+ # If the user has done something recently but hasn't synced,
+ # don't set them as offline.
+ sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
+ if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(
state=PresenceState.OFFLINE,
status_msg=None,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 87f74dfb8e..9bf638f818 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -52,7 +52,8 @@ class ProfileHandler(BaseHandler):
args={
"user_id": target_user.to_string(),
"field": "displayname",
- }
+ },
+ ignore_backoff=True,
)
except CodeMessageException as e:
if e.code != 404:
@@ -99,7 +100,8 @@ class ProfileHandler(BaseHandler):
args={
"user_id": target_user.to_string(),
"field": "avatar_url",
- }
+ },
+ ignore_backoff=True,
)
except CodeMessageException as e:
if e.code != 404:
@@ -156,11 +158,11 @@ class ProfileHandler(BaseHandler):
self.ratelimit(requester)
- joins = yield self.store.get_rooms_for_user(
+ room_ids = yield self.store.get_rooms_for_user(
user.to_string(),
)
- for j in joins:
+ for room_id in room_ids:
handler = self.hs.get_handlers().room_member_handler
try:
# Assume the user isn't a guest because we don't let guests set
@@ -171,12 +173,12 @@ class ProfileHandler(BaseHandler):
yield handler.update_membership(
requester,
user,
- j.room_id,
+ room_id,
"join", # We treat a profile update like a join.
ratelimit=False, # Try to hide that these events aren't atomic.
)
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
- j.room_id, str(e.message)
+ room_id, str(e.message)
)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 50aa513935..e1cd3a48e9 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -210,10 +210,9 @@ class ReceiptEventSource(object):
else:
from_key = None
- rooms = yield self.store.get_rooms_for_user(user.to_string())
- rooms = [room.room_id for room in rooms]
+ room_ids = yield self.store.get_rooms_for_user(user.to_string())
events = yield self.store.get_linearized_receipts_for_rooms(
- rooms,
+ room_ids,
from_key=from_key,
to_key=to_key,
)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 286f0cef0a..03c6a85fc6 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -40,6 +40,8 @@ class RegistrationHandler(BaseHandler):
self._next_generated_user_id = None
+ self.macaroon_gen = hs.get_macaroon_generator()
+
@defer.inlineCallbacks
def check_username(self, localpart, guest_access_token=None,
assigned_user_id=None):
@@ -143,7 +145,7 @@ class RegistrationHandler(BaseHandler):
token = None
if generate_token:
- token = self.auth_handler().generate_access_token(user_id)
+ token = self.macaroon_gen.generate_access_token(user_id)
yield self.store.register(
user_id=user_id,
token=token,
@@ -167,7 +169,7 @@ class RegistrationHandler(BaseHandler):
user_id = user.to_string()
yield self.check_user_id_not_appservice_exclusive(user_id)
if generate_token:
- token = self.auth_handler().generate_access_token(user_id)
+ token = self.macaroon_gen.generate_access_token(user_id)
try:
yield self.store.register(
user_id=user_id,
@@ -254,7 +256,7 @@ class RegistrationHandler(BaseHandler):
user_id = user.to_string()
yield self.check_user_id_not_appservice_exclusive(user_id)
- token = self.auth_handler().generate_access_token(user_id)
+ token = self.macaroon_gen.generate_access_token(user_id)
try:
yield self.store.register(
user_id=user_id,
@@ -399,7 +401,7 @@ class RegistrationHandler(BaseHandler):
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
- token = self.auth_handler().generate_access_token(user_id)
+ token = self.macaroon_gen.generate_access_token(user_id)
if need_register:
yield self.store.register(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5f18007e90..99cb7db0db 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -356,7 +356,7 @@ class RoomCreationHandler(BaseHandler):
class RoomContextHandler(BaseHandler):
@defer.inlineCallbacks
- def get_event_context(self, user, room_id, event_id, limit, is_guest):
+ def get_event_context(self, user, room_id, event_id, limit):
"""Retrieves events, pagination tokens and state around a given event
in a room.
@@ -375,12 +375,15 @@ class RoomContextHandler(BaseHandler):
now_token = yield self.hs.get_event_sources().get_current_token()
+ users = yield self.store.get_users_in_room(room_id)
+ is_peeking = user.to_string() not in users
+
def filter_evts(events):
return filter_events_for_client(
self.store,
user.to_string(),
events,
- is_peeking=is_guest
+ is_peeking=is_peeking
)
event = yield self.store.get_event(event_id, get_prev_content=True,
@@ -437,6 +440,7 @@ class RoomEventSource(object):
limit,
room_ids,
is_guest,
+ explicit_room_id=None,
):
# We just ignore the key for now.
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 19eebbd43f..516cd9a6ac 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -21,6 +21,7 @@ from synapse.api.constants import (
EventTypes, JoinRules,
)
from synapse.util.async import concurrently_execute
+from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID
@@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler):
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
"""
+ logger.info(
+ "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
+ limit, since_token, bool(search_filter), network_tuple,
+ )
if search_filter:
# We explicitly don't bother caching searches or requests for
# appservice specific lists.
@@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler):
rooms_to_order_value = {}
rooms_to_num_joined = {}
- rooms_to_latest_event_ids = {}
newly_visible = []
newly_unpublished = []
@@ -116,19 +120,26 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_order_for_room(room_id):
- latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
- if not latest_event_ids:
+ # Most of the rooms won't have changed between the since token and
+ # now (especially if the since token is "now"). So, we can ask who
+ # the current users in a room are (that will hit a cache) and then
+ # check if the room has changed since the since token. (We have to
+ # do it in that order to avoid races).
+ # If things have changed then fall back to getting the current state
+ # at the since token.
+ joined_users = yield self.store.get_users_in_room(room_id)
+ if self.store.has_room_changed_since(room_id, stream_token):
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_token
)
- rooms_to_latest_event_ids[room_id] = latest_event_ids
- if not latest_event_ids:
- return
+ if not latest_event_ids:
+ return
+
+ joined_users = yield self.state_handler.get_current_user_in_room(
+ room_id, latest_event_ids,
+ )
- joined_users = yield self.state_handler.get_current_user_in_room(
- room_id, latest_event_ids,
- )
num_joined_users = len(joined_users)
rooms_to_num_joined[room_id] = num_joined_users
@@ -165,19 +176,19 @@ class RoomListHandler(BaseHandler):
rooms_to_scan = rooms_to_scan[:since_token.current_limit]
rooms_to_scan.reverse()
- # Actually generate the entries. _generate_room_entry will append to
+ # Actually generate the entries. _append_room_entry_to_chunk will append to
# chunk but will stop if len(chunk) > limit
chunk = []
if limit and not search_filter:
step = limit + 1
for i in xrange(0, len(rooms_to_scan), step):
# We iterate here because the vast majority of cases we'll stop
- # at first iteration, but occaisonally _generate_room_entry
+ # at first iteration, but occasionally _append_room_entry_to_chunk
# won't append to the chunk and so we need to loop again.
# We don't want to scan over the entire range either as that
# would potentially waste a lot of work.
yield concurrently_execute(
- lambda r: self._generate_room_entry(
+ lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
),
@@ -187,7 +198,7 @@ class RoomListHandler(BaseHandler):
break
else:
yield concurrently_execute(
- lambda r: self._generate_room_entry(
+ lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
),
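
To make the batching explicit: the scan walks the room list limit+1 rooms at a time, so a single batch usually fills the chunk and the loop only continues when some rooms were filtered out. A simplified, sequential sketch (the real code runs each batch via concurrently_execute, and appender is a stand-in for _append_room_entry_to_chunk, which may decline to append):

def fill_chunk(rooms_to_scan, limit, appender):
    chunk = []
    step = limit + 1
    for i in range(0, len(rooms_to_scan), step):
        # Usually one batch fills the chunk; we only loop again when some
        # entries were filtered out and the chunk is still short.
        for room_id in rooms_to_scan[i:i + step]:
            appender(room_id, chunk, limit)
        if len(chunk) > limit:
            break
    return chunk

# e.g. an appender that only keeps even-numbered rooms under-fills batches:
chunk = fill_chunk(
    list(range(10)), 3,
    lambda r, c, l: c.append(r) if r % 2 == 0 else None,
)
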
@@ -256,21 +267,35 @@ class RoomListHandler(BaseHandler):
defer.returnValue(results)
@defer.inlineCallbacks
- def _generate_room_entry(self, room_id, num_joined_users, chunk, limit,
- search_filter):
+ def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
+ search_filter):
+ """Generate the entry for a room in the public room list and append it
+        to the `chunk` if it matches the search filter.
+ """
if limit and len(chunk) > limit + 1:
# We've already got enough, so lets just drop it.
return
+ result = yield self._generate_room_entry(room_id, num_joined_users)
+
+ if result and _matches_room_entry(result, search_filter):
+ chunk.append(result)
+
+ @cachedInlineCallbacks(num_args=1, cache_context=True)
+ def _generate_room_entry(self, room_id, num_joined_users, cache_context):
+ """Returns the entry for a room
+ """
result = {
"room_id": room_id,
"num_joined_members": num_joined_users,
}
- current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+ current_state_ids = yield self.store.get_current_state_ids(
+ room_id, on_invalidate=cache_context.invalidate,
+ )
event_map = yield self.store.get_events([
- event_id for key, event_id in current_state_ids.items()
+ event_id for key, event_id in current_state_ids.iteritems()
if key[0] in (
EventTypes.JoinRules,
EventTypes.Name,
@@ -294,7 +319,9 @@ class RoomListHandler(BaseHandler):
if join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
- aliases = yield self.store.get_aliases_for_room(room_id)
+ aliases = yield self.store.get_aliases_for_room(
+ room_id, on_invalidate=cache_context.invalidate
+ )
if aliases:
result["aliases"] = aliases
@@ -334,8 +361,7 @@ class RoomListHandler(BaseHandler):
if avatar_url:
result["avatar_url"] = avatar_url
- if _matches_room_entry(result, search_filter):
- chunk.append(result)
+ defer.returnValue(result)
@defer.inlineCallbacks
def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
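
The cache_context plumbing above lets a cached function's own entry be evicted when one of the caches it reads from is invalidated. A toy illustration of that shape, with a hand-rolled cache in place of Synapse's cachedInlineCallbacks (illustrative only, not the real decorator):

# Toy version of the cache_context idea: a cached function passes an
# "invalidate me" callback into the caches it reads through, so that
# evicting an upstream entry cascades to the downstream entry.
class Cache:
    def __init__(self):
        self.data = {}           # key -> cached value
        self.dependents = {}     # key -> invalidation callbacks

    def get(self, key, compute, on_invalidate=None):
        if on_invalidate is not None:
            self.dependents.setdefault(key, []).append(on_invalidate)
        if key not in self.data:
            self.data[key] = compute(key)
        return self.data[key]

    def invalidate(self, key):
        self.data.pop(key, None)
        for cb in self.dependents.pop(key, []):
            cb()  # cascade the invalidation downstream

aliases_cache = Cache()
entry_cache = Cache()

def room_entry(room_id):
    def compute(key):
        # Reading through the upstream cache registers our invalidation.
        aliases = aliases_cache.get(
            key, lambda k: ["#room:example.org"],
            on_invalidate=lambda: entry_cache.invalidate(key),
        )
        return {"room_id": key, "aliases": aliases}
    return entry_cache.get(room_id, compute)

room_entry("!r:example.org")                # populates both caches
aliases_cache.invalidate("!r:example.org")  # also evicts the room entry
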
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b2806555cf..2052d6d05f 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -719,7 +719,9 @@ class RoomMemberHandler(BaseHandler):
)
membership = member.membership if member else None
- if membership is not None and membership != Membership.LEAVE:
+ if membership is not None and membership not in [
+ Membership.LEAVE, Membership.BAN
+ ]:
raise SynapseError(400, "User %s in room %s" % (
user_id, room_id
))
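
For clarity, the relaxed membership test now only rejects the invite when the target is actively in the room; "leave" and "ban" no longer block it. A tiny sketch of the predicate (hypothetical helper name, using string membership values for illustration):

def blocks_invite(membership):
    # Reject only when the user's current membership places them in the
    # room; users who have left or been banned may be invited again.
    return membership is not None and membership not in ("leave", "ban")

assert blocks_invite("join")
assert not blocks_invite("ban")
assert not blocks_invite(None)
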
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c880f61685..c0205da1a9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -16,10 +16,11 @@
from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute
from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.util.metrics import Measure, measure_func
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
+from synapse.types import RoomStreamToken
from twisted.internet import defer
@@ -115,6 +116,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device.
+        "device_lists",  # List of user_ids whose devices have changed
])):
__slots__ = []
@@ -129,7 +131,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.invited or
self.archived or
self.account_data or
- self.to_device
+ self.to_device or
+ self.device_lists
)
@@ -223,8 +226,7 @@ class SyncHandler(object):
with Measure(self.clock, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else "0"
- rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
- room_ids = [room.room_id for room in rooms]
+ room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())
typing_source = self.event_sources.sources["typing"]
typing, typing_key = yield typing_source.get_new_events(
@@ -544,6 +546,10 @@ class SyncHandler(object):
yield self._generate_sync_entry_for_to_device(sync_result_builder)
+ device_lists = yield self._generate_sync_entry_for_device_list(
+ sync_result_builder
+ )
+
defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
@@ -551,9 +557,32 @@ class SyncHandler(object):
invited=sync_result_builder.invited,
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
+ device_lists=device_lists,
next_batch=sync_result_builder.now_token,
))
+ @measure_func("_generate_sync_entry_for_device_list")
+ @defer.inlineCallbacks
+ def _generate_sync_entry_for_device_list(self, sync_result_builder):
+ user_id = sync_result_builder.sync_config.user.to_string()
+ since_token = sync_result_builder.since_token
+
+ if since_token and since_token.device_list_key:
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+
+ user_ids_changed = set()
+ changed = yield self.store.get_user_whose_devices_changed(
+ since_token.device_list_key
+ )
+ for other_user_id in changed:
+ other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
+ if room_ids.intersection(other_room_ids):
+ user_ids_changed.add(other_user_id)
+
+ defer.returnValue(user_ids_changed)
+ else:
+ defer.returnValue([])
+
@defer.inlineCallbacks
def _generate_sync_entry_for_to_device(self, sync_result_builder):
"""Generates the portion of the sync response. Populates
@@ -579,14 +608,14 @@ class SyncHandler(object):
deleted = yield self.store.delete_messages_for_device(
user_id, device_id, since_stream_id
)
- logger.info("Deleted %d to-device messages up to %d",
- deleted, since_stream_id)
+ logger.debug("Deleted %d to-device messages up to %d",
+ deleted, since_stream_id)
messages, stream_id = yield self.store.get_new_messages_for_device(
user_id, device_id, since_stream_id, now_token.to_device_key
)
- logger.info(
+ logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
len(messages), since_stream_id, stream_id, now_token.to_device_key
)
@@ -691,14 +720,14 @@ class SyncHandler(object):
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())
- states = yield self.presence_handler.get_states(
- extra_users_ids,
- as_event=True,
- )
- presence.extend(states)
+ if extra_users_ids:
+ states = yield self.presence_handler.get_states(
+ extra_users_ids,
+ )
+ presence.extend(states)
- # Deduplicate the presence entries so that there's at most one per user
- presence = {p["content"]["user_id"]: p for p in presence}.values()
+ # Deduplicate the presence entries so that there's at most one per user
+ presence = {p.user_id: p for p in presence}.values()
presence = sync_config.filter_collection.filter_presence(
presence
@@ -735,6 +764,21 @@ class SyncHandler(object):
)
sync_result_builder.now_token = now_token
+        # We check up front whether anything has changed; if it hasn't, then
+        # there is no point in going further.
+ since_token = sync_result_builder.since_token
+ if not sync_result_builder.full_state:
+ if since_token and not ephemeral_by_room and not account_data_by_room:
+ have_changed = yield self._have_rooms_changed(sync_result_builder)
+ if not have_changed:
+ tags_by_room = yield self.store.get_updated_tags(
+ user_id,
+ since_token.account_data_key,
+ )
+ if not tags_by_room:
+ logger.debug("no-oping sync")
+ defer.returnValue(([], []))
+
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id,
)
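
As a rough illustration of this short-circuit (all names here are hypothetical stand-ins, not the real handler signature): if the client supplied a since token, nothing ephemeral or account-data related is pending, no room has changed, and there are no updated tags, the room sync can return empty results without building any room entries.

def room_sync(since_token, full_state, ephemeral, account_data,
              have_rooms_changed, get_updated_tags):
    if not full_state and since_token and not ephemeral and not account_data:
        if not have_rooms_changed() and not get_updated_tags():
            return ([], [])  # no-op sync: no newly joined rooms or users
    # ... otherwise fall through to the expensive per-room generation ...
    return (["<room entries>"], ["<invites>"])

# Example: a quiescent incremental sync no-ops.
assert room_sync("s42", False, {}, {}, lambda: False, lambda: {}) == ([], [])
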
@@ -744,13 +788,12 @@ class SyncHandler(object):
else:
ignored_users = frozenset()
- if sync_result_builder.since_token:
+ if since_token:
res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms = res
tags_by_room = yield self.store.get_updated_tags(
- user_id,
- sync_result_builder.since_token.account_data_key,
+ user_id, since_token.account_data_key,
)
else:
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
@@ -775,7 +818,7 @@ class SyncHandler(object):
# Now we want to get any newly joined users
newly_joined_users = set()
- if sync_result_builder.since_token:
+ if since_token:
for joined_sync in sync_result_builder.joined:
it = itertools.chain(
joined_sync.timeline.events, joined_sync.state.values()
@@ -788,6 +831,38 @@ class SyncHandler(object):
defer.returnValue((newly_joined_rooms, newly_joined_users))
@defer.inlineCallbacks
+ def _have_rooms_changed(self, sync_result_builder):
+        """Returns whether there may be any new events that should be sent down
+        the sync. Returns True if there may be.
+ """
+ user_id = sync_result_builder.sync_config.user.to_string()
+ since_token = sync_result_builder.since_token
+ now_token = sync_result_builder.now_token
+
+ assert since_token
+
+ # Get a list of membership change events that have happened.
+ rooms_changed = yield self.store.get_membership_changes_for_user(
+ user_id, since_token.room_key, now_token.room_key
+ )
+
+ if rooms_changed:
+ defer.returnValue(True)
+
+ app_service = self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ rooms = yield self.store.get_app_service_rooms(app_service)
+ joined_room_ids = set(r.room_id for r in rooms)
+ else:
+ joined_room_ids = yield self.store.get_rooms_for_user(user_id)
+
+ stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
+ for room_id in joined_room_ids:
+ if self.store.has_room_changed_since(room_id, stream_id):
+ defer.returnValue(True)
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
def _get_rooms_changed(self, sync_result_builder, ignored_users):
"""Gets the the changes that have happened since the last sync.
@@ -811,8 +886,7 @@ class SyncHandler(object):
rooms = yield self.store.get_app_service_rooms(app_service)
joined_room_ids = set(r.room_id for r in rooms)
else:
- rooms = yield self.store.get_rooms_for_user(user_id)
- joined_room_ids = set(r.room_id for r in rooms)
+ joined_room_ids = yield self.store.get_rooms_for_user(user_id)
# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
|