summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/auth.py18
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/handlers/directory.py77
-rw-r--r--synapse/handlers/federation.py34
-rw-r--r--synapse/handlers/groups_local.py18
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/receipts.py2
-rw-r--r--synapse/handlers/room.py5
-rw-r--r--synapse/handlers/user_directory.py14
9 files changed, 100 insertions, 74 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2a5eab124f..329e3c7d71 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
 import pymacaroons
 from canonicaljson import json
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.client import PartialDownloadError
 
 import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
+from synapse.util import logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
 
 from ._base import BaseHandler
 
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
                 bcrypt.gensalt(self.bcrypt_rounds),
             ).decode('ascii')
 
-        return make_deferred_yieldable(
-            threads.deferToThreadPool(
-                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
-            ),
-        )
+        return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
             if not isinstance(stored_hash, bytes):
                 stored_hash = stored_hash.encode('ascii')
 
-            return make_deferred_yieldable(
-                threads.deferToThreadPool(
-                    self.hs.get_reactor(),
-                    self.hs.get_reactor().getThreadPool(),
-                    _do_validate_hash,
-                ),
-            )
+            return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
         else:
             return defer.succeed(False)
 
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
 
 from ._base import BaseHandler
 
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
             None
         """
         if not self._user_parter_running:
-            run_in_background(self._user_parter_loop)
+            run_as_background_process("user_parter_loop", self._user_parter_loop)
 
     @defer.inlineCallbacks
     def _user_parter_loop(self):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 18741c5fac..02f12f6645 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -80,42 +80,60 @@ class DirectoryHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def create_association(self, user_id, room_alias, room_id, servers=None):
-        # association creation for human users
-        # TODO(erikj): Do user auth.
+    def create_association(self, requester, room_alias, room_id, servers=None,
+                           send_event=True):
+        """Attempt to create a new alias
 
-        if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
-            raise SynapseError(
-                403, "This user is not permitted to create this alias",
-            )
+        Args:
+            requester (Requester)
+            room_alias (RoomAlias)
+            room_id (str)
+            servers (list[str]|None): List of servers that others servers
+                should try and join via
+            send_event (bool): Whether to send an updated m.room.aliases event
 
-        can_create = yield self.can_modify_alias(
-            room_alias,
-            user_id=user_id
-        )
-        if not can_create:
-            raise SynapseError(
-                400, "This alias is reserved by an application service.",
-                errcode=Codes.EXCLUSIVE
-            )
-        yield self._create_association(room_alias, room_id, servers, creator=user_id)
+        Returns:
+            Deferred
+        """
 
-    @defer.inlineCallbacks
-    def create_appservice_association(self, service, room_alias, room_id,
-                                      servers=None):
-        if not service.is_interested_in_alias(room_alias.to_string()):
-            raise SynapseError(
-                400, "This application service has not reserved"
-                " this kind of alias.", errcode=Codes.EXCLUSIVE
+        user_id = requester.user.to_string()
+
+        service = requester.app_service
+        if service:
+            if not service.is_interested_in_alias(room_alias.to_string()):
+                raise SynapseError(
+                    400, "This application service has not reserved"
+                    " this kind of alias.", errcode=Codes.EXCLUSIVE
+                )
+        else:
+            if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+                raise AuthError(
+                    403, "This user is not permitted to create this alias",
+                )
+
+            can_create = yield self.can_modify_alias(
+                room_alias,
+                user_id=user_id
             )
+            if not can_create:
+                raise AuthError(
+                    400, "This alias is reserved by an application service.",
+                    errcode=Codes.EXCLUSIVE
+                )
 
-        # association creation for app services
-        yield self._create_association(room_alias, room_id, servers)
+        yield self._create_association(room_alias, room_id, servers, creator=user_id)
+        if send_event:
+            yield self.send_room_alias_update_event(
+                requester,
+                room_id
+            )
 
     @defer.inlineCallbacks
-    def delete_association(self, requester, user_id, room_alias):
+    def delete_association(self, requester, room_alias):
         # association deletion for human users
 
+        user_id = requester.user.to_string()
+
         try:
             can_delete = yield self._user_can_delete_alias(room_alias, user_id)
         except StoreError as e:
@@ -143,7 +161,6 @@ class DirectoryHandler(BaseHandler):
         try:
             yield self.send_room_alias_update_event(
                 requester,
-                requester.user.to_string(),
                 room_id
             )
 
@@ -261,7 +278,7 @@ class DirectoryHandler(BaseHandler):
             )
 
     @defer.inlineCallbacks
-    def send_room_alias_update_event(self, requester, user_id, room_id):
+    def send_room_alias_update_event(self, requester, room_id):
         aliases = yield self.store.get_aliases_for_room(room_id)
 
         yield self.event_creation_handler.create_and_send_nonmember_event(
@@ -270,7 +287,7 @@ class DirectoryHandler(BaseHandler):
                 "type": EventTypes.Aliases,
                 "state_key": self.hs.hostname,
                 "room_id": room_id,
-                "sender": user_id,
+                "sender": requester.user.to_string(),
                 "content": {"aliases": aliases},
             },
             ratelimit=False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cab57a8849..cd5b9bbb19 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -53,7 +53,7 @@ from synapse.replication.http.federation import (
     ReplicationFederationSendEventsRestServlet,
 )
 from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
-from synapse.state import resolve_events_with_factory
+from synapse.state import StateResolutionStore, resolve_events_with_store
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.async_helpers import Linearizer
@@ -384,24 +384,24 @@ class FederationHandler(BaseHandler):
                             for x in remote_state:
                                 event_map[x.event_id] = x
 
-                    # Resolve any conflicting state
-                    @defer.inlineCallbacks
-                    def fetch(ev_ids):
-                        fetched = yield self.store.get_events(
-                            ev_ids, get_prev_content=False, check_redacted=False,
-                        )
-                        # add any events we fetch here to the `event_map` so that we
-                        # can use them to build the state event list below.
-                        event_map.update(fetched)
-                        defer.returnValue(fetched)
-
                     room_version = yield self.store.get_room_version(room_id)
-                    state_map = yield resolve_events_with_factory(
-                        room_version, state_maps, event_map, fetch,
+                    state_map = yield resolve_events_with_store(
+                        room_version, state_maps, event_map,
+                        state_res_store=StateResolutionStore(self.store),
                     )
 
-                    # we need to give _process_received_pdu the actual state events
+                    # We need to give _process_received_pdu the actual state events
                     # rather than event ids, so generate that now.
+
+                    # First though we need to fetch all the events that are in
+                    # state_map, so we can build up the state below.
+                    evs = yield self.store.get_events(
+                        list(state_map.values()),
+                        get_prev_content=False,
+                        check_redacted=False,
+                    )
+                    event_map.update(evs)
+
                     state = [
                         event_map[e] for e in six.itervalues(state_map)
                     ]
@@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler):
 
             if not backfilled:  # Never notify for backfilled events
                 for event, _ in event_and_contexts:
-                    self._notify_persisted_event(event, max_stream_id)
+                    yield self._notify_persisted_event(event, max_stream_id)
 
     def _notify_persisted_event(self, event, max_stream_id):
         """Checks to see if notifier/pushers should be notified about the
