diff --git a/synapse/__init__.py b/synapse/__init__.py
index 41745170a1..6dbe8fc7e7 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.18.0"
+__version__ = "0.18.1"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index e75fd518be..1b3b55d517 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -653,7 +653,7 @@ class Auth(object):
@defer.inlineCallbacks
def _get_appservice_user_id(self, request):
- app_service = yield self.store.get_app_service_by_token(
+ app_service = self.store.get_app_service_by_token(
get_access_token_from_request(
request, self.TOKEN_NOT_FOUND_HTTP_STATUS
)
@@ -855,13 +855,12 @@ class Auth(object):
}
defer.returnValue(user_info)
- @defer.inlineCallbacks
def get_appservice_by_req(self, request):
try:
token = get_access_token_from_request(
request, self.TOKEN_NOT_FOUND_HTTP_STATUS
)
- service = yield self.store.get_app_service_by_token(token)
+ service = self.store.get_app_service_by_token(token)
if not service:
logger.warn("Unrecognised appservice access token: %s" % (token,))
raise AuthError(
@@ -870,7 +869,7 @@ class Auth(object):
errcode=Codes.UNKNOWN_TOKEN
)
request.authenticated_entity = service.sender
- defer.returnValue(service)
+ return defer.succeed(service)
except KeyError:
raise AuthError(
self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token."
@@ -1002,16 +1001,6 @@ class Auth(object):
403,
"You are not allowed to set others state"
)
- else:
- sender_domain = UserID.from_string(
- event.user_id
- ).domain
-
- if sender_domain != event.state_key:
- raise AuthError(
- 403,
- "You are not allowed to set others state"
- )
return True
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index d59f4a571c..1a6f5507a9 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -197,7 +197,7 @@ class PusherServer(HomeServer):
yield start_pusher(user_id, app_id, pushkey)
stream = results.get("events")
- if stream:
+ if stream and stream["rows"]:
min_stream_id = stream["rows"][0][0]
max_stream_id = stream["position"]
preserve_fn(pusher_pool.on_new_notifications)(
@@ -205,7 +205,7 @@ class PusherServer(HomeServer):
)
stream = results.get("receipts")
- if stream:
+ if stream and stream["rows"]:
rows = stream["rows"]
affected_room_ids = set(row[1] for row in rows)
min_stream_id = rows[0][0]
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index e58735294e..4981643166 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -55,8 +55,14 @@ class BaseHandler(object):
def ratelimit(self, requester):
time_now = self.clock.time()
+ user_id = requester.user.to_string()
+
+ app_service = self.store.get_app_service_by_user_id(user_id)
+ if app_service is not None:
+ return # do not ratelimit app service senders
+
allowed, time_allowed = self.ratelimiter.send_message(
- requester.user.to_string(), time_now,
+ user_id, time_now,
msg_rate_hz=self.hs.config.rc_messages_per_second,
burst_count=self.hs.config.rc_message_burst_count,
)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 88fa0bb2e4..05af54d31b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -59,7 +59,7 @@ class ApplicationServicesHandler(object):
Args:
current_id(int): The current maximum ID.
"""
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
if not services or not self.notify_appservices:
return
@@ -142,7 +142,7 @@ class ApplicationServicesHandler(object):
association can be found.
"""
room_alias_str = room_alias.to_string()
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
alias_query_services = [
s for s in services if (
s.is_interested_in_alias(room_alias_str)
@@ -177,7 +177,7 @@ class ApplicationServicesHandler(object):
@defer.inlineCallbacks
def get_3pe_protocols(self, only_protocol=None):
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
protocols = {}
# Collect up all the individual protocol responses out of the ASes
@@ -224,7 +224,7 @@ class ApplicationServicesHandler(object):
list<ApplicationService>: A list of services interested in this
event based on the service regex.
"""
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
interested_list = [
s for s in services if (
yield s.is_interested(event, self.store)
@@ -232,23 +232,21 @@ class ApplicationServicesHandler(object):
]
defer.returnValue(interested_list)
- @defer.inlineCallbacks
def _get_services_for_user(self, user_id):
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
interested_list = [
s for s in services if (
s.is_interested_in_user(user_id)
)
]
- defer.returnValue(interested_list)
+ return defer.succeed(interested_list)
- @defer.inlineCallbacks
def _get_services_for_3pn(self, protocol):
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
interested_list = [
s for s in services if s.is_interested_in_protocol(protocol)
]
- defer.returnValue(interested_list)
+ return defer.succeed(interested_list)
@defer.inlineCallbacks
def _is_unknown_user(self, user_id):
@@ -264,7 +262,7 @@ class ApplicationServicesHandler(object):
return
# user not found; could be the AS though, so check.
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
service_list = [s for s in services if s.sender == user_id]
defer.returnValue(len(service_list) == 0)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 9583ae1e93..dc0fe60e1b 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -51,7 +51,6 @@ class AuthHandler(BaseHandler):
}
self.bcrypt_rounds = hs.config.bcrypt_rounds
self.sessions = {}
- self.INVALID_TOKEN_HTTP_STATUS = 401
account_handler = _AccountHandler(
hs, check_user_exists=self.check_user_exists
@@ -134,13 +133,30 @@ class AuthHandler(BaseHandler):
creds = session['creds']
# check auth type currently being presented
+ errordict = {}
if 'type' in authdict:
- if authdict['type'] not in self.checkers:
+ login_type = authdict['type']
+ if login_type not in self.checkers:
raise LoginError(400, "", Codes.UNRECOGNIZED)
- result = yield self.checkers[authdict['type']](authdict, clientip)
- if result:
- creds[authdict['type']] = result
- self._save_session(session)
+ try:
+ result = yield self.checkers[login_type](authdict, clientip)
+ if result:
+ creds[login_type] = result
+ self._save_session(session)
+ except LoginError, e:
+ if login_type == LoginType.EMAIL_IDENTITY:
+ # riot used to have a bug where it would request a new
+ # validation token (thus sending a new email) each time it
+ # got a 401 with a 'flows' field.
+ # (https://github.com/vector-im/vector-web/issues/2447).
+ #
+ # Grandfather in the old behaviour for now to avoid
+ # breaking old riot deployments.
+ raise e
+
+ # this step failed. Merge the error dict into the response
+ # so that the client can have another go.
+ errordict = e.error_dict()
for f in flows:
if len(set(f) - set(creds.keys())) == 0:
@@ -149,6 +165,7 @@ class AuthHandler(BaseHandler):
ret = self._auth_dict_for_flows(flows, session)
ret['completed'] = creds.keys()
+ ret.update(errordict)
defer.returnValue((False, ret, clientdict, session['id']))
@defer.inlineCallbacks
@@ -416,37 +433,40 @@ class AuthHandler(BaseHandler):
defer.Deferred: (str) canonical_user_id, or None if zero or
multiple matches
"""
- try:
- res = yield self._find_user_id_and_pwd_hash(user_id)
+ res = yield self._find_user_id_and_pwd_hash(user_id)
+ if res is not None:
defer.returnValue(res[0])
- except LoginError:
- defer.returnValue(None)
+ defer.returnValue(None)
@defer.inlineCallbacks
def _find_user_id_and_pwd_hash(self, user_id):
"""Checks to see if a user with the given id exists. Will check case
- insensitively, but will throw if there are multiple inexact matches.
+ insensitively, but will return None if there are multiple inexact
+ matches.
Returns:
tuple: A 2-tuple of `(canonical_user_id, password_hash)`
+ None: if there is not exactly one match
"""
user_infos = yield self.store.get_users_by_id_case_insensitive(user_id)
+
+ result = None
if not user_infos:
logger.warn("Attempted to login as %s but they do not exist", user_id)
- raise LoginError(403, "", errcode=Codes.FORBIDDEN)
-
- if len(user_infos) > 1:
- if user_id not in user_infos:
- logger.warn(
- "Attempted to login as %s but it matches more than one user "
- "inexactly: %r",
- user_id, user_infos.keys()
- )
- raise LoginError(403, "", errcode=Codes.FORBIDDEN)
-
- defer.returnValue((user_id, user_infos[user_id]))
+ elif len(user_infos) == 1:
+ # a single match (possibly not exact)
+ result = user_infos.popitem()
+ elif user_id in user_infos:
+ # multiple matches, but one is exact
+ result = (user_id, user_infos[user_id])
else:
- defer.returnValue(user_infos.popitem())
+ # multiple matches, none of them exact
+ logger.warn(
+ "Attempted to login as %s but it matches more than one user "
+ "inexactly: %r",
+ user_id, user_infos.keys()
+ )
+ defer.returnValue(result)
@defer.inlineCallbacks
def _check_password(self, user_id, password):
@@ -460,35 +480,46 @@ class AuthHandler(BaseHandler):
Returns:
(str) the canonical_user_id
Raises:
- LoginError if the password was incorrect
+ LoginError if login fails
"""
for provider in self.password_providers:
is_valid = yield provider.check_password(user_id, password)
if is_valid:
defer.returnValue(user_id)
- result = yield self._check_local_password(user_id, password)
- defer.returnValue(result)
+ canonical_user_id = yield self._check_local_password(user_id, password)
+
+ if canonical_user_id:
+ defer.returnValue(canonical_user_id)
+
+ # unknown username or invalid password. We raise a 403 here, but note
+ # that if we're doing user-interactive login, it turns all LoginErrors
+ # into a 401 anyway.
+ raise LoginError(
+ 403, "Invalid password",
+ errcode=Codes.FORBIDDEN
+ )
@defer.inlineCallbacks
def _check_local_password(self, user_id, password):
"""Authenticate a user against the local password database.
- user_id is checked case insensitively, but will throw if there are
+ user_id is checked case insensitively, but will return None if there are
multiple inexact matches.
Args:
user_id (str): complete @user:id
Returns:
- (str) the canonical_user_id
- Raises:
- LoginError if the password was incorrect
+ (str) the canonical_user_id, or None if unknown user / bad password
"""
- user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
+ lookupres = yield self._find_user_id_and_pwd_hash(user_id)
+ if not lookupres:
+ defer.returnValue(None)
+ (user_id, password_hash) = lookupres
result = self.validate_hash(password, password_hash)
if not result:
logger.warn("Failed password login for user %s", user_id)
- raise LoginError(403, "", errcode=Codes.FORBIDDEN)
+ defer.returnValue(None)
defer.returnValue(user_id)
@defer.inlineCallbacks
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 14352985e2..c00274afc3 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -288,13 +288,12 @@ class DirectoryHandler(BaseHandler):
result = yield as_handler.query_room_alias_exists(room_alias)
defer.returnValue(result)
- @defer.inlineCallbacks
def can_modify_alias(self, alias, user_id=None):
# Any application service "interested" in an alias they are regexing on
# can modify the alias.
# Users can only modify the alias if ALL the interested services have
# non-exclusive locks on the alias (or there are no interested services)
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
interested_services = [
s for s in services if s.is_interested_in_alias(alias.to_string())
]
@@ -302,14 +301,12 @@ class DirectoryHandler(BaseHandler):
for service in interested_services:
if user_id == service.sender:
# this user IS the app service so they can do whatever they like
- defer.returnValue(True)
- return
+ return defer.succeed(True)
elif service.is_exclusive_alias(alias.to_string()):
# another service has an exclusive lock on this alias.
- defer.returnValue(False)
- return
+ return defer.succeed(False)
# either no interested services, or no service with an exclusive lock
- defer.returnValue(True)
+ return defer.succeed(True)
@defer.inlineCallbacks
def _user_can_delete_alias(self, alias, user_id):
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index d9ac09078d..87f74dfb8e 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -65,13 +65,13 @@ class ProfileHandler(BaseHandler):
defer.returnValue(result["displayname"])
@defer.inlineCallbacks
- def set_displayname(self, target_user, requester, new_displayname):
+ def set_displayname(self, target_user, requester, new_displayname, by_admin=False):
"""target_user is the user whose displayname is to be changed;
auth_user is the user attempting to make this change."""
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server")
- if target_user != requester.user:
+ if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's displayname")
if new_displayname == '':
@@ -111,13 +111,13 @@ class ProfileHandler(BaseHandler):
defer.returnValue(result["avatar_url"])
@defer.inlineCallbacks
- def set_avatar_url(self, target_user, requester, new_avatar_url):
+ def set_avatar_url(self, target_user, requester, new_avatar_url, by_admin=False):
"""target_user is the user whose avatar_url is to be changed;
auth_user is the user attempting to make this change."""
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server")
- if target_user != requester.user:
+ if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's avatar_url")
yield self.store.set_profile_avatar_url(
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index dd75c4fecf..7e119f13b1 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -19,7 +19,6 @@ import urllib
from twisted.internet import defer
-import synapse.types
from synapse.api.errors import (
AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
)
@@ -194,7 +193,7 @@ class RegistrationHandler(BaseHandler):
def appservice_register(self, user_localpart, as_token):
user = UserID(user_localpart, self.hs.hostname)
user_id = user.to_string()
- service = yield self.store.get_app_service_by_token(as_token)
+ service = self.store.get_app_service_by_token(as_token)
if not service:
raise AuthError(403, "Invalid application service token.")
if not service.is_interested_in_user(user_id):
@@ -305,11 +304,10 @@ class RegistrationHandler(BaseHandler):
# XXX: This should be a deferred list, shouldn't it?
yield identity_handler.bind_threepid(c, user_id)
- @defer.inlineCallbacks
def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
# valid user IDs must not clash with any user ID namespaces claimed by
# application services.
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
interested_services = [
s for s in services
if s.is_interested_in_user(user_id)
@@ -371,7 +369,7 @@ class RegistrationHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def get_or_create_user(self, localpart, displayname, duration_in_ms,
+ def get_or_create_user(self, requester, localpart, displayname, duration_in_ms,
password_hash=None):
"""Creates a new user if the user does not exist,
else revokes all previous access tokens and generates a new one.
@@ -418,9 +416,8 @@ class RegistrationHandler(BaseHandler):
if displayname is not None:
logger.info("setting user display name: %s -> %s", user_id, displayname)
profile_handler = self.hs.get_handlers().profile_handler
- requester = synapse.types.create_requester(user)
yield profile_handler.set_displayname(
- user, requester, displayname
+ user, requester, displayname, by_admin=True,
)
defer.returnValue((user_id, token))
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index cbd26f8f95..a7f533f7be 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -437,7 +437,7 @@ class RoomEventSource(object):
logger.warn("Stream has topological part!!!! %r", from_key)
from_key = "s%s" % (from_token.stream,)
- app_service = yield self.store.get_app_service_by_user_id(
+ app_service = self.store.get_app_service_by_user_id(
user.to_string()
)
if app_service:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b5962f4f5a..1f910ff814 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -788,7 +788,7 @@ class SyncHandler(object):
assert since_token
- app_service = yield self.store.get_app_service_by_user_id(user_id)
+ app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
joined_room_ids = set(r.room_id for r in rooms)
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 9aab3ce23c..5a14c51d23 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -17,6 +17,7 @@ from synapse.http.servlet import parse_integer, parse_string
from synapse.http.server import request_handler, finish_request
from synapse.replication.pusher_resource import PusherResource
from synapse.replication.presence_resource import PresenceResource
+from synapse.api.errors import SynapseError
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
@@ -166,7 +167,8 @@ class ReplicationResource(Resource):
def replicate():
return self.replicate(request_streams, limit)
- result = yield self.notifier.wait_for_replication(replicate, timeout)
+ writer = yield self.notifier.wait_for_replication(replicate, timeout)
+ result = writer.finish()
for stream_name, stream_content in result.items():
logger.info(
@@ -186,6 +188,9 @@ class ReplicationResource(Resource):
current_token = yield self.current_replication_token()
logger.debug("Replicating up to %r", current_token)
+ if limit == 0:
+ raise SynapseError(400, "Limit cannot be 0")
+
yield self.account_data(writer, current_token, limit, request_streams)
yield self.events(writer, current_token, limit, request_streams)
# TODO: implement limit
@@ -200,7 +205,7 @@ class ReplicationResource(Resource):
self.streams(writer, current_token, request_streams)
logger.debug("Replicated %d rows", writer.total)
- defer.returnValue(writer.finish())
+ defer.returnValue(writer)
def streams(self, writer, current_token, request_streams):
request_token = request_streams.get("streams")
@@ -237,27 +242,48 @@ class ReplicationResource(Resource):
request_events = current_token.events
if request_backfill is None:
request_backfill = current_token.backfill
+
+ no_new_tokens = (
+ request_events == current_token.events
+ and request_backfill == current_token.backfill
+ )
+ if no_new_tokens:
+ return
+
res = yield self.store.get_all_new_events(
request_backfill, request_events,
current_token.backfill, current_token.events,
limit
)
- writer.write_header_and_rows("events", res.new_forward_events, (
- "position", "internal", "json", "state_group"
- ))
- writer.write_header_and_rows("backfill", res.new_backfill_events, (
- "position", "internal", "json", "state_group"
- ))
+
+ upto_events_token = _position_from_rows(
+ res.new_forward_events, current_token.events
+ )
+
+ upto_backfill_token = _position_from_rows(
+ res.new_backfill_events, current_token.backfill
+ )
+
+ if request_events != upto_events_token:
+ writer.write_header_and_rows("events", res.new_forward_events, (
+ "position", "internal", "json", "state_group"
+ ), position=upto_events_token)
+
+ if request_backfill != upto_backfill_token:
+ writer.write_header_and_rows("backfill", res.new_backfill_events, (
+ "position", "internal", "json", "state_group",
+ ), position=upto_backfill_token)
+
writer.write_header_and_rows(
"forward_ex_outliers", res.forward_ex_outliers,
- ("position", "event_id", "state_group")
+ ("position", "event_id", "state_group"),
)
writer.write_header_and_rows(
"backward_ex_outliers", res.backward_ex_outliers,
- ("position", "event_id", "state_group")
+ ("position", "event_id", "state_group"),
)
writer.write_header_and_rows(
- "state_resets", res.state_resets, ("position",)
+ "state_resets", res.state_resets, ("position",),
)
@defer.inlineCallbacks
@@ -266,15 +292,16 @@ class ReplicationResource(Resource):
request_presence = request_streams.get("presence")
- if request_presence is not None:
+ if request_presence is not None and request_presence != current_position:
presence_rows = yield self.presence_handler.get_all_presence_updates(
request_presence, current_position
)
+ upto_token = _position_from_rows(presence_rows, current_position)
writer.write_header_and_rows("presence", presence_rows, (
"position", "user_id", "state", "last_active_ts",
"last_federation_update_ts", "last_user_sync_ts",
"status_msg", "currently_active",
- ))
+ ), position=upto_token)
@defer.inlineCallbacks
def typing(self, writer, current_token, request_streams):
@@ -282,7 +309,7 @@ class ReplicationResource(Resource):
request_typing = request_streams.get("typing")
- if request_typing is not None:
+ if request_typing is not None and request_typing != current_position:
# If they have a higher token than current max, we can assume that
# they had been talking to a previous instance of the master. Since
# we reset the token on restart, the best (but hacky) thing we can
@@ -293,9 +320,10 @@ class ReplicationResource(Resource):
typing_rows = yield self.typing_handler.get_all_typing_updates(
request_typing, current_position
)
+ upto_token = _position_from_rows(typing_rows, current_position)
writer.write_header_and_rows("typing", typing_rows, (
"position", "room_id", "typing"
- ))
+ ), position=upto_token)
@defer.inlineCallbacks
def receipts(self, writer, current_token, limit, request_streams):
@@ -303,13 +331,14 @@ class ReplicationResource(Resource):
request_receipts = request_streams.get("receipts")
- if request_receipts is not None:
+ if request_receipts is not None and request_receipts != current_position:
receipts_rows = yield self.store.get_all_updated_receipts(
request_receipts, current_position, limit
)
+ upto_token = _position_from_rows(receipts_rows, current_position)
writer.write_header_and_rows("receipts", receipts_rows, (
"position", "room_id", "receipt_type", "user_id", "event_id", "data"
- ))
+ ), position=upto_token)
@defer.inlineCallbacks
def account_data(self, writer, current_token, limit, request_streams):
@@ -324,23 +353,36 @@ class ReplicationResource(Resource):
user_account_data = current_position
if room_account_data is None:
room_account_data = current_position
+
+ no_new_tokens = (
+ user_account_data == current_position
+ and room_account_data == current_position
+ )
+ if no_new_tokens:
+ return
+
user_rows, room_rows = yield self.store.get_all_updated_account_data(
user_account_data, room_account_data, current_position, limit
)
+
+ upto_users_token = _position_from_rows(user_rows, current_position)
+ upto_rooms_token = _position_from_rows(room_rows, current_position)
+
writer.write_header_and_rows("user_account_data", user_rows, (
"position", "user_id", "type", "content"
- ))
+ ), position=upto_users_token)
writer.write_header_and_rows("room_account_data", room_rows, (
"position", "user_id", "room_id", "type", "content"
- ))
+ ), position=upto_rooms_token)
if tag_account_data is not None:
tag_rows = yield self.store.get_all_updated_tags(
tag_account_data, current_position, limit
)
+ upto_tag_token = _position_from_rows(tag_rows, current_position)
writer.write_header_and_rows("tag_account_data", tag_rows, (
"position", "user_id", "room_id", "tags"
- ))
+ ), position=upto_tag_token)
@defer.inlineCallbacks
def push_rules(self, writer, current_token, limit, request_streams):
@@ -348,14 +390,15 @@ class ReplicationResource(Resource):
push_rules = request_streams.get("push_rules")
- if push_rules is not None:
+ if push_rules is not None and push_rules != current_position:
rows = yield self.store.get_all_push_rule_updates(
push_rules, current_position, limit
)
+ upto_token = _position_from_rows(rows, current_position)
writer.write_header_and_rows("push_rules", rows, (
"position", "event_stream_ordering", "user_id", "rule_id", "op",
"priority_class", "priority", "conditions", "actions"
- ))
+ ), position=upto_token)
@defer.inlineCallbacks
def pushers(self, writer, current_token, limit, request_streams):
@@ -363,18 +406,19 @@ class ReplicationResource(Resource):
pushers = request_streams.get("pushers")
- if pushers is not None:
+ if pushers is not None and pushers != current_position:
updated, deleted = yield self.store.get_all_updated_pushers(
pushers, current_position, limit
)
+ upto_token = _position_from_rows(updated, current_position)
writer.write_header_and_rows("pushers", updated, (
"position", "user_id", "access_token", "profile_tag", "kind",
"app_id", "app_display_name", "device_display_name", "pushkey",
"ts", "lang", "data"
- ))
+ ), position=upto_token)
writer.write_header_and_rows("deleted_pushers", deleted, (
"position", "user_id", "app_id", "pushkey"
- ))
+ ), position=upto_token)
@defer.inlineCallbacks
def caches(self, writer, current_token, limit, request_streams):
@@ -382,13 +426,14 @@ class ReplicationResource(Resource):
caches = request_streams.get("caches")
- if caches is not None:
+ if caches is not None and caches != current_position:
updated_caches = yield self.store.get_all_updated_caches(
caches, current_position, limit
)
+ upto_token = _position_from_rows(updated_caches, current_position)
writer.write_header_and_rows("caches", updated_caches, (
"position", "cache_func", "keys", "invalidation_ts"
- ))
+ ), position=upto_token)
@defer.inlineCallbacks
def to_device(self, writer, current_token, limit, request_streams):
@@ -396,13 +441,14 @@ class ReplicationResource(Resource):
to_device = request_streams.get("to_device")
- if to_device is not None:
+ if to_device is not None and to_device != current_position:
to_device_rows = yield self.store.get_all_new_device_messages(
to_device, current_position, limit
)
+ upto_token = _position_from_rows(to_device_rows, current_position)
writer.write_header_and_rows("to_device", to_device_rows, (
"position", "user_id", "device_id", "message_json"
- ))
+ ), position=upto_token)
@defer.inlineCallbacks
def public_rooms(self, writer, current_token, limit, request_streams):
@@ -410,13 +456,14 @@ class ReplicationResource(Resource):
public_rooms = request_streams.get("public_rooms")
- if public_rooms is not None:
+ if public_rooms is not None and public_rooms != current_position:
public_rooms_rows = yield self.store.get_all_new_public_rooms(
public_rooms, current_position, limit
)
+ upto_token = _position_from_rows(public_rooms_rows, current_position)
writer.write_header_and_rows("public_rooms", public_rooms_rows, (
"position", "room_id", "visibility"
- ))
+ ), position=upto_token)
class _Writer(object):
@@ -426,11 +473,11 @@ class _Writer(object):
self.total = 0
def write_header_and_rows(self, name, rows, fields, position=None):
- if not rows:
- return
-
if position is None:
- position = rows[-1][0]
+ if rows:
+ position = rows[-1][0]
+ else:
+ return
self.streams[name] = {
"position": position if type(position) is int else str(position),
@@ -440,6 +487,9 @@ class _Writer(object):
self.total += len(rows)
+ def __nonzero__(self):
+ return bool(self.total)
+
def finish(self):
return self.streams
@@ -461,3 +511,20 @@ class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
def __str__(self):
return "_".join(str(value) for value in self)
+
+
+def _position_from_rows(rows, current_position):
+ """Calculates a position to return for a stream. Ideally we want to return the
+ position of the last row, as that will be the most correct. However, if there
+ are no rows we fall back to using the current position to stop us from
+ repeatedly hitting the storage layer unncessarily thinking there are updates.
+ (Not all advances of the token correspond to an actual update)
+
+ We can't just always return the current position, as we often limit the
+ number of rows we replicate, and so the stream may lag. The assumption is
+ that if the storage layer returns no new rows then we are not lagging and
+ we are at the `current_position`.
+ """
+ if rows:
+ return rows[-1][0]
+ return current_position
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 3046da7aec..b5a76fefac 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -22,6 +22,7 @@ from synapse.api.auth import get_access_token_from_request
from .base import ClientV1RestServlet, client_path_patterns
import synapse.util.stringutils as stringutils
from synapse.http.servlet import parse_json_object_from_request
+from synapse.types import create_requester
from synapse.util.async import run_on_reactor
@@ -391,15 +392,16 @@ class CreateUserRestServlet(ClientV1RestServlet):
user_json = parse_json_object_from_request(request)
access_token = get_access_token_from_request(request)
- app_service = yield self.store.get_app_service_by_token(
+ app_service = self.store.get_app_service_by_token(
access_token
)
if not app_service:
raise SynapseError(403, "Invalid application service token.")
- logger.debug("creating user: %s", user_json)
+ requester = create_requester(app_service.sender)
- response = yield self._do_create(user_json)
+ logger.debug("creating user: %s", user_json)
+ response = yield self._do_create(requester, user_json)
defer.returnValue((200, response))
@@ -407,7 +409,7 @@ class CreateUserRestServlet(ClientV1RestServlet):
return 403, {}
@defer.inlineCallbacks
- def _do_create(self, user_json):
+ def _do_create(self, requester, user_json):
yield run_on_reactor()
if "localpart" not in user_json:
@@ -433,6 +435,7 @@ class CreateUserRestServlet(ClientV1RestServlet):
handler = self.handlers.registration_handler
user_id, token = yield handler.get_or_create_user(
+ requester=requester,
localpart=localpart,
displayname=displayname,
duration_in_ms=(duration_seconds * 1000),
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index 58d3cad6a1..8e5577148f 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -77,8 +77,10 @@ SUCCESS_TEMPLATE = """
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
<script>
-if (window.onAuthDone != undefined) {
+if (window.onAuthDone) {
window.onAuthDone();
+} else if (window.opener && window.opener.postMessage) {
+ window.opener.postMessage("authDone", "*");
}
</script>
</head>
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index a854a87eab..3d5994a580 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore):
)
def get_app_services(self):
- return defer.succeed(self.services_cache)
+ return self.services_cache
def get_app_service_by_user_id(self, user_id):
"""Retrieve an application service from their user ID.
@@ -54,8 +54,8 @@ class ApplicationServiceStore(SQLBaseStore):
"""
for service in self.services_cache:
if service.sender == user_id:
- return defer.succeed(service)
- return defer.succeed(None)
+ return service
+ return None
def get_app_service_by_token(self, token):
"""Get the application service with the given appservice token.
@@ -67,8 +67,8 @@ class ApplicationServiceStore(SQLBaseStore):
"""
for service in self.services_cache:
if service.token == token:
- return defer.succeed(service)
- return defer.succeed(None)
+ return service
+ return None
def get_app_service_rooms(self, service):
"""Get a list of RoomsForUser for this application service.
@@ -163,7 +163,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
["as_id"]
)
# NB: This assumes this class is linked with ApplicationServiceStore
- as_list = yield self.get_app_services()
+ as_list = self.get_app_services()
services = []
for res in results:
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 2ef13d7403..11813b44f6 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -320,6 +320,9 @@ class RoomStore(SQLBaseStore):
txn.execute(sql, (prev_id, current_id, limit,))
return txn.fetchall()
+ if prev_id == current_id:
+ return defer.succeed([])
+
return self.runInteraction(
"get_all_new_public_rooms", get_all_new_public_rooms
)
|