diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2a5eab124f..329e3c7d71 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
import pymacaroons
from canonicaljson import json
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.web.client import PartialDownloadError
import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
+from synapse.util import logcontext
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
from ._base import BaseHandler
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
bcrypt.gensalt(self.bcrypt_rounds),
).decode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(),
- self.hs.get_reactor().getThreadPool(),
- _do_validate_hash,
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
else:
return defer.succeed(False)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
from ._base import BaseHandler
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
None
"""
if not self._user_parter_running:
- run_in_background(self._user_parter_loop)
+ run_as_background_process("user_parter_loop", self._user_parter_loop)
@defer.inlineCallbacks
def _user_parter_loop(self):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 18741c5fac..02f12f6645 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -80,42 +80,60 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def create_association(self, user_id, room_alias, room_id, servers=None):
- # association creation for human users
- # TODO(erikj): Do user auth.
+ def create_association(self, requester, room_alias, room_id, servers=None,
+ send_event=True):
+ """Attempt to create a new alias
- if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
- raise SynapseError(
- 403, "This user is not permitted to create this alias",
- )
+ Args:
+ requester (Requester)
+ room_alias (RoomAlias)
+ room_id (str)
+ servers (list[str]|None): List of servers that others servers
+ should try and join via
+ send_event (bool): Whether to send an updated m.room.aliases event
- can_create = yield self.can_modify_alias(
- room_alias,
- user_id=user_id
- )
- if not can_create:
- raise SynapseError(
- 400, "This alias is reserved by an application service.",
- errcode=Codes.EXCLUSIVE
- )
- yield self._create_association(room_alias, room_id, servers, creator=user_id)
+ Returns:
+ Deferred
+ """
- @defer.inlineCallbacks
- def create_appservice_association(self, service, room_alias, room_id,
- servers=None):
- if not service.is_interested_in_alias(room_alias.to_string()):
- raise SynapseError(
- 400, "This application service has not reserved"
- " this kind of alias.", errcode=Codes.EXCLUSIVE
+ user_id = requester.user.to_string()
+
+ service = requester.app_service
+ if service:
+ if not service.is_interested_in_alias(room_alias.to_string()):
+ raise SynapseError(
+ 400, "This application service has not reserved"
+ " this kind of alias.", errcode=Codes.EXCLUSIVE
+ )
+ else:
+ if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+ raise AuthError(
+ 403, "This user is not permitted to create this alias",
+ )
+
+ can_create = yield self.can_modify_alias(
+ room_alias,
+ user_id=user_id
)
+ if not can_create:
+ raise AuthError(
+ 400, "This alias is reserved by an application service.",
+ errcode=Codes.EXCLUSIVE
+ )
- # association creation for app services
- yield self._create_association(room_alias, room_id, servers)
+ yield self._create_association(room_alias, room_id, servers, creator=user_id)
+ if send_event:
+ yield self.send_room_alias_update_event(
+ requester,
+ room_id
+ )
@defer.inlineCallbacks
- def delete_association(self, requester, user_id, room_alias):
+ def delete_association(self, requester, room_alias):
# association deletion for human users
+ user_id = requester.user.to_string()
+
try:
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
except StoreError as e:
@@ -143,7 +161,6 @@ class DirectoryHandler(BaseHandler):
try:
yield self.send_room_alias_update_event(
requester,
- requester.user.to_string(),
room_id
)
@@ -261,7 +278,7 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def send_room_alias_update_event(self, requester, user_id, room_id):
+ def send_room_alias_update_event(self, requester, room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
@@ -270,7 +287,7 @@ class DirectoryHandler(BaseHandler):
"type": EventTypes.Aliases,
"state_key": self.hs.hostname,
"room_id": room_id,
- "sender": user_id,
+ "sender": requester.user.to_string(),
"content": {"aliases": aliases},
},
ratelimit=False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cab57a8849..cd5b9bbb19 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -53,7 +53,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
-from synapse.state import resolve_events_with_factory
+from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
@@ -384,24 +384,24 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
- # Resolve any conflicting state
- @defer.inlineCallbacks
- def fetch(ev_ids):
- fetched = yield self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
- # add any events we fetch here to the `event_map` so that we
- # can use them to build the state event list below.
- event_map.update(fetched)
- defer.returnValue(fetched)
-
room_version = yield self.store.get_room_version(room_id)
- state_map = yield resolve_events_with_factory(
- room_version, state_maps, event_map, fetch,
+ state_map = yield resolve_events_with_store(
+ room_version, state_maps, event_map,
+ state_res_store=StateResolutionStore(self.store),
)
- # we need to give _process_received_pdu the actual state events
+ # We need to give _process_received_pdu the actual state events
# rather than event ids, so generate that now.
+
+ # First though we need to fetch all the events that are in
+ # state_map, so we can build up the state below.
+ evs = yield self.store.get_events(
+ list(state_map.values()),
+ get_prev_content=False,
+ check_redacted=False,
+ )
+ event_map.update(evs)
+
state = [
event_map[e] for e in six.itervalues(state_map)
]
@@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
+ yield self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
- self.pusher_pool.on_new_notifications(
+ return self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
from twisted.internet import defer
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
)
else:
destination = get_domain_from_id(group_id)
- return getattr(self.transport_client, func_name)(
+ d = getattr(self.transport_client, func_name)(
destination, group_id, *args, **kwargs
)
+
+ # Capture errors returned by the remote homeserver and
+ # re-throw specific errors as SynapseErrors. This is so
+ # when the remote end responds with things like 403 Not
+ # In Group, we can communicate that to the client instead
+ # of a 500.
+ def h(failure):
+ failure.trap(HttpResponseException)
+ e = failure.value
+ if e.code == 403:
+ raise e.to_synapse_error()
+ return failure
+ d.addErrback(h)
+ return d
return f
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4954b23a0d..6c4fcfb10a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -779,7 +779,7 @@ class EventCreationHandler(object):
event, context=context
)
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a6f3181f09..4c2690ba26 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
- self.hs.get_pusherpool().on_new_receipts(
+ yield self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c3f820b975..ab1571b27b 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -190,10 +190,11 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
- user_id=user_id,
+ requester=requester,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
+ send_event=False,
)
preset_config = config.get(
@@ -289,7 +290,7 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
yield directory_handler.send_room_alias_update_event(
- requester, user_id, room_id
+ requester, room_id
)
defer.returnValue(result)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
"""
return self.store.search_user_dir(user_id, search_term, limit)
- @defer.inlineCallbacks
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
if self._is_processing:
return
+ @defer.inlineCallbacks
+ def process():
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._is_processing = False
+
self._is_processing = True
- try:
- yield self._unsafe_process()
- finally:
- self._is_processing = False
+ run_as_background_process("user_directory.notify_new_event", process)
@defer.inlineCallbacks
def handle_local_profile_change(self, user_id, profile):
|