@@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler):
             extra_users=extra_users
         )
 
-        self.pusher_pool.on_new_notifications(
+        return self.pusher_pool.on_new_notifications(
             event_stream_id, max_stream_id,
         )
 
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.types import get_domain_from_id
 
 logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
             )
         else:
             destination = get_domain_from_id(group_id)
-            return getattr(self.transport_client, func_name)(
+            d = getattr(self.transport_client, func_name)(
                 destination, group_id, *args, **kwargs
             )
+
+            # Capture errors returned by the remote homeserver and
+            # re-throw specific errors as SynapseErrors. This is so
+            # when the remote end responds with things like 403 Not
+            # In Group, we can communicate that to the client instead
+            # of a 500.
+            def h(failure):
+                failure.trap(HttpResponseException)
+                e = failure.value
+                if e.code == 403:
+                    raise e.to_synapse_error()
+                return failure
+            d.addErrback(h)
+            return d
     return f
 
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4954b23a0d..6c4fcfb10a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -779,7 +779,7 @@ class EventCreationHandler(object):
             event, context=context
         )
 
-        self.pusher_pool.on_new_notifications(
+        yield self.pusher_pool.on_new_notifications(
             event_stream_id, max_stream_id,
         )
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a6f3181f09..4c2690ba26 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
             "receipt_key", max_batch_id, rooms=affected_room_ids
         )
         # Note that the min here shouldn't be relied upon to be accurate.
-        self.hs.get_pusherpool().on_new_receipts(
+        yield self.hs.get_pusherpool().on_new_receipts(
             min_batch_id, max_batch_id, affected_room_ids,
         )
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c3f820b975..ab1571b27b 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -190,10 +190,11 @@ class RoomCreationHandler(BaseHandler):
         if room_alias:
             directory_handler = self.hs.get_handlers().directory_handler
             yield directory_handler.create_association(
-                user_id=user_id,
+                requester=requester,
                 room_id=room_id,
                 room_alias=room_alias,
                 servers=[self.hs.hostname],
+                send_event=False,
             )
 
         preset_config = config.get(
@@ -289,7 +290,7 @@ class RoomCreationHandler(BaseHandler):
         if room_alias:
             result["room_alias"] = room_alias.to_string()
             yield directory_handler.send_room_alias_update_event(
-                requester, user_id, room_id
+                requester, room_id
             )
 
         defer.returnValue(result)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import get_localpart_from_id
 from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
         """
         return self.store.search_user_dir(user_id, search_term, limit)
 
-    @defer.inlineCallbacks
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
         if self._is_processing:
             return
 
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
         self._is_processing = True
-        try:
-            yield self._unsafe_process()
-        finally:
-            self._is_processing = False
+        run_as_background_process("user_directory.notify_new_event", process)
 
     @defer.inlineCallbacks
     def handle_local_profile_change(self, user_id, profile):