diff --git a/synapse/__init__.py b/synapse/__init__.py
index faaa86d972..3cd79b1247 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.16.1-rc1"
+__version__ = "0.16.1"
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2a589524a4..85f5e752fe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -49,6 +49,7 @@ class FederationServer(FederationBase):
super(FederationServer, self).__init__(hs)
self._room_pdu_linearizer = Linearizer()
+ self._server_linearizer = Linearizer()
def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
@@ -89,11 +90,14 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
- pdus = yield self.handler.on_backfill_request(
- origin, room_id, versions, limit
- )
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ pdus = yield self.handler.on_backfill_request(
+ origin, room_id, versions, limit
+ )
+
+ res = self._transaction_from_pdus(pdus).get_dict()
- defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
+ defer.returnValue((200, res))
@defer.inlineCallbacks
@log_function
@@ -184,27 +188,28 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_context_state_request(self, origin, room_id, event_id):
- if event_id:
- pdus = yield self.handler.get_state_for_pdu(
- origin, room_id, event_id,
- )
- auth_chain = yield self.store.get_auth_chain(
- [pdu.event_id for pdu in pdus]
- )
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ if event_id:
+ pdus = yield self.handler.get_state_for_pdu(
+ origin, room_id, event_id,
+ )
+ auth_chain = yield self.store.get_auth_chain(
+ [pdu.event_id for pdu in pdus]
+ )
- for event in auth_chain:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.hs.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
+ for event in auth_chain:
+ # We sign these again because there was a bug where we
+ # incorrectly signed things the first time round
+ if self.hs.is_mine_id(event.event_id):
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
)
- )
- else:
- raise NotImplementedError("Specify an event")
+ else:
+ raise NotImplementedError("Specify an event")
defer.returnValue((200, {
"pdus": [pdu.get_pdu_json() for pdu in pdus],
@@ -283,14 +288,16 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_event_auth(self, origin, room_id, event_id):
- time_now = self._clock.time_msec()
- auth_pdus = yield self.handler.on_event_auth(event_id)
- defer.returnValue((200, {
- "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
- }))
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ time_now = self._clock.time_msec()
+ auth_pdus = yield self.handler.on_event_auth(event_id)
+ res = {
+ "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
+ }
+ defer.returnValue((200, res))
@defer.inlineCallbacks
- def on_query_auth_request(self, origin, content, event_id):
+ def on_query_auth_request(self, origin, content, room_id, event_id):
"""
Content is a dict with keys::
auth_chain (list): A list of events that give the auth chain.
@@ -309,32 +316,33 @@ class FederationServer(FederationBase):
Returns:
Deferred: Results in `dict` with the same format as `content`
"""
- auth_chain = [
- self.event_from_pdu_json(e)
- for e in content["auth_chain"]
- ]
-
- signed_auth = yield self._check_sigs_and_hash_and_fetch(
- origin, auth_chain, outlier=True
- )
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ auth_chain = [
+ self.event_from_pdu_json(e)
+ for e in content["auth_chain"]
+ ]
+
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ origin, auth_chain, outlier=True
+ )
- ret = yield self.handler.on_query_auth(
- origin,
- event_id,
- signed_auth,
- content.get("rejects", []),
- content.get("missing", []),
- )
+ ret = yield self.handler.on_query_auth(
+ origin,
+ event_id,
+ signed_auth,
+ content.get("rejects", []),
+ content.get("missing", []),
+ )
- time_now = self._clock.time_msec()
- send_content = {
- "auth_chain": [
- e.get_pdu_json(time_now)
- for e in ret["auth_chain"]
- ],
- "rejects": ret.get("rejects", []),
- "missing": ret.get("missing", []),
- }
+ time_now = self._clock.time_msec()
+ send_content = {
+ "auth_chain": [
+ e.get_pdu_json(time_now)
+ for e in ret["auth_chain"]
+ ],
+ "rejects": ret.get("rejects", []),
+ "missing": ret.get("missing", []),
+ }
defer.returnValue(
(200, send_content)
@@ -386,21 +394,24 @@ class FederationServer(FederationBase):
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
- logger.info(
- "on_get_missing_events: earliest_events: %r, latest_events: %r,"
- " limit: %d, min_depth: %d",
- earliest_events, latest_events, limit, min_depth
- )
- missing_events = yield self.handler.on_get_missing_events(
- origin, room_id, earliest_events, latest_events, limit, min_depth
- )
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ logger.info(
+ "on_get_missing_events: earliest_events: %r, latest_events: %r,"
+ " limit: %d, min_depth: %d",
+ earliest_events, latest_events, limit, min_depth
+ )
+ missing_events = yield self.handler.on_get_missing_events(
+ origin, room_id, earliest_events, latest_events, limit, min_depth
+ )
- if len(missing_events) < 5:
- logger.info("Returning %d events: %r", len(missing_events), missing_events)
- else:
- logger.info("Returning %d events", len(missing_events))
+ if len(missing_events) < 5:
+ logger.info(
+ "Returning %d events: %r", len(missing_events), missing_events
+ )
+ else:
+ logger.info("Returning %d events", len(missing_events))
- time_now = self._clock.time_msec()
+ time_now = self._clock.time_msec()
defer.returnValue({
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 8a1965f45a..26fa88ae84 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -388,7 +388,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_POST(self, origin, content, query, context, event_id):
new_content = yield self.handler.on_query_auth_request(
- origin, content, event_id
+ origin, content, context, event_id
)
defer.returnValue((200, new_content))
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index e37409170d..711a6a567f 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -36,13 +36,6 @@ class ProfileHandler(BaseHandler):
"profile", self.on_profile_query
)
- distributor = hs.get_distributor()
-
- distributor.observe("registered_user", self.registered_user)
-
- def registered_user(self, user):
- return self.store.create_profile(user.localpart)
-
@defer.inlineCallbacks
def get_displayname(self, target_user):
if self.hs.is_mine(target_user):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index e0aaefe7be..0b7517221d 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,7 +23,6 @@ from synapse.api.errors import (
from ._base import BaseHandler
from synapse.util.async import run_on_reactor
from synapse.http.client import CaptchaServerHttpClient
-from synapse.util.distributor import registered_user
import logging
import urllib
@@ -37,8 +36,6 @@ class RegistrationHandler(BaseHandler):
super(RegistrationHandler, self).__init__(hs)
self.auth = hs.get_auth()
- self.distributor = hs.get_distributor()
- self.distributor.declare("registered_user")
self.captcha_client = CaptchaServerHttpClient(hs)
self._next_generated_user_id = None
@@ -140,9 +137,11 @@ class RegistrationHandler(BaseHandler):
password_hash=password_hash,
was_guest=was_guest,
make_guest=make_guest,
+ create_profile_with_localpart=(
+ # If the user was a guest then they already have a profile
+ None if was_guest else user.localpart
+ ),
)
-
- yield registered_user(self.distributor, user)
else:
# autogen a sequential user ID
attempts = 0
@@ -160,7 +159,8 @@ class RegistrationHandler(BaseHandler):
user_id=user_id,
token=token,
password_hash=password_hash,
- make_guest=make_guest
+ make_guest=make_guest,
+ create_profile_with_localpart=user.localpart,
)
except SynapseError:
# if user id is taken, just generate another
@@ -168,7 +168,6 @@ class RegistrationHandler(BaseHandler):
user_id = None
token = None
attempts += 1
- yield registered_user(self.distributor, user)
# We used to generate default identicons here, but nowadays
# we want clients to generate their own as part of their branding
@@ -201,8 +200,8 @@ class RegistrationHandler(BaseHandler):
token=token,
password_hash="",
appservice_id=service_id,
+ create_profile_with_localpart=user.localpart,
)
- yield registered_user(self.distributor, user)
defer.returnValue((user_id, token))
@defer.inlineCallbacks
@@ -248,9 +247,9 @@ class RegistrationHandler(BaseHandler):
yield self.store.register(
user_id=user_id,
token=token,
- password_hash=None
+ password_hash=None,
+ create_profile_with_localpart=user.localpart,
)
- yield registered_user(self.distributor, user)
except Exception as e:
yield self.store.add_access_token_to_user(user_id, token)
# Ignore Registration errors
@@ -395,10 +394,9 @@ class RegistrationHandler(BaseHandler):
yield self.store.register(
user_id=user_id,
token=token,
- password_hash=None
+ password_hash=None,
+ create_profile_with_localpart=user.localpart,
)
-
- yield registered_user(self.distributor, user)
else:
yield self.store.user_delete_access_tokens(user_id=user_id)
yield self.store.add_access_token_to_user(user_id=user_id, token=token)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index bda84a744a..3de9e0f709 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -76,7 +76,8 @@ class RegistrationStore(SQLBaseStore):
@defer.inlineCallbacks
def register(self, user_id, token, password_hash,
- was_guest=False, make_guest=False, appservice_id=None):
+ was_guest=False, make_guest=False, appservice_id=None,
+ create_profile_with_localpart=None):
"""Attempts to register an account.
Args:
@@ -88,6 +89,8 @@ class RegistrationStore(SQLBaseStore):
make_guest (boolean): True if the the new user should be guest,
false to add a regular user account.
appservice_id (str): The ID of the appservice registering the user.
+ create_profile_with_localpart (str): Optionally create a profile for
+ the given localpart.
Raises:
StoreError if the user_id could not be registered.
"""
@@ -99,7 +102,8 @@ class RegistrationStore(SQLBaseStore):
password_hash,
was_guest,
make_guest,
- appservice_id
+ appservice_id,
+ create_profile_with_localpart,
)
self.get_user_by_id.invalidate((user_id,))
self.is_guest.invalidate((user_id,))
@@ -112,7 +116,8 @@ class RegistrationStore(SQLBaseStore):
password_hash,
was_guest,
make_guest,
- appservice_id
+ appservice_id,
+ create_profile_with_localpart,
):
now = int(self.clock.time())
@@ -157,6 +162,12 @@ class RegistrationStore(SQLBaseStore):
(next_id, user_id, token,)
)
+ if create_profile_with_localpart:
+ txn.execute(
+ "INSERT INTO profiles(user_id) VALUES (?)",
+ (create_profile_with_localpart,)
+ )
+
@cached()
def get_user_by_id(self, user_id):
return self._simple_select_one(
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index d7cccc06b1..e68f94ce77 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -27,10 +27,6 @@ import logging
logger = logging.getLogger(__name__)
-def registered_user(distributor, user):
- return distributor.fire("registered_user", user)
-
-
def user_left_room(distributor, user, room_id):
return preserve_context_over_fn(
distributor.fire,
|