diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index ca23c9c460..489efb7f86 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -44,6 +45,7 @@ class JoinRules(object):
class LoginType(object):
PASSWORD = u"m.login.password"
EMAIL_IDENTITY = u"m.login.email.identity"
+ MSISDN = u"m.login.msisdn"
RECAPTCHA = u"m.login.recaptcha"
DUMMY = u"m.login.dummy"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 921c457738..6fbd5d6876 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -15,6 +15,7 @@
"""Contains exceptions and error codes."""
+import json
import logging
logger = logging.getLogger(__name__)
@@ -50,27 +51,35 @@ class Codes(object):
class CodeMessageException(RuntimeError):
- """An exception with integer code and message string attributes."""
+ """An exception with integer code and message string attributes.
+ Attributes:
+ code (int): HTTP error code
+ msg (str): string describing the error
+ """
def __init__(self, code, msg):
super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
self.code = code
self.msg = msg
- self.response_code_message = None
def error_dict(self):
return cs_error(self.msg)
class SynapseError(CodeMessageException):
- """A base error which can be caught for all synapse events."""
+ """A base exception type for matrix errors which have an errcode and error
+ message (as well as an HTTP status code).
+
+ Attributes:
+ errcode (str): Matrix error code e.g 'M_FORBIDDEN'
+ """
def __init__(self, code, msg, errcode=Codes.UNKNOWN):
"""Constructs a synapse error.
Args:
code (int): The integer error code (an HTTP response code)
msg (str): The human-readable error message.
- err (str): The error code e.g 'M_FORBIDDEN'
+ errcode (str): The matrix error code e.g 'M_FORBIDDEN'
"""
super(SynapseError, self).__init__(code, msg)
self.errcode = errcode
@@ -81,6 +90,39 @@ class SynapseError(CodeMessageException):
self.errcode,
)
+ @classmethod
+ def from_http_response_exception(cls, err):
+ """Make a SynapseError based on an HTTPResponseException
+
+ This is useful when a proxied request has failed, and we need to
+ decide how to map the failure onto a matrix error to send back to the
+ client.
+
+ An attempt is made to parse the body of the http response as a matrix
+ error. If that succeeds, the errcode and error message from the body
+ are used as the errcode and error message in the new synapse error.
+
+ Otherwise, the errcode is set to M_UNKNOWN, and the error message is
+ set to the reason code from the HTTP response.
+
+ Args:
+ err (HttpResponseException):
+
+ Returns:
+ SynapseError:
+ """
+ # try to parse the body as json, to get better errcode/msg, but
+ # default to M_UNKNOWN with the HTTP status as the error text
+ try:
+ j = json.loads(err.response)
+ except ValueError:
+ j = {}
+ errcode = j.get('errcode', Codes.UNKNOWN)
+ errmsg = j.get('error', err.msg)
+
+ res = SynapseError(err.code, errmsg, errcode)
+ return res
+
class RegistrationError(SynapseError):
"""An error raised when a registration event fails."""
@@ -106,13 +148,11 @@ class UnrecognizedRequestError(SynapseError):
class NotFoundError(SynapseError):
"""An error indicating we can't find the thing you asked for"""
- def __init__(self, *args, **kwargs):
- if "errcode" not in kwargs:
- kwargs["errcode"] = Codes.NOT_FOUND
+ def __init__(self, msg="Not found", errcode=Codes.NOT_FOUND):
super(NotFoundError, self).__init__(
404,
- "Not found",
- **kwargs
+ msg,
+ errcode=errcode
)
@@ -173,7 +213,6 @@ class LimitExceededError(SynapseError):
errcode=Codes.LIMIT_EXCEEDED):
super(LimitExceededError, self).__init__(code, msg, errcode)
self.retry_after_ms = retry_after_ms
- self.response_code_message = "Too Many Requests"
def error_dict(self):
return cs_error(
@@ -243,6 +282,19 @@ class FederationError(RuntimeError):
class HttpResponseException(CodeMessageException):
+ """
+ Represents an HTTP-level failure of an outbound request
+
+ Attributes:
+ response (str): body of response
+ """
def __init__(self, code, msg, response):
- self.response = response
+ """
+
+ Args:
+ code (int): HTTP status code
+ msg (str): reason phrase from HTTP response status line
+ response (str): body of response
+ """
super(HttpResponseException, self).__init__(code, msg)
+ self.response = response
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 90235ff098..c802dd67a3 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -99,7 +99,12 @@ class TransactionQueue(object):
# destination -> list of tuple(failure, deferred)
self.pending_failures_by_dest = {}
+ # destination -> stream_id of last successfully sent to-device message.
+ # NB: may be a long or an int.
self.last_device_stream_id_by_dest = {}
+
+ # destination -> stream_id of last successfully sent device list
+ # update.
self.last_device_list_stream_id_by_dest = {}
# HACK to get unique tx id
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index fffba34383..e7a1bb7246 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -47,6 +48,7 @@ class AuthHandler(BaseHandler):
LoginType.PASSWORD: self._check_password_auth,
LoginType.RECAPTCHA: self._check_recaptcha,
LoginType.EMAIL_IDENTITY: self._check_email_identity,
+ LoginType.MSISDN: self._check_msisdn,
LoginType.DUMMY: self._check_dummy_auth,
}
self.bcrypt_rounds = hs.config.bcrypt_rounds
@@ -307,31 +309,47 @@ class AuthHandler(BaseHandler):
defer.returnValue(True)
raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
- @defer.inlineCallbacks
def _check_email_identity(self, authdict, _):
+ return self._check_threepid('email', authdict)
+
+ def _check_msisdn(self, authdict, _):
+ return self._check_threepid('msisdn', authdict)
+
+ @defer.inlineCallbacks
+ def _check_dummy_auth(self, authdict, _):
+ yield run_on_reactor()
+ defer.returnValue(True)
+
+ @defer.inlineCallbacks
+ def _check_threepid(self, medium, authdict):
yield run_on_reactor()
if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
threepid_creds = authdict['threepid_creds']
+
identity_handler = self.hs.get_handlers().identity_handler
- logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,))
+ logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
threepid = yield identity_handler.threepid_from_creds(threepid_creds)
if not threepid:
raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+ if threepid['medium'] != medium:
+ raise LoginError(
+ 401,
+ "Expecting threepid of type '%s', got '%s'" % (
+ medium, threepid['medium'],
+ ),
+ errcode=Codes.UNAUTHORIZED
+ )
+
threepid['threepid_creds'] = authdict['threepid_creds']
defer.returnValue(threepid)
- @defer.inlineCallbacks
- def _check_dummy_auth(self, authdict, _):
- yield run_on_reactor()
- defer.returnValue(True)
-
def _get_params_recaptcha(self):
return {"public_key": self.hs.config.recaptcha_public_key}
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index efaa0c8d6e..1b007d4945 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -296,7 +296,7 @@ class DeviceHandler(BaseHandler):
# ordering: treat it the same as a new room
event_ids = []
- current_state_ids = yield self.state.get_current_state_ids(room_id)
+ current_state_ids = yield self.store.get_current_state_ids(room_id)
# special-case for an empty prev state: include all members
# in the changed list
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 559e5d5a71..6a53c5eb47 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -150,7 +151,7 @@ class IdentityHandler(BaseHandler):
params.update(kwargs)
try:
- data = yield self.http_client.post_urlencoded_get_json(
+ data = yield self.http_client.post_json_get_json(
"https://%s%s" % (
id_server,
"/_matrix/identity/api/v1/validate/email/requestToken"
@@ -161,3 +162,37 @@ class IdentityHandler(BaseHandler):
except CodeMessageException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e
+
+ @defer.inlineCallbacks
+ def requestMsisdnToken(
+ self, id_server, country, phone_number,
+ client_secret, send_attempt, **kwargs
+ ):
+ yield run_on_reactor()
+
+ if not self._should_trust_id_server(id_server):
+ raise SynapseError(
+ 400, "Untrusted ID server '%s'" % id_server,
+ Codes.SERVER_NOT_TRUSTED
+ )
+
+ params = {
+ 'country': country,
+ 'phone_number': phone_number,
+ 'client_secret': client_secret,
+ 'send_attempt': send_attempt,
+ }
+ params.update(kwargs)
+
+ try:
+ data = yield self.http_client.post_json_get_json(
+ "https://%s%s" % (
+ id_server,
+ "/_matrix/identity/api/v1/validate/msisdn/requestToken"
+ ),
+ params
+ )
+ defer.returnValue(data)
+ except CodeMessageException as e:
+ logger.info("Proxied requestToken failed: %r", e)
+ raise e
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 19eebbd43f..516cd9a6ac 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -21,6 +21,7 @@ from synapse.api.constants import (
EventTypes, JoinRules,
)
from synapse.util.async import concurrently_execute
+from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID
@@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler):
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
"""
+ logger.info(
+ "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
+ limit, since_token, bool(search_filter), network_tuple,
+ )
if search_filter:
# We explicitly don't bother caching searches or requests for
# appservice specific lists.
@@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler):
rooms_to_order_value = {}
rooms_to_num_joined = {}
- rooms_to_latest_event_ids = {}
newly_visible = []
newly_unpublished = []
@@ -116,19 +120,26 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_order_for_room(room_id):
- latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
- if not latest_event_ids:
+ # Most of the rooms won't have changed between the since token and
+ # now (especially if the since token is "now"). So, we can ask what
+ # the current users are in a room (that will hit a cache) and then
+ # check if the room has changed since the since token. (We have to
+ # do it in that order to avoid races).
+ # If things have changed then fall back to getting the current state
+ # at the since token.
+ joined_users = yield self.store.get_users_in_room(room_id)
+ if self.store.has_room_changed_since(room_id, stream_token):
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_token
)
- rooms_to_latest_event_ids[room_id] = latest_event_ids
- if not latest_event_ids:
- return
+ if not latest_event_ids:
+ return
+
+ joined_users = yield self.state_handler.get_current_user_in_room(
+ room_id, latest_event_ids,
+ )
- joined_users = yield self.state_handler.get_current_user_in_room(
- room_id, latest_event_ids,
- )
num_joined_users = len(joined_users)
rooms_to_num_joined[room_id] = num_joined_users
@@ -165,19 +176,19 @@ class RoomListHandler(BaseHandler):
rooms_to_scan = rooms_to_scan[:since_token.current_limit]
rooms_to_scan.reverse()
- # Actually generate the entries. _generate_room_entry will append to
+ # Actually generate the entries. _append_room_entry_to_chunk will append to
# chunk but will stop if len(chunk) > limit
chunk = []
if limit and not search_filter:
step = limit + 1
for i in xrange(0, len(rooms_to_scan), step):
# We iterate here because the vast majority of cases we'll stop
- # at first iteration, but occaisonally _generate_room_entry
+ # at first iteration, but occaisonally _append_room_entry_to_chunk
# won't append to the chunk and so we need to loop again.
# We don't want to scan over the entire range either as that
# would potentially waste a lot of work.
yield concurrently_execute(
- lambda r: self._generate_room_entry(
+ lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
),
@@ -187,7 +198,7 @@ class RoomListHandler(BaseHandler):
break
else:
yield concurrently_execute(
- lambda r: self._generate_room_entry(
+ lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
),
@@ -256,21 +267,35 @@ class RoomListHandler(BaseHandler):
defer.returnValue(results)
@defer.inlineCallbacks
- def _generate_room_entry(self, room_id, num_joined_users, chunk, limit,
- search_filter):
+ def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
+ search_filter):
+ """Generate the entry for a room in the public room list and append it
+ to the `chunk` if it matches the search filter
+ """
if limit and len(chunk) > limit + 1:
# We've already got enough, so lets just drop it.
return
+ result = yield self._generate_room_entry(room_id, num_joined_users)
+
+ if result and _matches_room_entry(result, search_filter):
+ chunk.append(result)
+
+ @cachedInlineCallbacks(num_args=1, cache_context=True)
+ def _generate_room_entry(self, room_id, num_joined_users, cache_context):
+ """Returns the entry for a room
+ """
result = {
"room_id": room_id,
"num_joined_members": num_joined_users,
}
- current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+ current_state_ids = yield self.store.get_current_state_ids(
+ room_id, on_invalidate=cache_context.invalidate,
+ )
event_map = yield self.store.get_events([
- event_id for key, event_id in current_state_ids.items()
+ event_id for key, event_id in current_state_ids.iteritems()
if key[0] in (
EventTypes.JoinRules,
EventTypes.Name,
@@ -294,7 +319,9 @@ class RoomListHandler(BaseHandler):
if join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
- aliases = yield self.store.get_aliases_for_room(room_id)
+ aliases = yield self.store.get_aliases_for_room(
+ room_id, on_invalidate=cache_context.invalidate
+ )
if aliases:
result["aliases"] = aliases
@@ -334,8 +361,7 @@ class RoomListHandler(BaseHandler):
if avatar_url:
result["avatar_url"] = avatar_url
- if _matches_room_entry(result, search_filter):
- chunk.append(result)
+ defer.returnValue(result)
@defer.inlineCallbacks
def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 78b92cef36..82586e3dea 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -108,6 +108,12 @@ class MatrixFederationHttpClient(object):
query_bytes=b"", retry_on_dns_fail=True,
timeout=None, long_retries=False):
""" Creates and sends a request to the given url
+
+ Returns:
+ Deferred: resolves with the http response object on success.
+
+ Fails with ``HTTPRequestException``: if we get an HTTP response
+ code >= 300.
"""
headers_dict[b"User-Agent"] = [self.version_string]
headers_dict[b"Host"] = [destination]
@@ -408,8 +414,11 @@ class MatrixFederationHttpClient(object):
output_stream (file): File to write the response body to.
args (dict): Optional dictionary used to create the query string.
Returns:
- A (int,dict) tuple of the file length and a dict of the response
- headers.
+ Deferred: resolves with an (int,dict) tuple of the file length and
+ a dict of the response headers.
+
+ Fails with ``HTTPRequestException`` if we get an HTTP response code
+ >= 300
"""
encoded_args = {}
@@ -419,7 +428,7 @@ class MatrixFederationHttpClient(object):
encoded_args[k] = [v.encode("UTF-8") for v in vs]
query_bytes = urllib.urlencode(encoded_args, True)
- logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
+ logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
def body_callback(method, url_bytes, headers_dict):
self.sign_request(destination, method, url_bytes, headers_dict)
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 8c22d6f00f..9a4c36ad5d 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -192,6 +192,16 @@ def parse_json_object_from_request(request):
return content
+def assert_params_in_request(body, required):
+ absent = []
+ for k in required:
+ if k not in body:
+ absent.append(k)
+
+ if len(absent) > 0:
+ raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+
+
class RestServlet(object):
""" A Synapse REST Servlet.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 8051a7a842..2657dcd8dc 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -37,6 +37,10 @@ metrics = synapse.metrics.get_metrics_for(__name__)
notified_events_counter = metrics.register_counter("notified_events")
+users_woken_by_stream_counter = metrics.register_counter(
+ "users_woken_by_stream", labels=["stream"]
+)
+
# TODO(paul): Should be shared somewhere
def count(func, l):
@@ -73,6 +77,13 @@ class _NotifierUserStream(object):
self.user_id = user_id
self.rooms = set(rooms)
self.current_token = current_token
+
+ # The last token for which we should wake up any streams that have a
+ # token that comes before it. This gets updated everytime we get poked.
+ # We start it at the current token since if we get any streams
+ # that have a token from before we have no idea whether they should be
+ # woken up or not, so lets just wake them up.
+ self.last_notified_token = current_token
self.last_notified_ms = time_now_ms
with PreserveLoggingContext():
@@ -89,9 +100,12 @@ class _NotifierUserStream(object):
self.current_token = self.current_token.copy_and_advance(
stream_key, stream_id
)
+ self.last_notified_token = self.current_token
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
+ users_woken_by_stream_counter.inc(stream_key)
+
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
noify_deferred.callback(self.current_token)
@@ -113,8 +127,14 @@ class _NotifierUserStream(object):
def new_listener(self, token):
"""Returns a deferred that is resolved when there is a new token
greater than the given token.
+
+ Args:
+ token: The token from which we are streaming from, i.e. we shouldn't
+ notify for things that happened before this.
"""
- if self.current_token.is_after(token):
+ # Immediately wake up stream if something has already since happened
+ # since their last token.
+ if self.last_notified_token.is_after(token):
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
@@ -294,40 +314,44 @@ class Notifier(object):
self._register_with_keys(user_stream)
result = None
+ prev_token = from_token
if timeout:
end_time = self.clock.time_msec() + timeout
- prev_token = from_token
while not result:
try:
- current_token = user_stream.current_token
-
- result = yield callback(prev_token, current_token)
- if result:
- break
-
now = self.clock.time_msec()
if end_time <= now:
break
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
- # We need to supply the token we supplied to callback so
- # that we don't miss any current_token updates.
- prev_token = current_token
listener = user_stream.new_listener(prev_token)
with PreserveLoggingContext():
yield self.clock.time_bound_deferred(
listener.deferred,
time_out=(end_time - now) / 1000.
)
+
+ current_token = user_stream.current_token
+
+ result = yield callback(prev_token, current_token)
+ if result:
+ break
+
+ # Update the prev_token to the current_token since nothing
+ # has happened between the old prev_token and the current_token
+ prev_token = current_token
except DeferredTimedOutError:
break
except defer.CancelledError:
break
- else:
+
+ if result is None:
+ # This happened if there was no timeout or if the timeout had
+ # already expired.
current_token = user_stream.current_token
- result = yield callback(from_token, current_token)
+ result = yield callback(prev_token, current_token)
defer.returnValue(result)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 62d794f22b..3a50c72e0b 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -139,7 +139,7 @@ class Mailer(object):
@defer.inlineCallbacks
def _fetch_room_state(room_id):
- room_state = yield self.state_handler.get_current_state_ids(room_id)
+ room_state = yield self.store.get_current_state_ids(room_id)
state_by_room[room_id] = room_state
# Run at most 3 of these at once: sync does 10 at a time but email
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 7817b0cd91..c4777b2a2b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -1,4 +1,5 @@
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -37,6 +38,7 @@ REQUIREMENTS = {
"pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
+ "phonenumbers>=8.2.0": ["phonenumbers"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
index 24b5c79d4a..9d1d173b2f 100644
--- a/synapse/replication/slave/storage/_slaved_id_tracker.py
+++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -27,4 +27,9 @@ class SlavedIdTracker(object):
self._current = (max if self.step > 0 else min)(self._current, new_id)
def get_current_token(self):
+ """
+
+ Returns:
+ int
+ """
return self._current
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 622b2d8540..518c9ea2e9 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -109,6 +109,10 @@ class SlavedEventStore(BaseSlavedStore):
get_recent_event_ids_for_room = (
StreamStore.__dict__["get_recent_event_ids_for_room"]
)
+ get_current_state_ids = (
+ StateStore.__dict__["get_current_state_ids"]
+ )
+ has_room_changed_since = DataStore.has_room_changed_since.__func__
get_unread_push_actions_for_user_in_range_for_http = (
DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 34eebe6fd8..a43410fb37 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, LoginError, Codes
from synapse.types import UserID
from synapse.http.server import finish_request
from synapse.http.servlet import parse_json_object_from_request
+from synapse.util.msisdn import phone_number_to_msisdn
from .base import ClientV1RestServlet, client_path_patterns
@@ -39,6 +40,49 @@ from twisted.web.client import PartialDownloadError
logger = logging.getLogger(__name__)
+def login_submission_legacy_convert(submission):
+ """
+ If the input login submission is an old style object
+ (ie. with top-level user / medium / address) convert it
+ to a typed object.
+ """
+ if "user" in submission:
+ submission["identifier"] = {
+ "type": "m.id.user",
+ "user": submission["user"],
+ }
+ del submission["user"]
+
+ if "medium" in submission and "address" in submission:
+ submission["identifier"] = {
+ "type": "m.id.thirdparty",
+ "medium": submission["medium"],
+ "address": submission["address"],
+ }
+ del submission["medium"]
+ del submission["address"]
+
+
+def login_id_thirdparty_from_phone(identifier):
+ """
+ Convert a phone login identifier type to a generic threepid identifier
+ Args:
+ identifier(dict): Login identifier dict of type 'm.id.phone'
+
+ Returns: Login identifier dict of type 'm.id.threepid'
+ """
+ if "country" not in identifier or "number" not in identifier:
+ raise SynapseError(400, "Invalid phone-type identifier")
+
+ msisdn = phone_number_to_msisdn(identifier["country"], identifier["number"])
+
+ return {
+ "type": "m.id.thirdparty",
+ "medium": "msisdn",
+ "address": msisdn,
+ }
+
+
class LoginRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/login$")
PASS_TYPE = "m.login.password"
@@ -119,20 +163,52 @@ class LoginRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def do_password_login(self, login_submission):
- if 'medium' in login_submission and 'address' in login_submission:
- address = login_submission['address']
- if login_submission['medium'] == 'email':
+ if "password" not in login_submission:
+ raise SynapseError(400, "Missing parameter: password")
+
+ login_submission_legacy_convert(login_submission)
+
+ if "identifier" not in login_submission:
+ raise SynapseError(400, "Missing param: identifier")
+
+ identifier = login_submission["identifier"]
+ if "type" not in identifier:
+ raise SynapseError(400, "Login identifier has no type")
+
+ # convert phone type identifiers to generic threepids
+ if identifier["type"] == "m.id.phone":
+ identifier = login_id_thirdparty_from_phone(identifier)
+
+ # convert threepid identifiers to user IDs
+ if identifier["type"] == "m.id.thirdparty":
+ if 'medium' not in identifier or 'address' not in identifier:
+ raise SynapseError(400, "Invalid thirdparty identifier")
+
+ address = identifier['address']
+ if identifier['medium'] == 'email':
# For emails, transform the address to lowercase.
# We store all email addreses as lowercase in the DB.
# (See add_threepid in synapse/handlers/auth.py)
address = address.lower()
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
- login_submission['medium'], address
+ identifier['medium'], address
)
if not user_id:
raise LoginError(403, "", errcode=Codes.FORBIDDEN)
- else:
- user_id = login_submission['user']
+
+ identifier = {
+ "type": "m.id.user",
+ "user": user_id,
+ }
+
+ # by this point, the identifier should be an m.id.user: if it's anything
+ # else, we haven't understood it.
+ if identifier["type"] != "m.id.user":
+ raise SynapseError(400, "Unknown login identifier type")
+ if "user" not in identifier:
+ raise SynapseError(400, "User identifier is missing 'user' key")
+
+ user_id = identifier["user"]
if not user_id.startswith('@'):
user_id = UserID.create(
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 398e7f5eb0..aac76edf1c 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,8 +18,11 @@ from twisted.internet import defer
from synapse.api.constants import LoginType
from synapse.api.errors import LoginError, SynapseError, Codes
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import (
+ RestServlet, parse_json_object_from_request, assert_params_in_request
+)
from synapse.util.async import run_on_reactor
+from synapse.util.msisdn import phone_number_to_msisdn
from ._base import client_v2_patterns
@@ -28,11 +32,11 @@ import logging
logger = logging.getLogger(__name__)
-class PasswordRequestTokenRestServlet(RestServlet):
+class EmailPasswordRequestTokenRestServlet(RestServlet):
PATTERNS = client_v2_patterns("/account/password/email/requestToken$")
def __init__(self, hs):
- super(PasswordRequestTokenRestServlet, self).__init__()
+ super(EmailPasswordRequestTokenRestServlet, self).__init__()
self.hs = hs
self.identity_handler = hs.get_handlers().identity_handler
@@ -40,14 +44,9 @@ class PasswordRequestTokenRestServlet(RestServlet):
def on_POST(self, request):
body = parse_json_object_from_request(request)
- required = ['id_server', 'client_secret', 'email', 'send_attempt']
- absent = []
- for k in required:
- if k not in body:
- absent.append(k)
-
- if absent:
- raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+ assert_params_in_request(body, [
+ 'id_server', 'client_secret', 'email', 'send_attempt'
+ ])
existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
'email', body['email']
@@ -60,6 +59,37 @@ class PasswordRequestTokenRestServlet(RestServlet):
defer.returnValue((200, ret))
+class MsisdnPasswordRequestTokenRestServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/account/password/msisdn/requestToken$")
+
+ def __init__(self, hs):
+ super(MsisdnPasswordRequestTokenRestServlet, self).__init__()
+ self.hs = hs
+ self.datastore = self.hs.get_datastore()
+ self.identity_handler = hs.get_handlers().identity_handler
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ body = parse_json_object_from_request(request)
+
+ assert_params_in_request(body, [
+ 'id_server', 'client_secret',
+ 'country', 'phone_number', 'send_attempt',
+ ])
+
+ msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+
+ existingUid = yield self.datastore.get_user_id_by_threepid(
+ 'msisdn', msisdn
+ )
+
+ if existingUid is None:
+ raise SynapseError(400, "MSISDN not found", Codes.THREEPID_NOT_FOUND)
+
+ ret = yield self.identity_handler.requestMsisdnToken(**body)
+ defer.returnValue((200, ret))
+
+
class PasswordRestServlet(RestServlet):
PATTERNS = client_v2_patterns("/account/password$")
@@ -68,6 +98,7 @@ class PasswordRestServlet(RestServlet):
self.hs = hs
self.auth = hs.get_auth()
self.auth_handler = hs.get_auth_handler()
+ self.datastore = self.hs.get_datastore()
@defer.inlineCallbacks
def on_POST(self, request):
@@ -77,7 +108,8 @@ class PasswordRestServlet(RestServlet):
authed, result, params, _ = yield self.auth_handler.check_auth([
[LoginType.PASSWORD],
- [LoginType.EMAIL_IDENTITY]
+ [LoginType.EMAIL_IDENTITY],
+ [LoginType.MSISDN],
], body, self.hs.get_ip_from_request(request))
if not authed:
@@ -102,7 +134,7 @@ class PasswordRestServlet(RestServlet):
# (See add_threepid in synapse/handlers/auth.py)
threepid['address'] = threepid['address'].lower()
# if using email, we must know about the email they're authing with!
- threepid_user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
+ threepid_user_id = yield self.datastore.get_user_id_by_threepid(
threepid['medium'], threepid['address']
)
if not threepid_user_id:
@@ -169,13 +201,14 @@ class DeactivateAccountRestServlet(RestServlet):
defer.returnValue((200, {}))
-class ThreepidRequestTokenRestServlet(RestServlet):
+class EmailThreepidRequestTokenRestServlet(RestServlet):
PATTERNS = client_v2_patterns("/account/3pid/email/requestToken$")
def __init__(self, hs):
self.hs = hs
- super(ThreepidRequestTokenRestServlet, self).__init__()
+ super(EmailThreepidRequestTokenRestServlet, self).__init__()
self.identity_handler = hs.get_handlers().identity_handler
+ self.datastore = self.hs.get_datastore()
@defer.inlineCallbacks
def on_POST(self, request):
@@ -190,7 +223,7 @@ class ThreepidRequestTokenRestServlet(RestServlet):
if absent:
raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
- existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
+ existingUid = yield self.datastore.get_user_id_by_threepid(
'email', body['email']
)
@@ -201,6 +234,44 @@ class ThreepidRequestTokenRestServlet(RestServlet):
defer.returnValue((200, ret))
+class MsisdnThreepidRequestTokenRestServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/account/3pid/msisdn/requestToken$")
+
+ def __init__(self, hs):
+ self.hs = hs
+ super(MsisdnThreepidRequestTokenRestServlet, self).__init__()
+ self.identity_handler = hs.get_handlers().identity_handler
+ self.datastore = self.hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ body = parse_json_object_from_request(request)
+
+ required = [
+ 'id_server', 'client_secret',
+ 'country', 'phone_number', 'send_attempt',
+ ]
+ absent = []
+ for k in required:
+ if k not in body:
+ absent.append(k)
+
+ if absent:
+ raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+
+ msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+
+ existingUid = yield self.datastore.get_user_id_by_threepid(
+ 'msisdn', msisdn
+ )
+
+ if existingUid is not None:
+ raise SynapseError(400, "MSISDN is already in use", Codes.THREEPID_IN_USE)
+
+ ret = yield self.identity_handler.requestEmailToken(**body)
+ defer.returnValue((200, ret))
+
+
class ThreepidRestServlet(RestServlet):
PATTERNS = client_v2_patterns("/account/3pid$")
@@ -210,6 +281,7 @@ class ThreepidRestServlet(RestServlet):
self.identity_handler = hs.get_handlers().identity_handler
self.auth = hs.get_auth()
self.auth_handler = hs.get_auth_handler()
+ self.datastore = self.hs.get_datastore()
@defer.inlineCallbacks
def on_GET(self, request):
@@ -217,7 +289,7 @@ class ThreepidRestServlet(RestServlet):
requester = yield self.auth.get_user_by_req(request)
- threepids = yield self.hs.get_datastore().user_get_threepids(
+ threepids = yield self.datastore.user_get_threepids(
requester.user.to_string()
)
@@ -258,7 +330,7 @@ class ThreepidRestServlet(RestServlet):
if 'bind' in body and body['bind']:
logger.debug(
- "Binding emails %s to %s",
+ "Binding threepid %s to %s",
threepid, user_id
)
yield self.identity_handler.bind_threepid(
@@ -302,9 +374,11 @@ class ThreepidDeleteRestServlet(RestServlet):
def register_servlets(hs, http_server):
- PasswordRequestTokenRestServlet(hs).register(http_server)
+ EmailPasswordRequestTokenRestServlet(hs).register(http_server)
+ MsisdnPasswordRequestTokenRestServlet(hs).register(http_server)
PasswordRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
- ThreepidRequestTokenRestServlet(hs).register(http_server)
+ EmailThreepidRequestTokenRestServlet(hs).register(http_server)
+ MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
ThreepidRestServlet(hs).register(http_server)
ThreepidDeleteRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index ccca5a12d5..dcd13b876f 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,7 +20,10 @@ import synapse
from synapse.api.auth import get_access_token_from_request, has_access_token
from synapse.api.constants import LoginType
from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import (
+ RestServlet, parse_json_object_from_request, assert_params_in_request
+)
+from synapse.util.msisdn import phone_number_to_msisdn
from ._base import client_v2_patterns
@@ -43,7 +47,7 @@ else:
logger = logging.getLogger(__name__)
-class RegisterRequestTokenRestServlet(RestServlet):
+class EmailRegisterRequestTokenRestServlet(RestServlet):
PATTERNS = client_v2_patterns("/register/email/requestToken$")
def __init__(self, hs):
@@ -51,7 +55,7 @@ class RegisterRequestTokenRestServlet(RestServlet):
Args:
hs (synapse.server.HomeServer): server
"""
- super(RegisterRequestTokenRestServlet, self).__init__()
+ super(EmailRegisterRequestTokenRestServlet, self).__init__()
self.hs = hs
self.identity_handler = hs.get_handlers().identity_handler
@@ -59,14 +63,9 @@ class RegisterRequestTokenRestServlet(RestServlet):
def on_POST(self, request):
body = parse_json_object_from_request(request)
- required = ['id_server', 'client_secret', 'email', 'send_attempt']
- absent = []
- for k in required:
- if k not in body:
- absent.append(k)
-
- if len(absent) > 0:
- raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+ assert_params_in_request(body, [
+ 'id_server', 'client_secret', 'email', 'send_attempt'
+ ])
existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
'email', body['email']
@@ -79,6 +78,43 @@ class RegisterRequestTokenRestServlet(RestServlet):
defer.returnValue((200, ret))
+class MsisdnRegisterRequestTokenRestServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/register/msisdn/requestToken$")
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(MsisdnRegisterRequestTokenRestServlet, self).__init__()
+ self.hs = hs
+ self.identity_handler = hs.get_handlers().identity_handler
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ body = parse_json_object_from_request(request)
+
+ assert_params_in_request(body, [
+ 'id_server', 'client_secret',
+ 'country', 'phone_number',
+ 'send_attempt',
+ ])
+
+ msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+
+ existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
+ 'msisdn', msisdn
+ )
+
+ if existingUid is not None:
+ raise SynapseError(
+ 400, "Phone number is already in use", Codes.THREEPID_IN_USE
+ )
+
+ ret = yield self.identity_handler.requestMsisdnToken(**body)
+ defer.returnValue((200, ret))
+
+
class RegisterRestServlet(RestServlet):
PATTERNS = client_v2_patterns("/register$")
@@ -200,16 +236,37 @@ class RegisterRestServlet(RestServlet):
assigned_user_id=registered_user_id,
)
+ # Only give msisdn flows if the x_show_msisdn flag is given:
+ # this is a hack to work around the fact that clients were shipped
+ # that use fallback registration if they see any flows that they don't
+ # recognise, which means we break registration for these clients if we
+ # advertise msisdn flows. Once usage of Riot iOS <=0.3.9 and Riot
+ # Android <=0.6.9 have fallen below an acceptable threshold, this
+ # parameter should go away and we should always advertise msisdn flows.
+ show_msisdn = False
+ if 'x_show_msisdn' in body and body['x_show_msisdn']:
+ show_msisdn = True
+
if self.hs.config.enable_registration_captcha:
flows = [
[LoginType.RECAPTCHA],
- [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]
+ [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
]
+ if show_msisdn:
+ flows.extend([
+ [LoginType.MSISDN, LoginType.RECAPTCHA],
+ [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
+ ])
else:
flows = [
[LoginType.DUMMY],
- [LoginType.EMAIL_IDENTITY]
+ [LoginType.EMAIL_IDENTITY],
]
+ if show_msisdn:
+ flows.extend([
+ [LoginType.MSISDN],
+ [LoginType.MSISDN, LoginType.EMAIL_IDENTITY],
+ ])
authed, auth_result, params, session_id = yield self.auth_handler.check_auth(
flows, body, self.hs.get_ip_from_request(request)
@@ -224,8 +281,9 @@ class RegisterRestServlet(RestServlet):
"Already registered user ID %r for this session",
registered_user_id
)
- # don't re-register the email address
+ # don't re-register the threepids
add_email = False
+ add_msisdn = False
else:
# NB: This may be from the auth handler and NOT from the POST
if 'password' not in params:
@@ -250,6 +308,7 @@ class RegisterRestServlet(RestServlet):
)
add_email = True
+ add_msisdn = True
return_dict = yield self._create_registration_details(
registered_user_id, params
@@ -262,6 +321,13 @@ class RegisterRestServlet(RestServlet):
params.get("bind_email")
)
+ if add_msisdn and auth_result and LoginType.MSISDN in auth_result:
+ threepid = auth_result[LoginType.MSISDN]
+ yield self._register_msisdn_threepid(
+ registered_user_id, threepid, return_dict["access_token"],
+ params.get("bind_msisdn")
+ )
+
defer.returnValue((200, return_dict))
def on_OPTIONS(self, _):
@@ -323,8 +389,9 @@ class RegisterRestServlet(RestServlet):
"""
reqd = ('medium', 'address', 'validated_at')
if any(x not in threepid for x in reqd):
+ # This will only happen if the ID server returns a malformed response
logger.info("Can't add incomplete 3pid")
- defer.returnValue()
+ return
yield self.auth_handler.add_threepid(
user_id,
@@ -372,6 +439,43 @@ class RegisterRestServlet(RestServlet):
logger.info("bind_email not specified: not binding email")
@defer.inlineCallbacks
+ def _register_msisdn_threepid(self, user_id, threepid, token, bind_msisdn):
+ """Add a phone number as a 3pid identifier
+
+ Also optionally binds msisdn to the given user_id on the identity server
+
+ Args:
+ user_id (str): id of user
+ threepid (object): m.login.msisdn auth response
+ token (str): access_token for the user
+ bind_email (bool): true if the client requested the email to be
+ bound at the identity server
+ Returns:
+ defer.Deferred:
+ """
+ reqd = ('medium', 'address', 'validated_at')
+ if any(x not in threepid for x in reqd):
+ # This will only happen if the ID server returns a malformed response
+ logger.info("Can't add incomplete 3pid")
+ defer.returnValue()
+
+ yield self.auth_handler.add_threepid(
+ user_id,
+ threepid['medium'],
+ threepid['address'],
+ threepid['validated_at'],
+ )
+
+ if bind_msisdn:
+ logger.info("bind_msisdn specified: binding")
+ logger.debug("Binding msisdn %s to %s", threepid, user_id)
+ yield self.identity_handler.bind_threepid(
+ threepid['threepid_creds'], user_id
+ )
+ else:
+ logger.info("bind_msisdn not specified: not binding msisdn")
+
+ @defer.inlineCallbacks
def _create_registration_details(self, user_id, params):
"""Complete registration of newly-registered user
@@ -449,5 +553,6 @@ class RegisterRestServlet(RestServlet):
def register_servlets(hs, http_server):
- RegisterRequestTokenRestServlet(hs).register(http_server)
+ EmailRegisterRequestTokenRestServlet(hs).register(http_server)
+ MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
RegisterRestServlet(hs).register(http_server)
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index dfb87ffd15..6788375e85 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import synapse.http.servlet
from ._base import parse_media_id, respond_with_file, respond_404
from twisted.web.resource import Resource
@@ -81,6 +82,17 @@ class DownloadResource(Resource):
@defer.inlineCallbacks
def _respond_remote_file(self, request, server_name, media_id, name):
+ # don't forward requests for remote media if allow_remote is false
+ allow_remote = synapse.http.servlet.parse_boolean(
+ request, "allow_remote", default=True)
+ if not allow_remote:
+ logger.info(
+ "Rejecting request for remote media %s/%s due to allow_remote",
+ server_name, media_id,
+ )
+ respond_404(request)
+ return
+
media_info = yield self.media_repo.get_remote_media(server_name, media_id)
media_type = media_info["media_type"]
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 481ffee200..c43b185e08 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -13,22 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer, threads
+import twisted.internet.error
+import twisted.web.http
+from twisted.web.resource import Resource
+
from .upload_resource import UploadResource
from .download_resource import DownloadResource
from .thumbnail_resource import ThumbnailResource
from .identicon_resource import IdenticonResource
from .preview_url_resource import PreviewUrlResource
from .filepath import MediaFilePaths
-
-from twisted.web.resource import Resource
-
from .thumbnailer import Thumbnailer
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.util.stringutils import random_string
-from synapse.api.errors import SynapseError
-
-from twisted.internet import defer, threads
+from synapse.api.errors import SynapseError, HttpResponseException, \
+ NotFoundError
from synapse.util.async import Linearizer
from synapse.util.stringutils import is_ascii
@@ -157,11 +158,34 @@ class MediaRepository(object):
try:
length, headers = yield self.client.get_file(
server_name, request_path, output_stream=f,
- max_size=self.max_upload_size,
+ max_size=self.max_upload_size, args={
+ # tell the remote server to 404 if it doesn't
+ # recognise the server_name, to make sure we don't
+ # end up with a routing loop.
+ "allow_remote": "false",
+ }
)
- except Exception as e:
- logger.warn("Failed to fetch remoted media %r", e)
- raise SynapseError(502, "Failed to fetch remoted media")
+ except twisted.internet.error.DNSLookupError as e:
+ logger.warn("HTTP error fetching remote media %s/%s: %r",
+ server_name, media_id, e)
+ raise NotFoundError()
+
+ except HttpResponseException as e:
+ logger.warn("HTTP error fetching remote media %s/%s: %s",
+ server_name, media_id, e.response)
+ if e.code == twisted.web.http.NOT_FOUND:
+ raise SynapseError.from_http_response_exception(e)
+ raise SynapseError(502, "Failed to fetch remote media")
+
+ except SynapseError:
+ logger.exception("Failed to fetch remote media %s/%s",
+ server_name, media_id)
+ raise
+
+ except Exception:
+ logger.exception("Failed to fetch remote media %s/%s",
+ server_name, media_id)
+ raise SynapseError(502, "Failed to fetch remote media")
media_type = headers["Content-Type"][0]
time_now_ms = self.clock.time_msec()
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 5c7db5e5f6..7925cb5f1b 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -357,12 +357,12 @@ class DeviceInboxStore(BackgroundUpdateStore):
"""
Args:
destination(str): The name of the remote server.
- last_stream_id(int): The last position of the device message stream
+ last_stream_id(int|long): The last position of the device message stream
that the server sent up to.
- current_stream_id(int): The current position of the device
+ current_stream_id(int|long): The current position of the device
message stream.
Returns:
- Deferred ([dict], int): List of messages for the device and where
+ Deferred ([dict], int|long): List of messages for the device and where
in the stream the messages got to.
"""
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 563071b7a9..e545b62e39 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -308,7 +308,7 @@ class DeviceStore(SQLBaseStore):
"""Get stream of updates to send to remote servers
Returns:
- (now_stream_id, [ { updates }, .. ])
+ (int, list[dict]): current stream id and list of updates
"""
now_stream_id = self._device_list_id_gen.get_current_token()
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 03881ea3dc..72319c35ae 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -467,14 +467,9 @@ class EventsStore(SQLBaseStore):
else:
return
- existing_state_rows = yield self._simple_select_list(
- table="current_state_events",
- keyvalues={"room_id": room_id},
- retcols=["event_id", "type", "state_key"],
- desc="_calculate_state_delta",
- )
+ existing_state = yield self.get_current_state_ids(room_id)
- existing_events = set(row["event_id"] for row in existing_state_rows)
+ existing_events = set(existing_state.itervalues())
new_events = set(ev_id for ev_id in current_state.itervalues())
changed_events = existing_events ^ new_events
@@ -482,9 +477,8 @@ class EventsStore(SQLBaseStore):
return
to_delete = {
- (row["type"], row["state_key"]): row["event_id"]
- for row in existing_state_rows
- if row["event_id"] in changed_events
+ key: ev_id for key, ev_id in existing_state.iteritems()
+ if ev_id in changed_events
}
events_to_insert = (new_events - existing_events)
to_insert = {
@@ -610,6 +604,10 @@ class EventsStore(SQLBaseStore):
txn, self.get_users_in_room, (room_id,)
)
+ self._invalidate_cache_and_stream(
+ txn, self.get_current_state_ids, (room_id,)
+ )
+
for room_id, new_extrem in new_forward_extremeties.items():
self._simple_delete_txn(
txn,
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 84482d8285..27f1ec89ec 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,7 +14,7 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
from synapse.util.caches import intern_string
from synapse.storage.engines import PostgresEngine
@@ -69,6 +69,18 @@ class StateStore(SQLBaseStore):
where_clause="type='m.room.member'",
)
+ @cachedInlineCallbacks(max_entries=100000, iterable=True)
+ def get_current_state_ids(self, room_id):
+ rows = yield self._simple_select_list(
+ table="current_state_events",
+ keyvalues={"room_id": room_id},
+ retcols=["event_id", "type", "state_key"],
+ desc="_calculate_state_delta",
+ )
+ defer.returnValue({
+ (r["type"], r["state_key"]): r["event_id"] for r in rows
+ })
+
@defer.inlineCallbacks
def get_state_groups_ids(self, room_id, event_ids):
if not event_ids:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 200d124632..dddd5fc0e7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -829,3 +829,6 @@ class StreamStore(SQLBaseStore):
updatevalues={"stream_id": stream_id},
desc="update_federation_out_pos",
)
+
+ def has_room_changed_since(self, room_id, stream_id):
+ return self._events_stream_cache.has_entity_changed(room_id, stream_id)
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 46cf93ff87..95031dc9ec 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -30,6 +30,17 @@ class IdGenerator(object):
def _load_current_id(db_conn, table, column, step=1):
+ """
+
+ Args:
+ db_conn (object):
+ table (str):
+ column (str):
+ step (int):
+
+ Returns:
+ int
+ """
cur = db_conn.cursor()
if step == 1:
cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
@@ -131,6 +142,9 @@ class StreamIdGenerator(object):
def get_current_token(self):
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
+
+ Returns:
+ int
"""
with self._lock:
if self._unfinished_ids:
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b72bb0ff02..70fe00ce0b 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -50,7 +50,7 @@ class StreamChangeCache(object):
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
"""
- assert type(stream_pos) is int
+ assert type(stream_pos) is int or type(stream_pos) is long
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
diff --git a/synapse/util/msisdn.py b/synapse/util/msisdn.py
new file mode 100644
index 0000000000..607161e7f0
--- /dev/null
+++ b/synapse/util/msisdn.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import phonenumbers
+from synapse.api.errors import SynapseError
+
+
+def phone_number_to_msisdn(country, number):
+ """
+ Takes an ISO-3166-1 2 letter country code and phone number and
+ returns an msisdn representing the canonical version of that
+ phone number.
+ Args:
+ country (str): ISO-3166-1 2 letter country code
+ number (str): Phone number in a national or international format
+
+ Returns:
+ (str) The canonical form of the phone number, as an msisdn
+ Raises:
+ SynapseError if the number could not be parsed.
+ """
+ try:
+ phoneNumber = phonenumbers.parse(number, country)
+ except phonenumbers.NumberParseException:
+ raise SynapseError(400, "Unable to parse phone number")
+ return phonenumbers.format_number(
+ phoneNumber, phonenumbers.PhoneNumberFormat.E164
+ )[1:]
|