diff options
-rw-r--r-- | README.rst | 49 | ||||
-rwxr-xr-x | setup.py | 73 | ||||
-rw-r--r-- | synapse/api/auth.py | 51 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 10 | ||||
-rw-r--r-- | synapse/handlers/message.py | 60 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 6 | ||||
-rw-r--r-- | synapse/handlers/register.py | 2 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 1 | ||||
-rw-r--r-- | synapse/notifier.py | 40 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 12 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 36 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/keys.py | 43 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 1 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 2 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 102 | ||||
-rw-r--r-- | synapse/storage/schema/delta/39/membership_profile.sql | 20 | ||||
-rw-r--r-- | synapse/storage/stream.py | 3 | ||||
-rw-r--r-- | synapse/util/__init__.py | 7 | ||||
-rw-r--r-- | synapse/util/async.py | 58 | ||||
-rw-r--r-- | tests/api/test_auth.py | 93 | ||||
-rw-r--r-- | tests/util/test_limiter.py | 70 | ||||
-rw-r--r-- | tests/utils.py | 4 |
22 files changed, 579 insertions, 164 deletions
diff --git a/README.rst b/README.rst index 6a22399e19..21bd63fd64 100644 --- a/README.rst +++ b/README.rst @@ -88,7 +88,12 @@ System requirements: - Python 2.7 - At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org -Synapse is written in python but some of the libraries is uses are written in +Installing from source +---------------------- +(Prebuilt packages are available for some platforms - see `Platform-Specific +Instructions`_.) + +Synapse is written in python but some of the libraries it uses are written in C. So before we can install synapse itself we need a working C compiler and the header files for python C extensions. @@ -131,6 +136,10 @@ Installing prerequisites on openSUSE:: sudo zypper in python-pip python-setuptools sqlite3 python-virtualenv \ python-devel libffi-devel libopenssl-devel libjpeg62-devel +Installing prerequisites on OpenBSD:: + doas pkg_add python libffi py-pip py-setuptools sqlite3 py-virtualenv \ + libxslt + To install the synapse homeserver run:: virtualenv -p python2.7 ~/.synapse @@ -302,7 +311,7 @@ See https://github.com/vector-im/vector-web/issues/1977 and https://developer.github.com/changes/2014-04-25-user-content-security for more details. -Platform Specific Instructions +Platform-Specific Instructions ============================== Debian @@ -364,6 +373,32 @@ Synapse can be installed via FreeBSD Ports or Packages contributed by Brendan Mo - Ports: ``cd /usr/ports/net/py-matrix-synapse && make install clean`` - Packages: ``pkg install py27-matrix-synapse`` + +OpenBSD +------- + +There is currently no port for OpenBSD. Additionally, OpenBSD's security +settings require a slightly more difficult installation process. + +1) Create a new directory in ``/usr/local`` called ``_synapse``. Also, create a + new user called ``_synapse`` and set that directory as the new user's home. + This is required because, by default, OpenBSD only allows binaries which need + write and execute permissions on the same memory space to be run from + ``/usr/local``. +2) ``su`` to the new ``_synapse`` user and change to their home directory. +3) Create a new virtualenv: ``virtualenv -p python2.7 ~/.synapse`` +4) Source the virtualenv configuration located at + ``/usr/local/_synapse/.synapse/bin/activate``. This is done in ``ksh`` by + using the ``.`` command, rather than ``bash``'s ``source``. +5) Optionally, use ``pip`` to install ``lxml``, which Synapse needs to parse + webpages for their titles. +6) Use ``pip`` to install this repository: ``pip install + https://github.com/matrix-org/synapse/tarball/master`` +7) Optionally, change ``_synapse``'s shell to ``/bin/false`` to reduce the + chance of a compromised Synapse server being used to take over your box. + +After this, you may proceed with the rest of the install directions. + NixOS ----- @@ -759,6 +794,10 @@ Then update the `users` table in the database:: Synapse Development =================== +Before setting up a development environment for synapse, make sure you have the +system dependencies (such as the python header files) installed - see +`Installing from source`_. + To check out a synapse for development, clone the git repo into a working directory of your choice:: @@ -770,8 +809,8 @@ to install using pip and a virtualenv:: virtualenv env source env/bin/activate - python synapse/python_dependencies.py | xargs -n1 pip install - pip install setuptools_trial mock + python synapse/python_dependencies.py | xargs pip install + pip install lxml mock This will run a process of downloading and installing all the needed dependencies into a virtual env. @@ -779,7 +818,7 @@ dependencies into a virtual env. Once this is done, you may wish to run Synapse's unit tests, to check that everything is installed as it should be:: - python setup.py test + PYTHONPATH="." trial tests This should end with a 'PASSED' result:: diff --git a/setup.py b/setup.py index c0716a1599..b00c2af367 100755 --- a/setup.py +++ b/setup.py @@ -23,6 +23,45 @@ import sys here = os.path.abspath(os.path.dirname(__file__)) +# Some notes on `setup.py test`: +# +# Once upon a time we used to try to make `setup.py test` run `tox` to run the +# tests. That's a bad idea for three reasons: +# +# 1: `setup.py test` is supposed to find out whether the tests work in the +# *current* environmentt, not whatever tox sets up. +# 2: Empirically, trying to install tox during the test run wasn't working ("No +# module named virtualenv"). +# 3: The tox documentation advises against it[1]. +# +# Even further back in time, we used to use setuptools_trial [2]. That has its +# own set of issues: for instance, it requires installation of Twisted to build +# an sdist (because the recommended mode of usage is to add it to +# `setup_requires`). That in turn means that in order to successfully run tox +# you have to have the python header files installed for whichever version of +# python tox uses (which is python3 on recent ubuntus, for example). +# +# So, for now at least, we stick with what appears to be the convention among +# Twisted projects, and don't attempt to do anything when someone runs +# `setup.py test`; instead we direct people to run `trial` directly if they +# care. +# +# [1]: http://tox.readthedocs.io/en/2.5.0/example/basic.html#integration-with-setup-py-test-command +# [2]: https://pypi.python.org/pypi/setuptools_trial +class TestCommand(Command): + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + print ("""Synapse's tests cannot be run via setup.py. To run them, try: + PYTHONPATH="." trial tests +""") + def read_file(path_segments): """Read a file from the package. Takes a list of strings to join to make the path""" @@ -39,38 +78,6 @@ def exec_file(path_segments): return result -class Tox(Command): - user_options = [('tox-args=', 'a', "Arguments to pass to tox")] - - def initialize_options(self): - self.tox_args = None - - def finalize_options(self): - self.test_args = [] - self.test_suite = True - - def run(self): - #import here, cause outside the eggs aren't loaded - try: - import tox - except ImportError: - try: - self.distribution.fetch_build_eggs("tox") - import tox - except: - raise RuntimeError( - "The tests need 'tox' to run. Please install 'tox'." - ) - import shlex - args = self.tox_args - if args: - args = shlex.split(self.tox_args) - else: - args = [] - errno = tox.cmdline(args=args) - sys.exit(errno) - - version = exec_file(("synapse", "__init__.py"))["__version__"] dependencies = exec_file(("synapse", "python_dependencies.py")) long_description = read_file(("README.rst",)) @@ -86,5 +93,5 @@ setup( zip_safe=False, long_description=long_description, scripts=["synctl"] + glob.glob("scripts/*"), - cmdclass={'test': Tox}, + cmdclass={'test': TestCommand}, ) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index ddab210718..a99986714d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -677,31 +677,28 @@ class Auth(object): @defer.inlineCallbacks def get_user_by_access_token(self, token, rights="access"): - """ Get a registered user's ID. + """ Validate access token and get user_id from it Args: token (str): The access token to get the user by. + rights (str): The operation being performed; the access token must + allow this. Returns: dict : dict that includes the user and the ID of their access token. Raises: AuthError if no user by that token exists or the token is invalid. """ try: - ret = yield self.get_user_from_macaroon(token, rights) - except AuthError: - # TODO(daniel): Remove this fallback when all existing access tokens - # have been re-issued as macaroons. - if self.hs.config.expire_access_token: - raise - ret = yield self._look_up_user_by_access_token(token) - - defer.returnValue(ret) + macaroon = pymacaroons.Macaroon.deserialize(token) + except Exception: # deserialize can throw more-or-less anything + # doesn't look like a macaroon: treat it as an opaque token which + # must be in the database. + # TODO: it would be nice to get rid of this, but apparently some + # people use access tokens which aren't macaroons + r = yield self._look_up_user_by_access_token(token) + defer.returnValue(r) - @defer.inlineCallbacks - def get_user_from_macaroon(self, macaroon_str, rights="access"): try: - macaroon = pymacaroons.Macaroon.deserialize(macaroon_str) - user_id = self.get_user_id_from_macaroon(macaroon) user = UserID.from_string(user_id) @@ -716,6 +713,30 @@ class Auth(object): guest = True if guest: + # Guest access tokens are not stored in the database (there can + # only be one access token per guest, anyway). + # + # In order to prevent guest access tokens being used as regular + # user access tokens (and hence getting around the invalidation + # process), we look up the user id and check that it is indeed + # a guest user. + # + # It would of course be much easier to store guest access + # tokens in the database as well, but that would break existing + # guest tokens. + stored_user = yield self.store.get_user_by_id(user_id) + if not stored_user: + raise AuthError( + self.TOKEN_NOT_FOUND_HTTP_STATUS, + "Unknown user_id %s" % user_id, + errcode=Codes.UNKNOWN_TOKEN + ) + if not stored_user["is_guest"]: + raise AuthError( + self.TOKEN_NOT_FOUND_HTTP_STATUS, + "Guest access token used for regular user", + errcode=Codes.UNKNOWN_TOKEN + ) ret = { "user": user, "is_guest": True, @@ -743,7 +764,7 @@ class Auth(object): # macaroon. They probably should be. # TODO: build the dictionary from the macaroon once the # above are fixed - ret = yield self._look_up_user_by_access_token(macaroon_str) + ret = yield self._look_up_user_by_access_token(token) if ret["user"] != user: logger.error( "Macaroon user (%s) != DB user (%s)", diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index fd11935b40..b63a660c06 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -111,6 +111,11 @@ class E2eKeysHandler(object): failures[destination] = { "status": 503, "message": "Not ready for retry", } + except Exception as e: + # include ConnectionRefused and other errors + failures[destination] = { + "status": 503, "message": e.message + } yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(do_remote_query)(destination) @@ -222,6 +227,11 @@ class E2eKeysHandler(object): failures[destination] = { "status": 503, "message": "Not ready for retry", } + except Exception as e: + # include ConnectionRefused and other errors + failures[destination] = { + "status": 503, "message": e.message + } yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(claim_client_keys)(destination) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fd09397226..7a57a69bd3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -24,7 +24,7 @@ from synapse.push.action_generator import ActionGenerator from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) -from synapse.util.async import run_on_reactor, ReadWriteLock +from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter from synapse.util.logcontext import preserve_fn from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -50,6 +50,10 @@ class MessageHandler(BaseHandler): self.pagination_lock = ReadWriteLock() + # We arbitrarily limit concurrent event creation for a room to 5. + # This is to stop us from diverging history *too* much. + self.limiter = Limiter(max_count=5) + @defer.inlineCallbacks def purge_history(self, room_id, event_id): event = yield self.store.get_event(event_id) @@ -191,36 +195,38 @@ class MessageHandler(BaseHandler): """ builder = self.event_builder_factory.new(event_dict) - self.validator.validate_new(builder) - - if builder.type == EventTypes.Member: - membership = builder.content.get("membership", None) - target = UserID.from_string(builder.state_key) + with (yield self.limiter.queue(builder.room_id)): + self.validator.validate_new(builder) + + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + target = UserID.from_string(builder.state_key) + + if membership in {Membership.JOIN, Membership.INVITE}: + # If event doesn't include a display name, add one. + profile = self.hs.get_handlers().profile_handler + content = builder.content + + try: + content["displayname"] = yield profile.get_displayname(target) + content["avatar_url"] = yield profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information for %r: %s", + target, e + ) - if membership in {Membership.JOIN, Membership.INVITE}: - # If event doesn't include a display name, add one. - profile = self.hs.get_handlers().profile_handler - content = builder.content + if token_id is not None: + builder.internal_metadata.token_id = token_id - try: - content["displayname"] = yield profile.get_displayname(target) - content["avatar_url"] = yield profile.get_avatar_url(target) - except Exception as e: - logger.info( - "Failed to get profile information for %r: %s", - target, e - ) + if txn_id is not None: + builder.internal_metadata.txn_id = txn_id - if token_id is not None: - builder.internal_metadata.token_id = token_id - - if txn_id is not None: - builder.internal_metadata.txn_id = txn_id + event, context = yield self._create_new_client_event( + builder=builder, + prev_event_ids=prev_event_ids, + ) - event, context = yield self._create_new_client_event( - builder=builder, - prev_event_ids=prev_event_ids, - ) defer.returnValue((event, context)) @defer.inlineCallbacks diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 916e80a48e..50aa513935 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -100,7 +100,7 @@ class ReceiptsHandler(BaseHandler): if not res: # res will be None if this read receipt is 'old' - defer.returnValue(False) + continue stream_id, max_persisted_id = res @@ -109,6 +109,10 @@ class ReceiptsHandler(BaseHandler): if max_batch_id is None or max_persisted_id > max_batch_id: max_batch_id = max_persisted_id + if min_batch_id is None: + # no new receipts + defer.returnValue(False) + affected_room_ids = list(set([r["room_id"] for r in receipts])) with PreserveLoggingContext(): diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 886fec8701..286f0cef0a 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -81,7 +81,7 @@ class RegistrationHandler(BaseHandler): "User ID already taken.", errcode=Codes.USER_IN_USE, ) - user_data = yield self.auth.get_user_from_macaroon(guest_access_token) + user_data = yield self.auth.get_user_by_access_token(guest_access_token) if not user_data["is_guest"] or user_data["user"].localpart != localpart: raise AuthError( 403, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a86996689c..b62773dcbe 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -510,6 +510,7 @@ class SyncHandler(object): Returns: Deferred(SyncResult) """ + logger.info("Calculating sync response for %r", sync_config.user) # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability diff --git a/synapse/notifier.py b/synapse/notifier.py index 054ca59ad2..acbd4bb5ae 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError +from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext, preserve_fn @@ -294,14 +295,7 @@ class Notifier(object): result = None if timeout: - # Will be set to a _NotificationListener that we'll be waiting on. - # Allows us to cancel it. - listener = None - - def timed_out(): - if listener: - listener.deferred.cancel() - timer = self.clock.call_later(timeout / 1000., timed_out) + end_time = self.clock.time_msec() + timeout prev_token = from_token while not result: @@ -312,6 +306,10 @@ class Notifier(object): if result: break + now = self.clock.time_msec() + if end_time <= now: + break + # Now we wait for the _NotifierUserStream to be told there # is a new token. # We need to supply the token we supplied to callback so @@ -319,11 +317,14 @@ class Notifier(object): prev_token = current_token listener = user_stream.new_listener(prev_token) with PreserveLoggingContext(): - yield listener.deferred + yield self.clock.time_bound_deferred( + listener.deferred, + time_out=(end_time - now) / 1000. + ) + except DeferredTimedOutError: + break except defer.CancelledError: break - - self.clock.cancel_call_later(timer, ignore_errs=True) else: current_token = user_stream.current_token result = yield callback(from_token, current_token) @@ -492,22 +493,27 @@ class Notifier(object): """ listener = _NotificationListener(None) - def timed_out(): - listener.deferred.cancel() + end_time = self.clock.time_msec() + timeout - timer = self.clock.call_later(timeout / 1000., timed_out) while True: listener.deferred = self.replication_deferred.observe() result = yield callback() if result: break + now = self.clock.time_msec() + if end_time <= now: + break + try: with PreserveLoggingContext(): - yield listener.deferred + yield self.clock.time_bound_deferred( + listener.deferred, + time_out=(end_time - now) / 1000. + ) + except DeferredTimedOutError: + break except defer.CancelledError: break - self.clock.cancel_call_later(timer, ignore_errs=True) - defer.returnValue(result) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index be55598c43..78b095c903 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -87,12 +87,12 @@ class BulkPushRuleEvaluator: condition_cache = {} for uid, rules in self.rules_by_user.items(): - display_name = None - member_ev_id = context.current_state_ids.get((EventTypes.Member, uid)) - if member_ev_id: - member_ev = yield self.store.get_event(member_ev_id, allow_none=True) - if member_ev: - display_name = member_ev.content.get("displayname", None) + display_name = room_members.get(uid, {}).get("display_name", None) + if not display_name: + # Handle the case where we are pushing a membership event to + # that user, as they might not be already joined. + if event.type == EventTypes.Member and event.state_key == uid: + display_name = event.content.get("displayname", None) filtered = filtered_by_user[uid] if len(filtered) == 0: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index b13095405b..eead435bfd 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -386,6 +386,24 @@ class RoomMemberListRestServlet(ClientV1RestServlet): })) +class JoinedRoomMemberListRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$") + + def __init__(self, hs): + super(JoinedRoomMemberListRestServlet, self).__init__(hs) + self.state = hs.get_state_handler() + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + yield self.auth.get_user_by_req(request) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + defer.returnValue((200, { + "joined": users_with_profile + })) + + # TODO: Needs better unit testing class RoomMessageListRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/messages$") @@ -709,6 +727,22 @@ class SearchRestServlet(ClientV1RestServlet): defer.returnValue((200, results)) +class JoinedRoomsRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/joined_rooms$") + + def __init__(self, hs): + super(JoinedRoomsRestServlet, self).__init__(hs) + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + + rooms = yield self.store.get_rooms_for_user(requester.user.to_string()) + room_ids = set(r.room_id for r in rooms) # Ensure they're unique. + defer.returnValue((200, {"joined_rooms": list(room_ids)})) + + def register_txn_path(servlet, regex_string, http_server, with_get=False): """Registers a transaction-based path. @@ -744,6 +778,7 @@ def register_servlets(hs, http_server): RoomStateEventRestServlet(hs).register(http_server) RoomCreateRestServlet(hs).register(http_server) RoomMemberListRestServlet(hs).register(http_server) + JoinedRoomMemberListRestServlet(hs).register(http_server) RoomMessageListRestServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server) RoomForgetRestServlet(hs).register(http_server) @@ -755,4 +790,5 @@ def register_servlets(hs, http_server): RoomRedactEventRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) + JoinedRoomsRestServlet(hs).register(http_server) RoomEventContext(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 08b7c99d57..46789775b9 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -94,10 +94,6 @@ class KeyUploadServlet(RestServlet): class KeyQueryServlet(RestServlet): """ - GET /keys/query/<user_id> HTTP/1.1 - - GET /keys/query/<user_id>/<device_id> HTTP/1.1 - POST /keys/query HTTP/1.1 Content-Type: application/json { @@ -131,11 +127,7 @@ class KeyQueryServlet(RestServlet): """ PATTERNS = client_v2_patterns( - "/keys/query(?:" - "/(?P<user_id>[^/]*)(?:" - "/(?P<device_id>[^/]*)" - ")?" - ")?", + "/keys/query$", releases=() ) @@ -149,31 +141,16 @@ class KeyQueryServlet(RestServlet): self.e2e_keys_handler = hs.get_e2e_keys_handler() @defer.inlineCallbacks - def on_POST(self, request, user_id, device_id): + def on_POST(self, request): yield self.auth.get_user_by_req(request, allow_guest=True) timeout = parse_integer(request, "timeout", 10 * 1000) body = parse_json_object_from_request(request) result = yield self.e2e_keys_handler.query_devices(body, timeout) defer.returnValue((200, result)) - @defer.inlineCallbacks - def on_GET(self, request, user_id, device_id): - requester = yield self.auth.get_user_by_req(request, allow_guest=True) - timeout = parse_integer(request, "timeout", 10 * 1000) - auth_user_id = requester.user.to_string() - user_id = user_id if user_id else auth_user_id - device_ids = [device_id] if device_id else [] - result = yield self.e2e_keys_handler.query_devices( - {"device_keys": {user_id: device_ids}}, - timeout, - ) - defer.returnValue((200, result)) - class OneTimeKeyServlet(RestServlet): """ - GET /keys/claim/<user-id>/<device-id>/<algorithm> HTTP/1.1 - POST /keys/claim HTTP/1.1 { "one_time_keys": { @@ -191,9 +168,7 @@ class OneTimeKeyServlet(RestServlet): """ PATTERNS = client_v2_patterns( - "/keys/claim(?:/?|(?:/" - "(?P<user_id>[^/]*)/(?P<device_id>[^/]*)/(?P<algorithm>[^/]*)" - ")?)", + "/keys/claim$", releases=() ) @@ -203,17 +178,7 @@ class OneTimeKeyServlet(RestServlet): self.e2e_keys_handler = hs.get_e2e_keys_handler() @defer.inlineCallbacks - def on_GET(self, request, user_id, device_id, algorithm): - yield self.auth.get_user_by_req(request, allow_guest=True) - timeout = parse_integer(request, "timeout", 10 * 1000) - result = yield self.e2e_keys_handler.claim_one_time_keys( - {"one_time_keys": {user_id: {device_id: algorithm}}}, - timeout, - ) - defer.returnValue((200, result)) - - @defer.inlineCallbacks - def on_POST(self, request, user_id, device_id, algorithm): + def on_POST(self, request): yield self.auth.get_user_by_req(request, allow_guest=True) timeout = parse_integer(request, "timeout", 10 * 1000) body = parse_json_object_from_request(request) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index db146ed348..fe936b3e62 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -222,6 +222,7 @@ class DataStore(RoomMemberStore, RoomStore, ) self._stream_order_on_start = self.get_room_max_stream_ordering() + self._min_stream_order_on_start = self.get_room_min_stream_ordering() super(DataStore, self).__init__(hs) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 9747a04a9a..f72d15f5ed 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -405,7 +405,7 @@ class ReceiptsStore(SQLBaseStore): room_id, receipt_type, user_id, event_ids, data ) - max_persisted_id = self._stream_id_gen.get_current_token() + max_persisted_id = self._receipts_id_gen.get_current_token() defer.returnValue((stream_id, max_persisted_id)) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 866d64e679..b2a45a38c1 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -24,6 +24,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.types import get_domain_from_id import logging +import ujson as json logger = logging.getLogger(__name__) @@ -34,7 +35,15 @@ RoomsForUser = namedtuple( ) +_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" + + class RoomMemberStore(SQLBaseStore): + def __init__(self, hs): + super(RoomMemberStore, self).__init__(hs) + self.register_background_update_handler( + _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile + ) def _store_room_members_txn(self, txn, events, backfilled): """Store a room member in the database. @@ -49,6 +58,8 @@ class RoomMemberStore(SQLBaseStore): "sender": event.user_id, "room_id": event.room_id, "membership": event.membership, + "display_name": event.content.get("displayname", None), + "avatar_url": event.content.get("avatar_url", None), } for event in events ] @@ -398,7 +409,7 @@ class RoomMemberStore(SQLBaseStore): table="room_memberships", column="event_id", iterable=member_event_ids, - retcols=['user_id'], + retcols=['user_id', 'display_name', 'avatar_url'], keyvalues={ "membership": Membership.JOIN, }, @@ -406,11 +417,21 @@ class RoomMemberStore(SQLBaseStore): desc="_get_joined_users_from_context", ) - users_in_room = set(row["user_id"] for row in rows) + users_in_room = { + row["user_id"]: { + "display_name": row["display_name"], + "avatar_url": row["avatar_url"], + } + for row in rows + } + if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: if event.event_id in member_event_ids: - users_in_room.add(event.state_key) + users_in_room[event.state_key] = { + "display_name": event.content.get("displayname", None), + "avatar_url": event.content.get("avatar_url", None), + } defer.returnValue(users_in_room) @@ -448,3 +469,78 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(True) defer.returnValue(False) + + @defer.inlineCallbacks + def _background_add_membership_profile(self, progress, batch_size): + target_min_stream_id = progress.get( + "target_min_stream_id_inclusive", self._min_stream_order_on_start + ) + max_stream_id = progress.get( + "max_stream_id_exclusive", self._stream_order_on_start + 1 + ) + + INSERT_CLUMP_SIZE = 1000 + + def add_membership_profile_txn(txn): + sql = (""" + SELECT stream_ordering, event_id, room_id, content + FROM events + INNER JOIN room_memberships USING (room_id, event_id) + WHERE ? <= stream_ordering AND stream_ordering < ? + AND type = 'm.room.member' + ORDER BY stream_ordering DESC + LIMIT ? + """) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + rows = self.cursor_to_dict(txn) + if not rows: + return 0 + + min_stream_id = rows[-1]["stream_ordering"] + + to_update = [] + for row in rows: + event_id = row["event_id"] + room_id = row["room_id"] + try: + content = json.loads(row["content"]) + except: + continue + + display_name = content.get("displayname", None) + avatar_url = content.get("avatar_url", None) + + if display_name or avatar_url: + to_update.append(( + display_name, avatar_url, event_id, room_id + )) + + to_update_sql = (""" + UPDATE room_memberships SET display_name = ?, avatar_url = ? + WHERE event_id = ? AND room_id = ? + """) + for index in range(0, len(to_update), INSERT_CLUMP_SIZE): + clump = to_update[index:index + INSERT_CLUMP_SIZE] + txn.executemany(to_update_sql, clump) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + } + + self._background_update_progress_txn( + txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress + ) + + return len(to_update) + + result = yield self.runInteraction( + _MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn + ) + + if not result: + yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME) + + defer.returnValue(result) diff --git a/synapse/storage/schema/delta/39/membership_profile.sql b/synapse/storage/schema/delta/39/membership_profile.sql new file mode 100644 index 0000000000..1bf911c8ab --- /dev/null +++ b/synapse/storage/schema/delta/39/membership_profile.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE room_memberships ADD COLUMN display_name TEXT; +ALTER TABLE room_memberships ADD COLUMN avatar_url TEXT; + +INSERT into background_updates (update_name, progress_json) + VALUES ('room_membership_profile_update', '{}'); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 7fa63b58a7..2dc24951c4 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -541,6 +541,9 @@ class StreamStore(SQLBaseStore): def get_room_max_stream_ordering(self): return self._stream_id_gen.get_current_token() + def get_room_min_stream_ordering(self): + return self._backfill_id_gen.get_current_token() + def get_stream_token_for_event(self, event_id): """The stream token for an event Args: diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c05b9450be..30fc480108 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -24,6 +24,11 @@ import logging logger = logging.getLogger(__name__) +class DeferredTimedOutError(SynapseError): + def __init__(self): + super(SynapseError).__init__(504, "Timed out") + + def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. failure.trap(defer.FirstError) @@ -89,7 +94,7 @@ class Clock(object): def timed_out_fn(): try: - ret_deferred.errback(SynapseError(504, "Timed out")) + ret_deferred.errback(DeferredTimedOutError()) except: pass diff --git a/synapse/util/async.py b/synapse/util/async.py index 347fb1e380..16ed183d4c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -197,6 +197,64 @@ class Linearizer(object): defer.returnValue(_ctx_manager()) +class Limiter(object): + """Limits concurrent access to resources based on a key. Useful to ensure + only a few thing happen at a time on a given resource. + + Example: + + with (yield limiter.queue("test_key")): + # do some work. + + """ + def __init__(self, max_count): + """ + Args: + max_count(int): The maximum number of concurrent access + """ + self.max_count = max_count + + # key_to_defer is a map from the key to a 2 element list where + # the first element is the number of things executing + # the second element is a list of deferreds for the things blocked from + # executing. + self.key_to_defer = {} + + @defer.inlineCallbacks + def queue(self, key): + entry = self.key_to_defer.setdefault(key, [0, []]) + + # If the number of things executing is greater than the maximum + # then add a deferred to the list of blocked items + # When on of the things currently executing finishes it will callback + # this item so that it can continue executing. + if entry[0] >= self.max_count: + new_defer = defer.Deferred() + entry[1].append(new_defer) + with PreserveLoggingContext(): + yield new_defer + + entry[0] += 1 + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + # We've finished executing so check if there are any things + # blocked waiting to execute and start one of them + entry[0] -= 1 + try: + entry[1].pop(0).callback(None) + except IndexError: + # If nothing else is executing for this key then remove it + # from the map + if entry[0] == 0: + self.key_to_defer.pop(key, None) + + defer.returnValue(_ctx_manager()) + + class ReadWriteLock(object): """A deferred style read write lock. diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 2cf262bb46..4575dd9834 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -12,17 +12,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from tests import unittest -from twisted.internet import defer +import pymacaroons from mock import Mock +from twisted.internet import defer +import synapse.handlers.auth from synapse.api.auth import Auth from synapse.api.errors import AuthError from synapse.types import UserID +from tests import unittest from tests.utils import setup_test_homeserver, mock_getRawHeaders -import pymacaroons + +class TestHandlers(object): + def __init__(self, hs): + self.auth_handler = synapse.handlers.auth.AuthHandler(hs) class AuthTestCase(unittest.TestCase): @@ -34,14 +39,17 @@ class AuthTestCase(unittest.TestCase): self.hs = yield setup_test_homeserver(handlers=None) self.hs.get_datastore = Mock(return_value=self.store) + self.hs.handlers = TestHandlers(self.hs) self.auth = Auth(self.hs) self.test_user = "@foo:bar" self.test_token = "_test_token_" + # this is overridden for the appservice tests + self.store.get_app_service_by_token = Mock(return_value=None) + @defer.inlineCallbacks def test_get_user_by_req_user_valid_token(self): - self.store.get_app_service_by_token = Mock(return_value=None) user_info = { "name": self.test_user, "token_id": "ditto", @@ -56,7 +64,6 @@ class AuthTestCase(unittest.TestCase): self.assertEquals(requester.user.to_string(), self.test_user) def test_get_user_by_req_user_bad_token(self): - self.store.get_app_service_by_token = Mock(return_value=None) self.store.get_user_by_access_token = Mock(return_value=None) request = Mock(args={}) @@ -66,7 +73,6 @@ class AuthTestCase(unittest.TestCase): self.failureResultOf(d, AuthError) def test_get_user_by_req_user_missing_token(self): - self.store.get_app_service_by_token = Mock(return_value=None) user_info = { "name": self.test_user, "token_id": "ditto", @@ -158,7 +164,7 @@ class AuthTestCase(unittest.TestCase): macaroon.add_first_party_caveat("gen = 1") macaroon.add_first_party_caveat("type = access") macaroon.add_first_party_caveat("user_id = %s" % (user_id,)) - user_info = yield self.auth.get_user_from_macaroon(macaroon.serialize()) + user_info = yield self.auth.get_user_by_access_token(macaroon.serialize()) user = user_info["user"] self.assertEqual(UserID.from_string(user_id), user) @@ -168,6 +174,10 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_guest_user_from_macaroon(self): + self.store.get_user_by_id = Mock(return_value={ + "is_guest": True, + }) + user_id = "@baldrick:matrix.org" macaroon = pymacaroons.Macaroon( location=self.hs.config.server_name, @@ -179,11 +189,12 @@ class AuthTestCase(unittest.TestCase): macaroon.add_first_party_caveat("guest = true") serialized = macaroon.serialize() - user_info = yield self.auth.get_user_from_macaroon(serialized) + user_info = yield self.auth.get_user_by_access_token(serialized) user = user_info["user"] is_guest = user_info["is_guest"] self.assertEqual(UserID.from_string(user_id), user) self.assertTrue(is_guest) + self.store.get_user_by_id.assert_called_with(user_id) @defer.inlineCallbacks def test_get_user_from_macaroon_user_db_mismatch(self): @@ -200,7 +211,7 @@ class AuthTestCase(unittest.TestCase): macaroon.add_first_party_caveat("type = access") macaroon.add_first_party_caveat("user_id = %s" % (user,)) with self.assertRaises(AuthError) as cm: - yield self.auth.get_user_from_macaroon(macaroon.serialize()) + yield self.auth.get_user_by_access_token(macaroon.serialize()) self.assertEqual(401, cm.exception.code) self.assertIn("User mismatch", cm.exception.msg) @@ -220,7 +231,7 @@ class AuthTestCase(unittest.TestCase): macaroon.add_first_party_caveat("type = access") with self.assertRaises(AuthError) as cm: - yield self.auth.get_user_from_macaroon(macaroon.serialize()) + yield self.auth.get_user_by_access_token(macaroon.serialize()) self.assertEqual(401, cm.exception.code) self.assertIn("No user caveat", cm.exception.msg) @@ -242,7 +253,7 @@ class AuthTestCase(unittest.TestCase): macaroon.add_first_party_caveat("user_id = %s" % (user,)) with self.assertRaises(AuthError) as cm: - yield self.auth.get_user_from_macaroon(macaroon.serialize()) + yield self.auth.get_user_by_access_token(macaroon.serialize()) self.assertEqual(401, cm.exception.code) self.assertIn("Invalid macaroon", cm.exception.msg) @@ -265,7 +276,7 @@ class AuthTestCase(unittest.TestCase): macaroon.add_first_party_caveat("cunning > fox") with self.assertRaises(AuthError) as cm: - yield self.auth.get_user_from_macaroon(macaroon.serialize()) + yield self.auth.get_user_by_access_token(macaroon.serialize()) self.assertEqual(401, cm.exception.code) self.assertIn("Invalid macaroon", cm.exception.msg) @@ -293,12 +304,12 @@ class AuthTestCase(unittest.TestCase): self.hs.clock.now = 5000 # seconds self.hs.config.expire_access_token = True - # yield self.auth.get_user_from_macaroon(macaroon.serialize()) + # yield self.auth.get_user_by_access_token(macaroon.serialize()) # TODO(daniel): Turn on the check that we validate expiration, when we # validate expiration (and remove the above line, which will start # throwing). with self.assertRaises(AuthError) as cm: - yield self.auth.get_user_from_macaroon(macaroon.serialize()) + yield self.auth.get_user_by_access_token(macaroon.serialize()) self.assertEqual(401, cm.exception.code) self.assertIn("Invalid macaroon", cm.exception.msg) @@ -327,6 +338,58 @@ class AuthTestCase(unittest.TestCase): self.hs.clock.now = 5000 # seconds self.hs.config.expire_access_token = True - user_info = yield self.auth.get_user_from_macaroon(macaroon.serialize()) + user_info = yield self.auth.get_user_by_access_token(macaroon.serialize()) user = user_info["user"] self.assertEqual(UserID.from_string(user_id), user) + + @defer.inlineCallbacks + def test_cannot_use_regular_token_as_guest(self): + USER_ID = "@percy:matrix.org" + self.store.add_access_token_to_user = Mock() + + token = yield self.hs.handlers.auth_handler.issue_access_token( + USER_ID, "DEVICE" + ) + self.store.add_access_token_to_user.assert_called_with( + USER_ID, token, "DEVICE" + ) + + def get_user(tok): + if token != tok: + return None + return { + "name": USER_ID, + "is_guest": False, + "token_id": 1234, + "device_id": "DEVICE", + } + self.store.get_user_by_access_token = get_user + self.store.get_user_by_id = Mock(return_value={ + "is_guest": False, + }) + + # check the token works + request = Mock(args={}) + request.args["access_token"] = [token] + request.requestHeaders.getRawHeaders = mock_getRawHeaders() + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + self.assertEqual(UserID.from_string(USER_ID), requester.user) + self.assertFalse(requester.is_guest) + + # add an is_guest caveat + mac = pymacaroons.Macaroon.deserialize(token) + mac.add_first_party_caveat("guest = true") + guest_tok = mac.serialize() + + # the token should *not* work now + request = Mock(args={}) + request.args["access_token"] = [guest_tok] + request.requestHeaders.getRawHeaders = mock_getRawHeaders() + + with self.assertRaises(AuthError) as cm: + yield self.auth.get_user_by_req(request, allow_guest=True) + + self.assertEqual(401, cm.exception.code) + self.assertEqual("Guest access token used for regular user", cm.exception.msg) + + self.store.get_user_by_id.assert_called_with(USER_ID) diff --git a/tests/util/test_limiter.py b/tests/util/test_limiter.py new file mode 100644 index 0000000000..9c795d9fdb --- /dev/null +++ b/tests/util/test_limiter.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from tests import unittest + +from twisted.internet import defer + +from synapse.util.async import Limiter + + +class LimiterTestCase(unittest.TestCase): + + @defer.inlineCallbacks + def test_limiter(self): + limiter = Limiter(3) + + key = object() + + d1 = limiter.queue(key) + cm1 = yield d1 + + d2 = limiter.queue(key) + cm2 = yield d2 + + d3 = limiter.queue(key) + cm3 = yield d3 + + d4 = limiter.queue(key) + self.assertFalse(d4.called) + + d5 = limiter.queue(key) + self.assertFalse(d5.called) + + with cm1: + self.assertFalse(d4.called) + self.assertFalse(d5.called) + + self.assertTrue(d4.called) + self.assertFalse(d5.called) + + with cm3: + self.assertFalse(d5.called) + + self.assertTrue(d5.called) + + with cm2: + pass + + with (yield d4): + pass + + with (yield d5): + pass + + d6 = limiter.queue(key) + with (yield d6): + pass diff --git a/tests/utils.py b/tests/utils.py index 2d0bd205fd..d3d6c8021d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -294,6 +294,10 @@ class MockClock(object): def advance_time_msec(self, ms): self.advance_time(ms / 1000.) + def time_bound_deferred(self, d, *args, **kwargs): + # We don't bother timing things out for now. + return d + class SQLiteMemoryDbPool(ConnectionPool, object): def __init__(self): |