diff --git a/README.rst b/README.rst
index 6a22399e19..21bd63fd64 100644
--- a/README.rst
+++ b/README.rst
@@ -88,7 +88,12 @@ System requirements:
- Python 2.7
- At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
-Synapse is written in python but some of the libraries is uses are written in
+Installing from source
+----------------------
+(Prebuilt packages are available for some platforms - see `Platform-Specific
+Instructions`_.)
+
+Synapse is written in python but some of the libraries it uses are written in
C. So before we can install synapse itself we need a working C compiler and the
header files for python C extensions.
@@ -131,6 +136,10 @@ Installing prerequisites on openSUSE::
sudo zypper in python-pip python-setuptools sqlite3 python-virtualenv \
python-devel libffi-devel libopenssl-devel libjpeg62-devel
+Installing prerequisites on OpenBSD::
+ doas pkg_add python libffi py-pip py-setuptools sqlite3 py-virtualenv \
+ libxslt
+
To install the synapse homeserver run::
virtualenv -p python2.7 ~/.synapse
@@ -302,7 +311,7 @@ See https://github.com/vector-im/vector-web/issues/1977 and
https://developer.github.com/changes/2014-04-25-user-content-security for more details.
-Platform Specific Instructions
+Platform-Specific Instructions
==============================
Debian
@@ -364,6 +373,32 @@ Synapse can be installed via FreeBSD Ports or Packages contributed by Brendan Mo
- Ports: ``cd /usr/ports/net/py-matrix-synapse && make install clean``
- Packages: ``pkg install py27-matrix-synapse``
+
+OpenBSD
+-------
+
+There is currently no port for OpenBSD. Additionally, OpenBSD's security
+settings require a slightly more difficult installation process.
+
+1) Create a new directory in ``/usr/local`` called ``_synapse``. Also, create a
+ new user called ``_synapse`` and set that directory as the new user's home.
+ This is required because, by default, OpenBSD only allows binaries which need
+ write and execute permissions on the same memory space to be run from
+ ``/usr/local``.
+2) ``su`` to the new ``_synapse`` user and change to their home directory.
+3) Create a new virtualenv: ``virtualenv -p python2.7 ~/.synapse``
+4) Source the virtualenv configuration located at
+ ``/usr/local/_synapse/.synapse/bin/activate``. This is done in ``ksh`` by
+ using the ``.`` command, rather than ``bash``'s ``source``.
+5) Optionally, use ``pip`` to install ``lxml``, which Synapse needs to parse
+ webpages for their titles.
+6) Use ``pip`` to install this repository: ``pip install
+ https://github.com/matrix-org/synapse/tarball/master``
+7) Optionally, change ``_synapse``'s shell to ``/bin/false`` to reduce the
+ chance of a compromised Synapse server being used to take over your box.
+
+After this, you may proceed with the rest of the install directions.
+
NixOS
-----
@@ -759,6 +794,10 @@ Then update the `users` table in the database::
Synapse Development
===================
+Before setting up a development environment for synapse, make sure you have the
+system dependencies (such as the python header files) installed - see
+`Installing from source`_.
+
To check out a synapse for development, clone the git repo into a working
directory of your choice::
@@ -770,8 +809,8 @@ to install using pip and a virtualenv::
virtualenv env
source env/bin/activate
- python synapse/python_dependencies.py | xargs -n1 pip install
- pip install setuptools_trial mock
+ python synapse/python_dependencies.py | xargs pip install
+ pip install lxml mock
This will run a process of downloading and installing all the needed
dependencies into a virtual env.
@@ -779,7 +818,7 @@ dependencies into a virtual env.
Once this is done, you may wish to run Synapse's unit tests, to
check that everything is installed as it should be::
- python setup.py test
+ PYTHONPATH="." trial tests
This should end with a 'PASSED' result::
diff --git a/setup.py b/setup.py
index c0716a1599..b00c2af367 100755
--- a/setup.py
+++ b/setup.py
@@ -23,6 +23,45 @@ import sys
here = os.path.abspath(os.path.dirname(__file__))
+# Some notes on `setup.py test`:
+#
+# Once upon a time we used to try to make `setup.py test` run `tox` to run the
+# tests. That's a bad idea for three reasons:
+#
+# 1: `setup.py test` is supposed to find out whether the tests work in the
+# *current* environmentt, not whatever tox sets up.
+# 2: Empirically, trying to install tox during the test run wasn't working ("No
+# module named virtualenv").
+# 3: The tox documentation advises against it[1].
+#
+# Even further back in time, we used to use setuptools_trial [2]. That has its
+# own set of issues: for instance, it requires installation of Twisted to build
+# an sdist (because the recommended mode of usage is to add it to
+# `setup_requires`). That in turn means that in order to successfully run tox
+# you have to have the python header files installed for whichever version of
+# python tox uses (which is python3 on recent ubuntus, for example).
+#
+# So, for now at least, we stick with what appears to be the convention among
+# Twisted projects, and don't attempt to do anything when someone runs
+# `setup.py test`; instead we direct people to run `trial` directly if they
+# care.
+#
+# [1]: http://tox.readthedocs.io/en/2.5.0/example/basic.html#integration-with-setup-py-test-command
+# [2]: https://pypi.python.org/pypi/setuptools_trial
+class TestCommand(Command):
+ user_options = []
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ pass
+
+ def run(self):
+ print ("""Synapse's tests cannot be run via setup.py. To run them, try:
+ PYTHONPATH="." trial tests
+""")
+
def read_file(path_segments):
"""Read a file from the package. Takes a list of strings to join to
make the path"""
@@ -39,38 +78,6 @@ def exec_file(path_segments):
return result
-class Tox(Command):
- user_options = [('tox-args=', 'a', "Arguments to pass to tox")]
-
- def initialize_options(self):
- self.tox_args = None
-
- def finalize_options(self):
- self.test_args = []
- self.test_suite = True
-
- def run(self):
- #import here, cause outside the eggs aren't loaded
- try:
- import tox
- except ImportError:
- try:
- self.distribution.fetch_build_eggs("tox")
- import tox
- except:
- raise RuntimeError(
- "The tests need 'tox' to run. Please install 'tox'."
- )
- import shlex
- args = self.tox_args
- if args:
- args = shlex.split(self.tox_args)
- else:
- args = []
- errno = tox.cmdline(args=args)
- sys.exit(errno)
-
-
version = exec_file(("synapse", "__init__.py"))["__version__"]
dependencies = exec_file(("synapse", "python_dependencies.py"))
long_description = read_file(("README.rst",))
@@ -86,5 +93,5 @@ setup(
zip_safe=False,
long_description=long_description,
scripts=["synctl"] + glob.glob("scripts/*"),
- cmdclass={'test': Tox},
+ cmdclass={'test': TestCommand},
)
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index ddab210718..a99986714d 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -677,31 +677,28 @@ class Auth(object):
@defer.inlineCallbacks
def get_user_by_access_token(self, token, rights="access"):
- """ Get a registered user's ID.
+ """ Validate access token and get user_id from it
Args:
token (str): The access token to get the user by.
+ rights (str): The operation being performed; the access token must
+ allow this.
Returns:
dict : dict that includes the user and the ID of their access token.
Raises:
AuthError if no user by that token exists or the token is invalid.
"""
try:
- ret = yield self.get_user_from_macaroon(token, rights)
- except AuthError:
- # TODO(daniel): Remove this fallback when all existing access tokens
- # have been re-issued as macaroons.
- if self.hs.config.expire_access_token:
- raise
- ret = yield self._look_up_user_by_access_token(token)
-
- defer.returnValue(ret)
+ macaroon = pymacaroons.Macaroon.deserialize(token)
+ except Exception: # deserialize can throw more-or-less anything
+ # doesn't look like a macaroon: treat it as an opaque token which
+ # must be in the database.
+ # TODO: it would be nice to get rid of this, but apparently some
+ # people use access tokens which aren't macaroons
+ r = yield self._look_up_user_by_access_token(token)
+ defer.returnValue(r)
- @defer.inlineCallbacks
- def get_user_from_macaroon(self, macaroon_str, rights="access"):
try:
- macaroon = pymacaroons.Macaroon.deserialize(macaroon_str)
-
user_id = self.get_user_id_from_macaroon(macaroon)
user = UserID.from_string(user_id)
@@ -716,6 +713,30 @@ class Auth(object):
guest = True
if guest:
+ # Guest access tokens are not stored in the database (there can
+ # only be one access token per guest, anyway).
+ #
+ # In order to prevent guest access tokens being used as regular
+ # user access tokens (and hence getting around the invalidation
+ # process), we look up the user id and check that it is indeed
+ # a guest user.
+ #
+ # It would of course be much easier to store guest access
+ # tokens in the database as well, but that would break existing
+ # guest tokens.
+ stored_user = yield self.store.get_user_by_id(user_id)
+ if not stored_user:
+ raise AuthError(
+ self.TOKEN_NOT_FOUND_HTTP_STATUS,
+ "Unknown user_id %s" % user_id,
+ errcode=Codes.UNKNOWN_TOKEN
+ )
+ if not stored_user["is_guest"]:
+ raise AuthError(
+ self.TOKEN_NOT_FOUND_HTTP_STATUS,
+ "Guest access token used for regular user",
+ errcode=Codes.UNKNOWN_TOKEN
+ )
ret = {
"user": user,
"is_guest": True,
@@ -743,7 +764,7 @@ class Auth(object):
# macaroon. They probably should be.
# TODO: build the dictionary from the macaroon once the
# above are fixed
- ret = yield self._look_up_user_by_access_token(macaroon_str)
+ ret = yield self._look_up_user_by_access_token(token)
if ret["user"] != user:
logger.error(
"Macaroon user (%s) != DB user (%s)",
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index fd11935b40..b63a660c06 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -111,6 +111,11 @@ class E2eKeysHandler(object):
failures[destination] = {
"status": 503, "message": "Not ready for retry",
}
+ except Exception as e:
+ # include ConnectionRefused and other errors
+ failures[destination] = {
+ "status": 503, "message": e.message
+ }
yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(do_remote_query)(destination)
@@ -222,6 +227,11 @@ class E2eKeysHandler(object):
failures[destination] = {
"status": 503, "message": "Not ready for retry",
}
+ except Exception as e:
+ # include ConnectionRefused and other errors
+ failures[destination] = {
+ "status": 503, "message": e.message
+ }
yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(claim_client_keys)(destination)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index fd09397226..7a57a69bd3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -24,7 +24,7 @@ from synapse.push.action_generator import ActionGenerator
from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
-from synapse.util.async import run_on_reactor, ReadWriteLock
+from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
@@ -50,6 +50,10 @@ class MessageHandler(BaseHandler):
self.pagination_lock = ReadWriteLock()
+ # We arbitrarily limit concurrent event creation for a room to 5.
+ # This is to stop us from diverging history *too* much.
+ self.limiter = Limiter(max_count=5)
+
@defer.inlineCallbacks
def purge_history(self, room_id, event_id):
event = yield self.store.get_event(event_id)
@@ -191,36 +195,38 @@ class MessageHandler(BaseHandler):
"""
builder = self.event_builder_factory.new(event_dict)
- self.validator.validate_new(builder)
-
- if builder.type == EventTypes.Member:
- membership = builder.content.get("membership", None)
- target = UserID.from_string(builder.state_key)
+ with (yield self.limiter.queue(builder.room_id)):
+ self.validator.validate_new(builder)
+
+ if builder.type == EventTypes.Member:
+ membership = builder.content.get("membership", None)
+ target = UserID.from_string(builder.state_key)
+
+ if membership in {Membership.JOIN, Membership.INVITE}:
+ # If event doesn't include a display name, add one.
+ profile = self.hs.get_handlers().profile_handler
+ content = builder.content
+
+ try:
+ content["displayname"] = yield profile.get_displayname(target)
+ content["avatar_url"] = yield profile.get_avatar_url(target)
+ except Exception as e:
+ logger.info(
+ "Failed to get profile information for %r: %s",
+ target, e
+ )
- if membership in {Membership.JOIN, Membership.INVITE}:
- # If event doesn't include a display name, add one.
- profile = self.hs.get_handlers().profile_handler
- content = builder.content
+ if token_id is not None:
+ builder.internal_metadata.token_id = token_id
- try:
- content["displayname"] = yield profile.get_displayname(target)
- content["avatar_url"] = yield profile.get_avatar_url(target)
- except Exception as e:
- logger.info(
- "Failed to get profile information for %r: %s",
- target, e
- )
+ if txn_id is not None:
+ builder.internal_metadata.txn_id = txn_id
- if token_id is not None:
- builder.internal_metadata.token_id = token_id
-
- if txn_id is not None:
- builder.internal_metadata.txn_id = txn_id
+ event, context = yield self._create_new_client_event(
+ builder=builder,
+ prev_event_ids=prev_event_ids,
+ )
- event, context = yield self._create_new_client_event(
- builder=builder,
- prev_event_ids=prev_event_ids,
- )
defer.returnValue((event, context))
@defer.inlineCallbacks
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 916e80a48e..50aa513935 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -100,7 +100,7 @@ class ReceiptsHandler(BaseHandler):
if not res:
# res will be None if this read receipt is 'old'
- defer.returnValue(False)
+ continue
stream_id, max_persisted_id = res
@@ -109,6 +109,10 @@ class ReceiptsHandler(BaseHandler):
if max_batch_id is None or max_persisted_id > max_batch_id:
max_batch_id = max_persisted_id
+ if min_batch_id is None:
+ # no new receipts
+ defer.returnValue(False)
+
affected_room_ids = list(set([r["room_id"] for r in receipts]))
with PreserveLoggingContext():
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 886fec8701..286f0cef0a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -81,7 +81,7 @@ class RegistrationHandler(BaseHandler):
"User ID already taken.",
errcode=Codes.USER_IN_USE,
)
- user_data = yield self.auth.get_user_from_macaroon(guest_access_token)
+ user_data = yield self.auth.get_user_by_access_token(guest_access_token)
if not user_data["is_guest"] or user_data["user"].localpart != localpart:
raise AuthError(
403,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a86996689c..b62773dcbe 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -510,6 +510,7 @@ class SyncHandler(object):
Returns:
Deferred(SyncResult)
"""
+ logger.info("Calculating sync response for %r", sync_config.user)
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 054ca59ad2..acbd4bb5ae 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
+from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
@@ -294,14 +295,7 @@ class Notifier(object):
result = None
if timeout:
- # Will be set to a _NotificationListener that we'll be waiting on.
- # Allows us to cancel it.
- listener = None
-
- def timed_out():
- if listener:
- listener.deferred.cancel()
- timer = self.clock.call_later(timeout / 1000., timed_out)
+ end_time = self.clock.time_msec() + timeout
prev_token = from_token
while not result:
@@ -312,6 +306,10 @@ class Notifier(object):
if result:
break
+ now = self.clock.time_msec()
+ if end_time <= now:
+ break
+
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
# We need to supply the token we supplied to callback so
@@ -319,11 +317,14 @@ class Notifier(object):
prev_token = current_token
listener = user_stream.new_listener(prev_token)
with PreserveLoggingContext():
- yield listener.deferred
+ yield self.clock.time_bound_deferred(
+ listener.deferred,
+ time_out=(end_time - now) / 1000.
+ )
+ except DeferredTimedOutError:
+ break
except defer.CancelledError:
break
-
- self.clock.cancel_call_later(timer, ignore_errs=True)
else:
current_token = user_stream.current_token
result = yield callback(from_token, current_token)
@@ -492,22 +493,27 @@ class Notifier(object):
"""
listener = _NotificationListener(None)
- def timed_out():
- listener.deferred.cancel()
+ end_time = self.clock.time_msec() + timeout
- timer = self.clock.call_later(timeout / 1000., timed_out)
while True:
listener.deferred = self.replication_deferred.observe()
result = yield callback()
if result:
break
+ now = self.clock.time_msec()
+ if end_time <= now:
+ break
+
try:
with PreserveLoggingContext():
- yield listener.deferred
+ yield self.clock.time_bound_deferred(
+ listener.deferred,
+ time_out=(end_time - now) / 1000.
+ )
+ except DeferredTimedOutError:
+ break
except defer.CancelledError:
break
- self.clock.cancel_call_later(timer, ignore_errs=True)
-
defer.returnValue(result)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index be55598c43..78b095c903 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -87,12 +87,12 @@ class BulkPushRuleEvaluator:
condition_cache = {}
for uid, rules in self.rules_by_user.items():
- display_name = None
- member_ev_id = context.current_state_ids.get((EventTypes.Member, uid))
- if member_ev_id:
- member_ev = yield self.store.get_event(member_ev_id, allow_none=True)
- if member_ev:
- display_name = member_ev.content.get("displayname", None)
+ display_name = room_members.get(uid, {}).get("display_name", None)
+ if not display_name:
+ # Handle the case where we are pushing a membership event to
+ # that user, as they might not be already joined.
+ if event.type == EventTypes.Member and event.state_key == uid:
+ display_name = event.content.get("displayname", None)
filtered = filtered_by_user[uid]
if len(filtered) == 0:
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index b13095405b..eead435bfd 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -386,6 +386,24 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
}))
+class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
+ PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$")
+
+ def __init__(self, hs):
+ super(JoinedRoomMemberListRestServlet, self).__init__(hs)
+ self.state = hs.get_state_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, room_id):
+ yield self.auth.get_user_by_req(request)
+
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+ defer.returnValue((200, {
+ "joined": users_with_profile
+ }))
+
+
# TODO: Needs better unit testing
class RoomMessageListRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/messages$")
@@ -709,6 +727,22 @@ class SearchRestServlet(ClientV1RestServlet):
defer.returnValue((200, results))
+class JoinedRoomsRestServlet(ClientV1RestServlet):
+ PATTERNS = client_path_patterns("/joined_rooms$")
+
+ def __init__(self, hs):
+ super(JoinedRoomsRestServlet, self).__init__(hs)
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request):
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+
+ rooms = yield self.store.get_rooms_for_user(requester.user.to_string())
+ room_ids = set(r.room_id for r in rooms) # Ensure they're unique.
+ defer.returnValue((200, {"joined_rooms": list(room_ids)}))
+
+
def register_txn_path(servlet, regex_string, http_server, with_get=False):
"""Registers a transaction-based path.
@@ -744,6 +778,7 @@ def register_servlets(hs, http_server):
RoomStateEventRestServlet(hs).register(http_server)
RoomCreateRestServlet(hs).register(http_server)
RoomMemberListRestServlet(hs).register(http_server)
+ JoinedRoomMemberListRestServlet(hs).register(http_server)
RoomMessageListRestServlet(hs).register(http_server)
JoinRoomAliasServlet(hs).register(http_server)
RoomForgetRestServlet(hs).register(http_server)
@@ -755,4 +790,5 @@ def register_servlets(hs, http_server):
RoomRedactEventRestServlet(hs).register(http_server)
RoomTypingRestServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server)
+ JoinedRoomsRestServlet(hs).register(http_server)
RoomEventContext(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 08b7c99d57..46789775b9 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -94,10 +94,6 @@ class KeyUploadServlet(RestServlet):
class KeyQueryServlet(RestServlet):
"""
- GET /keys/query/<user_id> HTTP/1.1
-
- GET /keys/query/<user_id>/<device_id> HTTP/1.1
-
POST /keys/query HTTP/1.1
Content-Type: application/json
{
@@ -131,11 +127,7 @@ class KeyQueryServlet(RestServlet):
"""
PATTERNS = client_v2_patterns(
- "/keys/query(?:"
- "/(?P<user_id>[^/]*)(?:"
- "/(?P<device_id>[^/]*)"
- ")?"
- ")?",
+ "/keys/query$",
releases=()
)
@@ -149,31 +141,16 @@ class KeyQueryServlet(RestServlet):
self.e2e_keys_handler = hs.get_e2e_keys_handler()
@defer.inlineCallbacks
- def on_POST(self, request, user_id, device_id):
+ def on_POST(self, request):
yield self.auth.get_user_by_req(request, allow_guest=True)
timeout = parse_integer(request, "timeout", 10 * 1000)
body = parse_json_object_from_request(request)
result = yield self.e2e_keys_handler.query_devices(body, timeout)
defer.returnValue((200, result))
- @defer.inlineCallbacks
- def on_GET(self, request, user_id, device_id):
- requester = yield self.auth.get_user_by_req(request, allow_guest=True)
- timeout = parse_integer(request, "timeout", 10 * 1000)
- auth_user_id = requester.user.to_string()
- user_id = user_id if user_id else auth_user_id
- device_ids = [device_id] if device_id else []
- result = yield self.e2e_keys_handler.query_devices(
- {"device_keys": {user_id: device_ids}},
- timeout,
- )
- defer.returnValue((200, result))
-
class OneTimeKeyServlet(RestServlet):
"""
- GET /keys/claim/<user-id>/<device-id>/<algorithm> HTTP/1.1
-
POST /keys/claim HTTP/1.1
{
"one_time_keys": {
@@ -191,9 +168,7 @@ class OneTimeKeyServlet(RestServlet):
"""
PATTERNS = client_v2_patterns(
- "/keys/claim(?:/?|(?:/"
- "(?P<user_id>[^/]*)/(?P<device_id>[^/]*)/(?P<algorithm>[^/]*)"
- ")?)",
+ "/keys/claim$",
releases=()
)
@@ -203,17 +178,7 @@ class OneTimeKeyServlet(RestServlet):
self.e2e_keys_handler = hs.get_e2e_keys_handler()
@defer.inlineCallbacks
- def on_GET(self, request, user_id, device_id, algorithm):
- yield self.auth.get_user_by_req(request, allow_guest=True)
- timeout = parse_integer(request, "timeout", 10 * 1000)
- result = yield self.e2e_keys_handler.claim_one_time_keys(
- {"one_time_keys": {user_id: {device_id: algorithm}}},
- timeout,
- )
- defer.returnValue((200, result))
-
- @defer.inlineCallbacks
- def on_POST(self, request, user_id, device_id, algorithm):
+ def on_POST(self, request):
yield self.auth.get_user_by_req(request, allow_guest=True)
timeout = parse_integer(request, "timeout", 10 * 1000)
body = parse_json_object_from_request(request)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index db146ed348..fe936b3e62 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -222,6 +222,7 @@ class DataStore(RoomMemberStore, RoomStore,
)
self._stream_order_on_start = self.get_room_max_stream_ordering()
+ self._min_stream_order_on_start = self.get_room_min_stream_ordering()
super(DataStore, self).__init__(hs)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 9747a04a9a..f72d15f5ed 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -405,7 +405,7 @@ class ReceiptsStore(SQLBaseStore):
room_id, receipt_type, user_id, event_ids, data
)
- max_persisted_id = self._stream_id_gen.get_current_token()
+ max_persisted_id = self._receipts_id_gen.get_current_token()
defer.returnValue((stream_id, max_persisted_id))
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 866d64e679..b2a45a38c1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,6 +24,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.types import get_domain_from_id
import logging
+import ujson as json
logger = logging.getLogger(__name__)
@@ -34,7 +35,15 @@ RoomsForUser = namedtuple(
)
+_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
+
+
class RoomMemberStore(SQLBaseStore):
+ def __init__(self, hs):
+ super(RoomMemberStore, self).__init__(hs)
+ self.register_background_update_handler(
+ _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
+ )
def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database.
@@ -49,6 +58,8 @@ class RoomMemberStore(SQLBaseStore):
"sender": event.user_id,
"room_id": event.room_id,
"membership": event.membership,
+ "display_name": event.content.get("displayname", None),
+ "avatar_url": event.content.get("avatar_url", None),
}
for event in events
]
@@ -398,7 +409,7 @@ class RoomMemberStore(SQLBaseStore):
table="room_memberships",
column="event_id",
iterable=member_event_ids,
- retcols=['user_id'],
+ retcols=['user_id', 'display_name', 'avatar_url'],
keyvalues={
"membership": Membership.JOIN,
},
@@ -406,11 +417,21 @@ class RoomMemberStore(SQLBaseStore):
desc="_get_joined_users_from_context",
)
- users_in_room = set(row["user_id"] for row in rows)
+ users_in_room = {
+ row["user_id"]: {
+ "display_name": row["display_name"],
+ "avatar_url": row["avatar_url"],
+ }
+ for row in rows
+ }
+
if event is not None and event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
if event.event_id in member_event_ids:
- users_in_room.add(event.state_key)
+ users_in_room[event.state_key] = {
+ "display_name": event.content.get("displayname", None),
+ "avatar_url": event.content.get("avatar_url", None),
+ }
defer.returnValue(users_in_room)
@@ -448,3 +469,78 @@ class RoomMemberStore(SQLBaseStore):
defer.returnValue(True)
defer.returnValue(False)
+
+ @defer.inlineCallbacks
+ def _background_add_membership_profile(self, progress, batch_size):
+ target_min_stream_id = progress.get(
+ "target_min_stream_id_inclusive", self._min_stream_order_on_start
+ )
+ max_stream_id = progress.get(
+ "max_stream_id_exclusive", self._stream_order_on_start + 1
+ )
+
+ INSERT_CLUMP_SIZE = 1000
+
+ def add_membership_profile_txn(txn):
+ sql = ("""
+ SELECT stream_ordering, event_id, room_id, content
+ FROM events
+ INNER JOIN room_memberships USING (room_id, event_id)
+ WHERE ? <= stream_ordering AND stream_ordering < ?
+ AND type = 'm.room.member'
+ ORDER BY stream_ordering DESC
+ LIMIT ?
+ """)
+
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+ rows = self.cursor_to_dict(txn)
+ if not rows:
+ return 0
+
+ min_stream_id = rows[-1]["stream_ordering"]
+
+ to_update = []
+ for row in rows:
+ event_id = row["event_id"]
+ room_id = row["room_id"]
+ try:
+ content = json.loads(row["content"])
+ except:
+ continue
+
+ display_name = content.get("displayname", None)
+ avatar_url = content.get("avatar_url", None)
+
+ if display_name or avatar_url:
+ to_update.append((
+ display_name, avatar_url, event_id, room_id
+ ))
+
+ to_update_sql = ("""
+ UPDATE room_memberships SET display_name = ?, avatar_url = ?
+ WHERE event_id = ? AND room_id = ?
+ """)
+ for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
+ clump = to_update[index:index + INSERT_CLUMP_SIZE]
+ txn.executemany(to_update_sql, clump)
+
+ progress = {
+ "target_min_stream_id_inclusive": target_min_stream_id,
+ "max_stream_id_exclusive": min_stream_id,
+ }
+
+ self._background_update_progress_txn(
+ txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress
+ )
+
+ return len(to_update)
+
+ result = yield self.runInteraction(
+ _MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn
+ )
+
+ if not result:
+ yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
+
+ defer.returnValue(result)
diff --git a/synapse/storage/schema/delta/39/membership_profile.sql b/synapse/storage/schema/delta/39/membership_profile.sql
new file mode 100644
index 0000000000..1bf911c8ab
--- /dev/null
+++ b/synapse/storage/schema/delta/39/membership_profile.sql
@@ -0,0 +1,20 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE room_memberships ADD COLUMN display_name TEXT;
+ALTER TABLE room_memberships ADD COLUMN avatar_url TEXT;
+
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('room_membership_profile_update', '{}');
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 7fa63b58a7..2dc24951c4 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -541,6 +541,9 @@ class StreamStore(SQLBaseStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
+
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
Args:
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index c05b9450be..30fc480108 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -24,6 +24,11 @@ import logging
logger = logging.getLogger(__name__)
+class DeferredTimedOutError(SynapseError):
+ def __init__(self):
+ super(SynapseError).__init__(504, "Timed out")
+
+
def unwrapFirstError(failure):
# defer.gatherResults and DeferredLists wrap failures.
failure.trap(defer.FirstError)
@@ -89,7 +94,7 @@ class Clock(object):
def timed_out_fn():
try:
- ret_deferred.errback(SynapseError(504, "Timed out"))
+ ret_deferred.errback(DeferredTimedOutError())
except:
pass
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 347fb1e380..16ed183d4c 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -197,6 +197,64 @@ class Linearizer(object):
defer.returnValue(_ctx_manager())
+class Limiter(object):
+ """Limits concurrent access to resources based on a key. Useful to ensure
+ only a few thing happen at a time on a given resource.
+
+ Example:
+
+ with (yield limiter.queue("test_key")):
+ # do some work.
+
+ """
+ def __init__(self, max_count):
+ """
+ Args:
+ max_count(int): The maximum number of concurrent access
+ """
+ self.max_count = max_count
+
+ # key_to_defer is a map from the key to a 2 element list where
+ # the first element is the number of things executing
+ # the second element is a list of deferreds for the things blocked from
+ # executing.
+ self.key_to_defer = {}
+
+ @defer.inlineCallbacks
+ def queue(self, key):
+ entry = self.key_to_defer.setdefault(key, [0, []])
+
+ # If the number of things executing is greater than the maximum
+ # then add a deferred to the list of blocked items
+ # When on of the things currently executing finishes it will callback
+ # this item so that it can continue executing.
+ if entry[0] >= self.max_count:
+ new_defer = defer.Deferred()
+ entry[1].append(new_defer)
+ with PreserveLoggingContext():
+ yield new_defer
+
+ entry[0] += 1
+
+ @contextmanager
+ def _ctx_manager():
+ try:
+ yield
+ finally:
+ # We've finished executing so check if there are any things
+ # blocked waiting to execute and start one of them
+ entry[0] -= 1
+ try:
+ entry[1].pop(0).callback(None)
+ except IndexError:
+ # If nothing else is executing for this key then remove it
+ # from the map
+ if entry[0] == 0:
+ self.key_to_defer.pop(key, None)
+
+ defer.returnValue(_ctx_manager())
+
+
class ReadWriteLock(object):
"""A deferred style read write lock.
diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py
index 2cf262bb46..4575dd9834 100644
--- a/tests/api/test_auth.py
+++ b/tests/api/test_auth.py
@@ -12,17 +12,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from tests import unittest
-from twisted.internet import defer
+import pymacaroons
from mock import Mock
+from twisted.internet import defer
+import synapse.handlers.auth
from synapse.api.auth import Auth
from synapse.api.errors import AuthError
from synapse.types import UserID
+from tests import unittest
from tests.utils import setup_test_homeserver, mock_getRawHeaders
-import pymacaroons
+
+class TestHandlers(object):
+ def __init__(self, hs):
+ self.auth_handler = synapse.handlers.auth.AuthHandler(hs)
class AuthTestCase(unittest.TestCase):
@@ -34,14 +39,17 @@ class AuthTestCase(unittest.TestCase):
self.hs = yield setup_test_homeserver(handlers=None)
self.hs.get_datastore = Mock(return_value=self.store)
+ self.hs.handlers = TestHandlers(self.hs)
self.auth = Auth(self.hs)
self.test_user = "@foo:bar"
self.test_token = "_test_token_"
+ # this is overridden for the appservice tests
+ self.store.get_app_service_by_token = Mock(return_value=None)
+
@defer.inlineCallbacks
def test_get_user_by_req_user_valid_token(self):
- self.store.get_app_service_by_token = Mock(return_value=None)
user_info = {
"name": self.test_user,
"token_id": "ditto",
@@ -56,7 +64,6 @@ class AuthTestCase(unittest.TestCase):
self.assertEquals(requester.user.to_string(), self.test_user)
def test_get_user_by_req_user_bad_token(self):
- self.store.get_app_service_by_token = Mock(return_value=None)
self.store.get_user_by_access_token = Mock(return_value=None)
request = Mock(args={})
@@ -66,7 +73,6 @@ class AuthTestCase(unittest.TestCase):
self.failureResultOf(d, AuthError)
def test_get_user_by_req_user_missing_token(self):
- self.store.get_app_service_by_token = Mock(return_value=None)
user_info = {
"name": self.test_user,
"token_id": "ditto",
@@ -158,7 +164,7 @@ class AuthTestCase(unittest.TestCase):
macaroon.add_first_party_caveat("gen = 1")
macaroon.add_first_party_caveat("type = access")
macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
- user_info = yield self.auth.get_user_from_macaroon(macaroon.serialize())
+ user_info = yield self.auth.get_user_by_access_token(macaroon.serialize())
user = user_info["user"]
self.assertEqual(UserID.from_string(user_id), user)
@@ -168,6 +174,10 @@ class AuthTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_get_guest_user_from_macaroon(self):
+ self.store.get_user_by_id = Mock(return_value={
+ "is_guest": True,
+ })
+
user_id = "@baldrick:matrix.org"
macaroon = pymacaroons.Macaroon(
location=self.hs.config.server_name,
@@ -179,11 +189,12 @@ class AuthTestCase(unittest.TestCase):
macaroon.add_first_party_caveat("guest = true")
serialized = macaroon.serialize()
- user_info = yield self.auth.get_user_from_macaroon(serialized)
+ user_info = yield self.auth.get_user_by_access_token(serialized)
user = user_info["user"]
is_guest = user_info["is_guest"]
self.assertEqual(UserID.from_string(user_id), user)
self.assertTrue(is_guest)
+ self.store.get_user_by_id.assert_called_with(user_id)
@defer.inlineCallbacks
def test_get_user_from_macaroon_user_db_mismatch(self):
@@ -200,7 +211,7 @@ class AuthTestCase(unittest.TestCase):
macaroon.add_first_party_caveat("type = access")
macaroon.add_first_party_caveat("user_id = %s" % (user,))
with self.assertRaises(AuthError) as cm:
- yield self.auth.get_user_from_macaroon(macaroon.serialize())
+ yield self.auth.get_user_by_access_token(macaroon.serialize())
self.assertEqual(401, cm.exception.code)
self.assertIn("User mismatch", cm.exception.msg)
@@ -220,7 +231,7 @@ class AuthTestCase(unittest.TestCase):
macaroon.add_first_party_caveat("type = access")
with self.assertRaises(AuthError) as cm:
- yield self.auth.get_user_from_macaroon(macaroon.serialize())
+ yield self.auth.get_user_by_access_token(macaroon.serialize())
self.assertEqual(401, cm.exception.code)
self.assertIn("No user caveat", cm.exception.msg)
@@ -242,7 +253,7 @@ class AuthTestCase(unittest.TestCase):
macaroon.add_first_party_caveat("user_id = %s" % (user,))
with self.assertRaises(AuthError) as cm:
- yield self.auth.get_user_from_macaroon(macaroon.serialize())
+ yield self.auth.get_user_by_access_token(macaroon.serialize())
self.assertEqual(401, cm.exception.code)
self.assertIn("Invalid macaroon", cm.exception.msg)
@@ -265,7 +276,7 @@ class AuthTestCase(unittest.TestCase):
macaroon.add_first_party_caveat("cunning > fox")
with self.assertRaises(AuthError) as cm:
- yield self.auth.get_user_from_macaroon(macaroon.serialize())
+ yield self.auth.get_user_by_access_token(macaroon.serialize())
self.assertEqual(401, cm.exception.code)
self.assertIn("Invalid macaroon", cm.exception.msg)
@@ -293,12 +304,12 @@ class AuthTestCase(unittest.TestCase):
self.hs.clock.now = 5000 # seconds
self.hs.config.expire_access_token = True
- # yield self.auth.get_user_from_macaroon(macaroon.serialize())
+ # yield self.auth.get_user_by_access_token(macaroon.serialize())
# TODO(daniel): Turn on the check that we validate expiration, when we
# validate expiration (and remove the above line, which will start
# throwing).
with self.assertRaises(AuthError) as cm:
- yield self.auth.get_user_from_macaroon(macaroon.serialize())
+ yield self.auth.get_user_by_access_token(macaroon.serialize())
self.assertEqual(401, cm.exception.code)
self.assertIn("Invalid macaroon", cm.exception.msg)
@@ -327,6 +338,58 @@ class AuthTestCase(unittest.TestCase):
self.hs.clock.now = 5000 # seconds
self.hs.config.expire_access_token = True
- user_info = yield self.auth.get_user_from_macaroon(macaroon.serialize())
+ user_info = yield self.auth.get_user_by_access_token(macaroon.serialize())
user = user_info["user"]
self.assertEqual(UserID.from_string(user_id), user)
+
+ @defer.inlineCallbacks
+ def test_cannot_use_regular_token_as_guest(self):
+ USER_ID = "@percy:matrix.org"
+ self.store.add_access_token_to_user = Mock()
+
+ token = yield self.hs.handlers.auth_handler.issue_access_token(
+ USER_ID, "DEVICE"
+ )
+ self.store.add_access_token_to_user.assert_called_with(
+ USER_ID, token, "DEVICE"
+ )
+
+ def get_user(tok):
+ if token != tok:
+ return None
+ return {
+ "name": USER_ID,
+ "is_guest": False,
+ "token_id": 1234,
+ "device_id": "DEVICE",
+ }
+ self.store.get_user_by_access_token = get_user
+ self.store.get_user_by_id = Mock(return_value={
+ "is_guest": False,
+ })
+
+ # check the token works
+ request = Mock(args={})
+ request.args["access_token"] = [token]
+ request.requestHeaders.getRawHeaders = mock_getRawHeaders()
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+ self.assertEqual(UserID.from_string(USER_ID), requester.user)
+ self.assertFalse(requester.is_guest)
+
+ # add an is_guest caveat
+ mac = pymacaroons.Macaroon.deserialize(token)
+ mac.add_first_party_caveat("guest = true")
+ guest_tok = mac.serialize()
+
+ # the token should *not* work now
+ request = Mock(args={})
+ request.args["access_token"] = [guest_tok]
+ request.requestHeaders.getRawHeaders = mock_getRawHeaders()
+
+ with self.assertRaises(AuthError) as cm:
+ yield self.auth.get_user_by_req(request, allow_guest=True)
+
+ self.assertEqual(401, cm.exception.code)
+ self.assertEqual("Guest access token used for regular user", cm.exception.msg)
+
+ self.store.get_user_by_id.assert_called_with(USER_ID)
diff --git a/tests/util/test_limiter.py b/tests/util/test_limiter.py
new file mode 100644
index 0000000000..9c795d9fdb
--- /dev/null
+++ b/tests/util/test_limiter.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from tests import unittest
+
+from twisted.internet import defer
+
+from synapse.util.async import Limiter
+
+
+class LimiterTestCase(unittest.TestCase):
+
+ @defer.inlineCallbacks
+ def test_limiter(self):
+ limiter = Limiter(3)
+
+ key = object()
+
+ d1 = limiter.queue(key)
+ cm1 = yield d1
+
+ d2 = limiter.queue(key)
+ cm2 = yield d2
+
+ d3 = limiter.queue(key)
+ cm3 = yield d3
+
+ d4 = limiter.queue(key)
+ self.assertFalse(d4.called)
+
+ d5 = limiter.queue(key)
+ self.assertFalse(d5.called)
+
+ with cm1:
+ self.assertFalse(d4.called)
+ self.assertFalse(d5.called)
+
+ self.assertTrue(d4.called)
+ self.assertFalse(d5.called)
+
+ with cm3:
+ self.assertFalse(d5.called)
+
+ self.assertTrue(d5.called)
+
+ with cm2:
+ pass
+
+ with (yield d4):
+ pass
+
+ with (yield d5):
+ pass
+
+ d6 = limiter.queue(key)
+ with (yield d6):
+ pass
diff --git a/tests/utils.py b/tests/utils.py
index 2d0bd205fd..d3d6c8021d 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -294,6 +294,10 @@ class MockClock(object):
def advance_time_msec(self, ms):
self.advance_time(ms / 1000.)
+ def time_bound_deferred(self, d, *args, **kwargs):
+ # We don't bother timing things out for now.
+ return d
+
class SQLiteMemoryDbPool(ConnectionPool, object):
def __init__(self):
|