summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/app/_base.py25
-rwxr-xr-xsynapse/app/homeserver.py11
-rw-r--r--synapse/events/spamcheck.py66
-rw-r--r--synapse/federation/federation_server.py162
-rw-r--r--synapse/federation/transaction_queue.py48
-rw-r--r--synapse/handlers/directory.py15
-rw-r--r--synapse/handlers/federation.py81
-rw-r--r--synapse/handlers/message.py3
-rw-r--r--synapse/handlers/room.py8
-rw-r--r--synapse/handlers/room_member.py19
-rw-r--r--synapse/push/baserules.py23
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py35
-rw-r--r--synapse/push/push_rule_evaluator.py50
-rw-r--r--synapse/python_dependencies.py4
-rw-r--r--synapse/state.py13
-rw-r--r--synapse/storage/events.py3
-rw-r--r--synapse/storage/roommember.py40
-rw-r--r--synapse/util/async.py4
19 files changed, 485 insertions, 127 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index ec83e6adb7..bee4aba625 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.23.0-rc2"
+__version__ = "0.23.1"
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index cd0e815919..cf4730730d 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -12,10 +12,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import gc
 import logging
+import sys
+
+try:
+    import affinity
+except:
+    affinity = None
 
-import affinity
 from daemonize import Daemonize
 from synapse.util import PreserveLoggingContext
 from synapse.util.rlimit import change_resource_limit
@@ -78,6 +84,13 @@ def start_reactor(
         with PreserveLoggingContext():
             logger.info("Running")
             if cpu_affinity is not None:
+                if not affinity:
+                    quit_with_error(
+                        "Missing package 'affinity' required for cpu_affinity\n"
+                        "option\n\n"
+                        "Install by running:\n\n"
+                        "   pip install affinity\n\n"
+                    )
                 logger.info("Setting CPU affinity to %s" % cpu_affinity)
                 affinity.set_process_affinity_mask(0, cpu_affinity)
             change_resource_limit(soft_file_limit)
@@ -97,3 +110,13 @@ def start_reactor(
         daemon.start()
     else:
         run()
+
+
+def quit_with_error(error_string):
+    message_lines = error_string.split("\n")
+    line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
+    sys.stderr.write("*" * line_length + '\n')
+    for line in message_lines:
+        sys.stderr.write(" %s\n" % (line.rstrip(),))
+    sys.stderr.write("*" * line_length + '\n')
+    sys.exit(1)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 84ad8f04a0..3adf72e141 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -25,6 +25,7 @@ from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
     LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
     STATIC_PREFIX, WEB_CLIENT_PREFIX
 from synapse.app import _base
+from synapse.app._base import quit_with_error
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
@@ -249,16 +250,6 @@ class SynapseHomeServer(HomeServer):
         return db_conn
 
 
-def quit_with_error(error_string):
-    message_lines = error_string.split("\n")
-    line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
-    sys.stderr.write("*" * line_length + '\n')
-    for line in message_lines:
-        sys.stderr.write(" %s\n" % (line.rstrip(),))
-    sys.stderr.write("*" * line_length + '\n')
-    sys.exit(1)
-
-
 def setup(config_options):
     """
     Args:
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index e739f105b2..dccc579eac 100644
--- a/synapse/events/spamcheck.py
+++ b/synapse/events/spamcheck.py
@@ -45,3 +45,69 @@ class SpamChecker(object):
             return False
 
         return self.spam_checker.check_event_for_spam(event)
+
+    def user_may_invite(self, inviter_userid, invitee_userid, room_id):
+        """Checks if a given user may send an invite
+
+        If this method returns false, the invite will be rejected.
+
+        Args:
+            userid (string): The sender's user ID
+
+        Returns:
+            bool: True if the user may send an invite, otherwise False
+        """
+        if self.spam_checker is None:
+            return True
+
+        return self.spam_checker.user_may_invite(inviter_userid, invitee_userid, room_id)
+
+    def user_may_create_room(self, userid):
+        """Checks if a given user may create a room
+
+        If this method returns false, the creation request will be rejected.
+
+        Args:
+            userid (string): The sender's user ID
+
+        Returns:
+            bool: True if the user may create a room, otherwise False
+        """
+        if self.spam_checker is None:
+            return True
+
+        return self.spam_checker.user_may_create_room(userid)
+
+    def user_may_create_room_alias(self, userid, room_alias):
+        """Checks if a given user may create a room alias
+
+        If this method returns false, the association request will be rejected.
+
+        Args:
+            userid (string): The sender's user ID
+            room_alias (string): The alias to be created
+
+        Returns:
+            bool: True if the user may create a room alias, otherwise False
+        """
+        if self.spam_checker is None:
+            return True
+
+        return self.spam_checker.user_may_create_room_alias(userid, room_alias)
+
+    def user_may_publish_room(self, userid, room_id):
+        """Checks if a given user may publish a room to the directory
+
+        If this method returns false, the publish request will be rejected.
+
+        Args:
+            userid (string): The sender's user ID
+            room_id (string): The ID of the room that would be published
+
+        Returns:
+            bool: True if the user may publish the room, otherwise False
+        """
+        if self.spam_checker is None:
+            return True
+
+        return self.spam_checker.user_may_publish_room(userid, room_id)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 51e3fdea06..f00d59e701 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -12,14 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
 from twisted.internet import defer
 
 from .federation_base import FederationBase
 from .units import Transaction, Edu
 
-from synapse.util.async import Linearizer
+from synapse.util import async
 from synapse.util.logutils import log_function
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.events import FrozenEvent
@@ -33,6 +31,9 @@ from synapse.crypto.event_signing import compute_event_signature
 import simplejson as json
 import logging
 
+# when processing incoming transactions, we try to handle multiple rooms in
+# parallel, up to this limit.
+TRANSACTION_CONCURRENCY_LIMIT = 10
 
 logger = logging.getLogger(__name__)
 
@@ -52,7 +53,8 @@ class FederationServer(FederationBase):
 
         self.auth = hs.get_auth()
 
-        self._server_linearizer = Linearizer("fed_server")
+        self._server_linearizer = async.Linearizer("fed_server")
+        self._transaction_linearizer = async.Linearizer("fed_txn_handler")
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
@@ -109,25 +111,41 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def on_incoming_transaction(self, transaction_data):
+        # keep this as early as possible to make the calculated origin ts as
+        # accurate as possible.
+        request_time = self._clock.time_msec()
+
         transaction = Transaction(**transaction_data)
 
-        received_pdus_counter.inc_by(len(transaction.pdus))
+        if not transaction.transaction_id:
+            raise Exception("Transaction missing transaction_id")
+        if not transaction.origin:
+            raise Exception("Transaction missing origin")
 
-        for p in transaction.pdus:
-            if "unsigned" in p:
-                unsigned = p["unsigned"]
-                if "age" in unsigned:
-                    p["age"] = unsigned["age"]
-            if "age" in p:
-                p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
-                del p["age"]
+        logger.debug("[%s] Got transaction", transaction.transaction_id)
 
-        pdu_list = [
-            self.event_from_pdu_json(p) for p in transaction.pdus
-        ]
+        # use a linearizer to ensure that we don't process the same transaction
+        # multiple times in parallel.
+        with (yield self._transaction_linearizer.queue(
+                (transaction.origin, transaction.transaction_id),
+        )):
+            result = yield self._handle_incoming_transaction(
+                transaction, request_time,
+            )
 
-        logger.debug("[%s] Got transaction", transaction.transaction_id)
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def _handle_incoming_transaction(self, transaction, request_time):
+        """ Process an incoming transaction and return the HTTP response
+
+        Args:
+            transaction (Transaction): incoming transaction
+            request_time (int): timestamp that the HTTP request arrived at
 
+        Returns:
+            Deferred[(int, object)]: http response code and body
+        """
         response = yield self.transaction_actions.have_responded(transaction)
 
         if response:
@@ -140,42 +158,50 @@ class FederationServer(FederationBase):
 
         logger.debug("[%s] Transaction is new", transaction.transaction_id)
 
-        results = []
-
-        for pdu in pdu_list:
-            # check that it's actually being sent from a valid destination to
-            # workaround bug #1753 in 0.18.5 and 0.18.6
-            if transaction.origin != get_domain_from_id(pdu.event_id):
-                # We continue to accept join events from any server; this is
-                # necessary for the federation join dance to work correctly.
-                # (When we join over federation, the "helper" server is
-                # responsible for sending out the join event, rather than the
-                # origin. See bug #1893).
-                if not (
-                    pdu.type == 'm.room.member' and
-                    pdu.content and
-                    pdu.content.get("membership", None) == 'join'
-                ):
-                    logger.info(
-                        "Discarding PDU %s from invalid origin %s",
-                        pdu.event_id, transaction.origin
-                    )
-                    continue
-                else:
-                    logger.info(
-                        "Accepting join PDU %s from %s",
-                        pdu.event_id, transaction.origin
-                    )
+        received_pdus_counter.inc_by(len(transaction.pdus))
 
-            try:
-                yield self._handle_received_pdu(transaction.origin, pdu)
-                results.append({})
-            except FederationError as e:
-                self.send_failure(e, transaction.origin)
-                results.append({"error": str(e)})
-            except Exception as e:
-                results.append({"error": str(e)})
-                logger.exception("Failed to handle PDU")
+        pdus_by_room = {}
+
+        for p in transaction.pdus:
+            if "unsigned" in p:
+                unsigned = p["unsigned"]
+                if "age" in unsigned:
+                    p["age"] = unsigned["age"]
+            if "age" in p:
+                p["age_ts"] = request_time - int(p["age"])
+                del p["age"]
+
+            event = self.event_from_pdu_json(p)
+            room_id = event.room_id
+            pdus_by_room.setdefault(room_id, []).append(event)
+
+        pdu_results = {}
+
+        # we can process different rooms in parallel (which is useful if they
+        # require callouts to other servers to fetch missing events), but
+        # impose a limit to avoid going too crazy with ram/cpu.
+        @defer.inlineCallbacks
+        def process_pdus_for_room(room_id):
+            logger.debug("Processing PDUs for %s", room_id)
+            for pdu in pdus_by_room[room_id]:
+                event_id = pdu.event_id
+                try:
+                    yield self._handle_received_pdu(
+                        transaction.origin, pdu
+                    )
+                    pdu_results[event_id] = {}
+                except FederationError as e:
+                    logger.warn("Error handling PDU %s: %s", event_id, e)
+                    self.send_failure(e, transaction.origin)
+                    pdu_results[event_id] = {"error": str(e)}
+                except Exception as e:
+                    pdu_results[event_id] = {"error": str(e)}
+                    logger.exception("Failed to handle PDU %s", event_id)
+
+        yield async.concurrently_execute(
+            process_pdus_for_room, pdus_by_room.keys(),
+            TRANSACTION_CONCURRENCY_LIMIT,
+        )
 
         if hasattr(transaction, "edus"):
             for edu in (Edu(**x) for x in transaction.edus):
@@ -188,14 +214,12 @@ class FederationServer(FederationBase):
             for failure in getattr(transaction, "pdu_failures", []):
                 logger.info("Got failure %r", failure)
 
-        logger.debug("Returning: %s", str(results))
-
         response = {
-            "pdus": dict(zip(
-                (p.event_id for p in pdu_list), results
-            )),
+            "pdus": pdu_results,
         }
 
+        logger.debug("Returning: %s", str(response))
+
         yield self.transaction_actions.set_response(
             transaction,
             200, response
@@ -520,6 +544,30 @@ class FederationServer(FederationBase):
         Returns (Deferred): completes with None
         Raises: FederationError if the signatures / hash do not match
     """
+        # check that it's actually being sent from a valid destination to
+        # workaround bug #1753 in 0.18.5 and 0.18.6
+        if origin != get_domain_from_id(pdu.event_id):
+            # We continue to accept join events from any server; this is
+            # necessary for the federation join dance to work correctly.
+            # (When we join over federation, the "helper" server is
+            # responsible for sending out the join event, rather than the
+            # origin. See bug #1893).
+            if not (
+                pdu.type == 'm.room.member' and
+                pdu.content and
+                pdu.content.get("membership", None) == 'join'
+            ):
+                logger.info(
+                    "Discarding PDU %s from invalid origin %s",
+                    pdu.event_id, origin
+                )
+                return
+            else:
+                logger.info(
+                    "Accepting join PDU %s from %s",
+                    pdu.event_id, origin
+                )
+
         # Check signature.
         try:
             pdu = yield self._check_sigs_and_hash(pdu)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 003eaba893..7a3c9cbb70 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -20,8 +20,8 @@ from .persistence import TransactionActions
 from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
+from synapse.util import logcontext
 from synapse.util.async import run_on_reactor
-from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
@@ -231,11 +231,9 @@ class TransactionQueue(object):
                 (pdu, order)
             )
 
-            preserve_context_over_fn(
-                self._attempt_new_transaction, destination
-            )
+            self._attempt_new_transaction(destination)
 
-    @preserve_fn  # the caller should not yield on this
+    @logcontext.preserve_fn  # the caller should not yield on this
     @defer.inlineCallbacks
     def send_presence(self, states):
         """Send the new presence states to the appropriate destinations.
@@ -299,7 +297,7 @@ class TransactionQueue(object):
                     state.user_id: state for state in states
                 })
 
-                preserve_fn(self._attempt_new_transaction)(destination)
+                self._attempt_new_transaction(destination)
 
     def send_edu(self, destination, edu_type, content, key=None):
         edu = Edu(
@@ -321,9 +319,7 @@ class TransactionQueue(object):
         else:
             self.pending_edus_by_dest.setdefault(destination, []).append(edu)
 
-        preserve_context_over_fn(
-            self._attempt_new_transaction, destination
-        )
+        self._attempt_new_transaction(destination)
 
     def send_failure(self, failure, destination):
         if destination == self.server_name or destination == "localhost":
@@ -336,9 +332,7 @@ class TransactionQueue(object):
             destination, []
         ).append(failure)
 
-        preserve_context_over_fn(
-            self._attempt_new_transaction, destination
-        )
+        self._attempt_new_transaction(destination)
 
     def send_device_messages(self, destination):
         if destination == self.server_name or destination == "localhost":
@@ -347,15 +341,24 @@ class TransactionQueue(object):
         if not self.can_send_to(destination):
             return
 
-        preserve_context_over_fn(
-            self._attempt_new_transaction, destination
-        )
+        self._attempt_new_transaction(destination)
 
     def get_current_token(self):
         return 0
 
-    @defer.inlineCallbacks
     def _attempt_new_transaction(self, destination):
+        """Try to start a new transaction to this destination
+
+        If there is already a transaction in progress to this destination,
+        returns immediately. Otherwise kicks off the process of sending a
+        transaction in the background.
+
+        Args:
+            destination (str):
+
+        Returns:
+            None
+        """
         # list of (pending_pdu, deferred, order)
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending
@@ -368,6 +371,19 @@ class TransactionQueue(object):
             )
             return
 
+        logger.debug("TX [%s] Starting transaction loop", destination)
+
+        # Drop the logcontext before starting the transaction. It doesn't
+        # really make sense to log all the outbound transactions against
+        # whatever path led us to this point: that's pretty arbitrary really.
+        #
+        # (this also means we can fire off _perform_transaction without
+        # yielding)
+        with logcontext.PreserveLoggingContext():
+            self._transaction_transmission_loop(destination)
+
+    @defer.inlineCallbacks
+    def _transaction_transmission_loop(self, destination):
         pending_pdus = []
         try:
             self.pending_transactions[destination] = 1
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 943554ce98..a0464ae5c0 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -40,6 +40,8 @@ class DirectoryHandler(BaseHandler):
             "directory", self.on_directory_query
         )
 
+        self.spam_checker = hs.get_spam_checker()
+
     @defer.inlineCallbacks
     def _create_association(self, room_alias, room_id, servers=None, creator=None):
         # general association creation for both human users and app services
@@ -73,6 +75,11 @@ class DirectoryHandler(BaseHandler):
         # association creation for human users
         # TODO(erikj): Do user auth.
 
+        if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+            raise SynapseError(
+                403, "This user is not permitted to create this alias",
+            )
+
         can_create = yield self.can_modify_alias(
             room_alias,
             user_id=user_id
@@ -327,6 +334,14 @@ class DirectoryHandler(BaseHandler):
         room_id (str)
         visibility (str): "public" or "private"
         """
+        if not self.spam_checker.user_may_publish_room(
+            requester.user.to_string(), room_id
+        ):
+            raise AuthError(
+                403,
+                "This user is not permitted to publish rooms to the room list"
+            )
+
         if requester.is_guest:
             raise AuthError(403, "Guests cannot edit the published room list")
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 18f87cad67..7711cded01 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
-import synapse.util.logcontext
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -26,10 +25,7 @@ from synapse.api.errors import (
 )
 from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import (
-    preserve_fn, preserve_context_over_deferred
-)
+from synapse.util import unwrapFirstError, logcontext
 from synapse.util.metrics import measure_func
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor, Linearizer
@@ -77,6 +73,7 @@ class FederationHandler(BaseHandler):
         self.action_generator = hs.get_action_generator()
         self.is_mine_id = hs.is_mine_id
         self.pusher_pool = hs.get_pusherpool()
+        self.spam_checker = hs.get_spam_checker()
 
         self.replication_layer.set_handler(self)
 
@@ -125,6 +122,28 @@ class FederationHandler(BaseHandler):
             self.room_queues[pdu.room_id].append((pdu, origin))
             return
 
+        # If we're no longer in the room just ditch the event entirely. This
+        # is probably an old server that has come back and thinks we're still
+        # in the room (or we've been rejoined to the room by a state reset).
+        #
+        # If we were never in the room then maybe our database got vaped and
+        # we should check if we *are* in fact in the room. If we are then we
+        # can magically rejoin the room.
+        is_in_room = yield self.auth.check_host_in_room(
+            pdu.room_id,
+            self.server_name
+        )
+        if not is_in_room:
+            was_in_room = yield self.store.was_host_joined(
+                pdu.room_id, self.server_name,
+            )
+            if was_in_room:
+                logger.info(
+                    "Ignoring PDU %s for room %s from %s as we've left the room!",
+                    pdu.event_id, pdu.room_id, origin,
+                )
+                return
+
         state = None
 
         auth_chain = []
@@ -591,9 +610,9 @@ class FederationHandler(BaseHandler):
                     missing_auth - failed_to_fetch
                 )
 
-                results = yield preserve_context_over_deferred(defer.gatherResults(
+                results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                     [
-                        preserve_fn(self.replication_layer.get_pdu)(
+                        logcontext.preserve_fn(self.replication_layer.get_pdu)(
                             [dest],
                             event_id,
                             outlier=True,
@@ -785,10 +804,14 @@ class FederationHandler(BaseHandler):
         event_ids = list(extremities.keys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
-        states = yield preserve_context_over_deferred(defer.gatherResults([
-            preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
-            for e in event_ids
-        ]))
+        states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+            [
+                logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
+                    room_id, [e]
+                )
+                for e in event_ids
+            ], consumeErrors=True,
+        ))
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = yield self.store.get_events(
@@ -941,9 +964,7 @@ class FederationHandler(BaseHandler):
             # lots of requests for missing prev_events which we do actually
             # have. Hence we fire off the deferred, but don't wait for it.
 
-            synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
-                room_queue
-            )
+            logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
 
         defer.returnValue(True)
 
@@ -1070,6 +1091,9 @@ class FederationHandler(BaseHandler):
         """
         event = pdu
 
+        if event.state_key is None:
+            raise SynapseError(400, "The invite event did not have a state key")
+
         is_blocked = yield self.store.is_room_blocked(event.room_id)
         if is_blocked:
             raise SynapseError(403, "This room has been blocked on this server")
@@ -1077,6 +1101,13 @@ class FederationHandler(BaseHandler):
         if self.hs.config.block_non_admin_invites:
             raise SynapseError(403, "This server does not accept room invites")
 
+        if not self.spam_checker.user_may_invite(
+            event.sender, event.state_key, event.room_id,
+        ):
+            raise SynapseError(
+                403, "This user is not permitted to send invites to this server/user"
+            )
+
         membership = event.content.get("membership")
         if event.type != EventTypes.Member or membership != Membership.INVITE:
             raise SynapseError(400, "The event was not an m.room.member invite event")
@@ -1085,9 +1116,6 @@ class FederationHandler(BaseHandler):
         if sender_domain != origin:
             raise SynapseError(400, "The invite event was not from the server sending it")
 
-        if event.state_key is None:
-            raise SynapseError(400, "The invite event did not have a state key")
-
         if not self.is_mine_id(event.state_key):
             raise SynapseError(400, "The invite event must be for this server")
 
@@ -1430,7 +1458,7 @@ class FederationHandler(BaseHandler):
         if not backfilled:
             # this intentionally does not yield: we don't care about the result
             # and don't need to wait for it.
-            preserve_fn(self.pusher_pool.on_new_notifications)(
+            logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
                 event_stream_id, max_stream_id
             )
 
@@ -1443,16 +1471,16 @@ class FederationHandler(BaseHandler):
         a bunch of outliers, but not a chunk of individual events that depend
         on each other for state calculations.
         """
-        contexts = yield preserve_context_over_deferred(defer.gatherResults(
+        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self._prep_event)(
+                logcontext.preserve_fn(self._prep_event)(
                     origin,
                     ev_info["event"],
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
                 )
                 for ev_info in event_infos
-            ]
+            ], consumeErrors=True,
         ))
 
         yield self.store.persist_events(
@@ -1760,18 +1788,17 @@ class FederationHandler(BaseHandler):
             # Do auth conflict res.
             logger.info("Different auth: %s", different_auth)
 
-            different_events = yield preserve_context_over_deferred(defer.gatherResults(
-                [
-                    preserve_fn(self.store.get_event)(
+            different_events = yield logcontext.make_deferred_yieldable(
+                defer.gatherResults([
+                    logcontext.preserve_fn(self.store.get_event)(
                         d,
                         allow_none=True,
                         allow_rejected=False,
                     )
                     for d in different_auth
                     if d in have_events and not have_events[d]
-                ],
-                consumeErrors=True
-            )).addErrback(unwrapFirstError)
+                ], consumeErrors=True)
+            ).addErrback(unwrapFirstError)
 
             if different_events:
                 local_view = dict(auth_events)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e22d4803b9..fbf88b46ef 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -25,6 +25,7 @@ from synapse.types import (
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
 from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
+from synapse.util.frozenutils import unfreeze
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -556,7 +557,7 @@ class MessageHandler(BaseHandler):
 
         # Ensure that we can round trip before trying to persist in db
         try:
-            dump = ujson.dumps(event.content)
+            dump = ujson.dumps(unfreeze(event.content))
             ujson.loads(dump)
         except:
             logger.exception("Failed to encode content: %r", event.content)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5698d28088..535ba9517c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -60,6 +60,11 @@ class RoomCreationHandler(BaseHandler):
         },
     }
 
+    def __init__(self, hs):
+        super(RoomCreationHandler, self).__init__(hs)
+
+        self.spam_checker = hs.get_spam_checker()
+
     @defer.inlineCallbacks
     def create_room(self, requester, config, ratelimit=True):
         """ Creates a new room.
@@ -75,6 +80,9 @@ class RoomCreationHandler(BaseHandler):
         """
         user_id = requester.user.to_string()
 
+        if not self.spam_checker.user_may_create_room(user_id):
+            raise SynapseError(403, "You are not permitted to create rooms")
+
         if ratelimit:
             yield self.ratelimit(requester)
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index d6ad57171c..970fec0666 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -50,6 +50,7 @@ class RoomMemberHandler(BaseHandler):
         self.member_linearizer = Linearizer(name="member")
 
         self.clock = hs.get_clock()
+        self.spam_checker = hs.get_spam_checker()
 
         self.distributor = hs.get_distributor()
         self.distributor.declare("user_joined_room")
@@ -212,12 +213,26 @@ class RoomMemberHandler(BaseHandler):
             if is_blocked:
                 raise SynapseError(403, "This room has been blocked on this server")
 
-        if (effective_membership_state == "invite" and
-                self.hs.config.block_non_admin_invites):
+        if effective_membership_state == "invite":
+            block_invite = False
             is_requester_admin = yield self.auth.is_server_admin(
                 requester.user,
             )
             if not is_requester_admin:
+                if self.hs.config.block_non_admin_invites:
+                    logger.info(
+                        "Blocking invite: user is not admin and non-admin "
+                        "invites disabled"
+                    )
+                    block_invite = True
+
+                if not self.spam_checker.user_may_invite(
+                    requester.user.to_string(), target.to_string(), room_id,
+                ):
+                    logger.info("Blocking invite due to spam checker")
+                    block_invite = True
+
+            if block_invite:
                 raise SynapseError(
                     403, "Invites have been disabled on this server",
                 )
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 85effdfa46..9dce99ebec 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -1,4 +1,5 @@
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -238,6 +239,28 @@ BASE_APPEND_OVERRIDE_RULES = [
             }
         ]
     },
+    {
+        'rule_id': 'global/override/.m.rule.roomnotif',
+        'conditions': [
+            {
+                'kind': 'event_match',
+                'key': 'content.body',
+                'pattern': '*@room*',
+                '_id': '_roomnotif_content',
+            },
+            {
+                'kind': 'sender_notification_permission',
+                'key': 'room',
+                '_id': '_roomnotif_pl',
+            },
+        ],
+        'actions': [
+            'notify', {
+                'set_tweak': 'highlight',
+                'value': True,
+            }
+        ]
+    }
 ]
 
 
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index b0d64aa6c4..425a017bdf 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,11 +20,13 @@ from twisted.internet import defer
 
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
+from synapse.event_auth import get_user_power_level
 from synapse.api.constants import EventTypes, Membership
 from synapse.metrics import get_metrics_for
 from synapse.util.caches import metrics as cache_metrics
 from synapse.util.caches.descriptors import cached
 from synapse.util.async import Linearizer
+from synapse.state import POWER_KEY
 
 from collections import namedtuple
 
@@ -59,6 +62,7 @@ class BulkPushRuleEvaluator(object):
     def __init__(self, hs):
         self.hs = hs
         self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
 
         self.room_push_rule_cache_metrics = cache_metrics.register_cache(
             "cache",
@@ -109,6 +113,29 @@ class BulkPushRuleEvaluator(object):
         )
 
     @defer.inlineCallbacks
+    def _get_power_levels_and_sender_level(self, event, context):
+        pl_event_id = context.prev_state_ids.get(POWER_KEY)
+        if pl_event_id:
+            # fastpath: if there's a power level event, that's all we need, and
+            # not having a power level event is an extreme edge case
+            pl_event = yield self.store.get_event(pl_event_id)
+            auth_events = {POWER_KEY: pl_event}
+        else:
+            auth_events_ids = yield self.auth.compute_auth_events(
+                event, context.prev_state_ids, for_verification=False,
+            )
+            auth_events = yield self.store.get_events(auth_events_ids)
+            auth_events = {
+                (e.type, e.state_key): e for e in auth_events.itervalues()
+            }
+
+        sender_level = get_user_power_level(event.sender, auth_events)
+
+        pl_event = auth_events.get(POWER_KEY)
+
+        defer.returnValue((pl_event.content if pl_event else {}, sender_level))
+
+    @defer.inlineCallbacks
     def action_for_event_by_user(self, event, context):
         """Given an event and context, evaluate the push rules and return
         the results
@@ -123,7 +150,13 @@ class BulkPushRuleEvaluator(object):
             event, context
         )
 
-        evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
+        (power_levels, sender_power_level) = (
+            yield self._get_power_levels_and_sender_level(event, context)
+        )
+
+        evaluator = PushRuleEvaluatorForEvent(
+            event, len(room_members), sender_power_level, power_levels,
+        )
 
         condition_cache = {}
 
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 172c27c137..3601f2d365 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -29,6 +30,21 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
 
 
 def _room_member_count(ev, condition, room_member_count):
+    return _test_ineq_condition(condition, room_member_count)
+
+
+def _sender_notification_permission(ev, condition, sender_power_level, power_levels):
+    notif_level_key = condition.get('key')
+    if notif_level_key is None:
+        return False
+
+    notif_levels = power_levels.get('notifications', {})
+    room_notif_level = notif_levels.get(notif_level_key, 50)
+
+    return sender_power_level >= room_notif_level
+
+
+def _test_ineq_condition(condition, number):
     if 'is' not in condition:
         return False
     m = INEQUALITY_EXPR.match(condition['is'])
@@ -41,15 +57,15 @@ def _room_member_count(ev, condition, room_member_count):
     rhs = int(rhs)
 
     if ineq == '' or ineq == '==':
-        return room_member_count == rhs
+        return number == rhs
     elif ineq == '<':
-        return room_member_count < rhs
+        return number < rhs
     elif ineq == '>':
-        return room_member_count > rhs
+        return number > rhs
     elif ineq == '>=':
-        return room_member_count >= rhs
+        return number >= rhs
     elif ineq == '<=':
-        return room_member_count <= rhs
+        return number <= rhs
     else:
         return False
 
@@ -65,9 +81,11 @@ def tweaks_for_actions(actions):
 
 
 class PushRuleEvaluatorForEvent(object):
-    def __init__(self, event, room_member_count):
+    def __init__(self, event, room_member_count, sender_power_level, power_levels):
         self._event = event
         self._room_member_count = room_member_count
+        self._sender_power_level = sender_power_level
+        self._power_levels = power_levels
 
         # Maps strings of e.g. 'content.body' -> event["content"]["body"]
         self._value_cache = _flatten_dict(event)
@@ -81,6 +99,10 @@ class PushRuleEvaluatorForEvent(object):
             return _room_member_count(
                 self._event, condition, self._room_member_count
             )
+        elif condition['kind'] == 'sender_notification_permission':
+            return _sender_notification_permission(
+                self._event, condition, self._sender_power_level, self._power_levels,
+            )
         else:
             return True
 
@@ -183,7 +205,7 @@ def _glob_to_re(glob, word_boundary):
             r,
         )
         if word_boundary:
-            r = r"\b%s\b" % (r,)
+            r = _re_word_boundary(r)
 
             return re.compile(r, flags=re.IGNORECASE)
         else:
@@ -192,7 +214,7 @@ def _glob_to_re(glob, word_boundary):
             return re.compile(r, flags=re.IGNORECASE)
     elif word_boundary:
         r = re.escape(glob)
-        r = r"\b%s\b" % (r,)
+        r = _re_word_boundary(r)
 
         return re.compile(r, flags=re.IGNORECASE)
     else:
@@ -200,6 +222,18 @@ def _glob_to_re(glob, word_boundary):
         return re.compile(r, flags=re.IGNORECASE)
 
 
+def _re_word_boundary(r):
+    """
+    Adds word boundary characters to the start and end of an
+    expression to require that the match occur as a whole word,
+    but do so respecting the fact that strings starting or ending
+    with non-word characters will change word boundaries.
+    """
+    # we can't use \b as it chokes on unicode. however \W seems to be okay
+    # as shorthand for [^0-9A-Za-z_].
+    return r"(^|\W)%s(\W|$)" % (r,)
+
+
 def _flatten_dict(d, prefix=[], result=None):
     if result is None:
         result = {}
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 630e92c90e..7052333c19 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -40,7 +40,6 @@ REQUIREMENTS = {
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
-    "affinity": ["affinity"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
@@ -59,6 +58,9 @@ CONDITIONAL_REQUIREMENTS = {
     "psutil": {
         "psutil>=2.0.0": ["psutil>=2.0.0"],
     },
+    "affinity": {
+        "affinity": ["affinity"],
+    },
 }
 
 
diff --git a/synapse/state.py b/synapse/state.py
index 390799fbd5..dcdcdef65e 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -288,6 +288,9 @@ class StateHandler(object):
         """
         logger.debug("resolve_state_groups event_ids %s", event_ids)
 
+        # map from state group id to the state in that state group (where
+        # 'state' is a map from state key to event id)
+        # dict[int, dict[(str, str), str]]
         state_groups_ids = yield self.store.get_state_groups_ids(
             room_id, event_ids
         )
@@ -320,11 +323,15 @@ class StateHandler(object):
                 "Resolving state for %s with %d groups", room_id, len(state_groups_ids)
             )
 
+            # build a map from state key to the event_ids which set that state.
+            # dict[(str, str), set[str])
             state = {}
             for st in state_groups_ids.values():
                 for key, e_id in st.items():
                     state.setdefault(key, set()).add(e_id)
 
+            # build a map from state key to the event_ids which set that state,
+            # including only those where there are state keys in conflict.
             conflicted_state = {
                 k: list(v)
                 for k, v in state.items()
@@ -494,8 +501,14 @@ def _resolve_with_state_fac(unconflicted_state, conflicted_state,
 
     logger.info("Asking for %d conflicted events", len(needed_events))
 
+    # dict[str, FrozenEvent]: a map from state event id to event. Only includes
+    # the state events which are in conflict.
     state_map = yield state_map_factory(needed_events)
 
+    # get the ids of the auth events which allow us to authenticate the
+    # conflicted state, picking only from the unconflicting state.
+    #
+    # dict[(str, str), str]: a map from state key to event id
     auth_events = _create_auth_events_from_maps(
         unconflicted_state, conflicted_state, state_map
     )
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7002b3752e..4f0b43c36d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -784,6 +784,9 @@ class EventsStore(SQLBaseStore):
                     self._invalidate_cache_and_stream(
                         txn, self.is_host_joined, (room_id, host)
                     )
+                    self._invalidate_cache_and_stream(
+                        txn, self.was_host_joined, (room_id, host)
+                    )
 
                 self._invalidate_cache_and_stream(
                     txn, self.get_users_in_room, (room_id,)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 457ca288d0..a0fc9a6867 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -533,6 +533,46 @@ class RoomMemberStore(SQLBaseStore):
 
         defer.returnValue(True)
 
+    @cachedInlineCallbacks()
+    def was_host_joined(self, room_id, host):
+        """Check whether the server is or ever was in the room.
+
+        Args:
+            room_id (str)
+            host (str)
+
+        Returns:
+            Deferred: Resolves to True if the host is/was in the room, otherwise
+            False.
+        """
+        if '%' in host or '_' in host:
+            raise Exception("Invalid host name")
+
+        sql = """
+            SELECT user_id FROM room_memberships
+            WHERE room_id = ?
+                AND user_id LIKE ?
+                AND membership = 'join'
+            LIMIT 1
+        """
+
+        # We do need to be careful to ensure that host doesn't have any wild cards
+        # in it, but we checked above for known ones and we'll check below that
+        # the returned user actually has the correct domain.
+        like_clause = "%:" + host
+
+        rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause)
+
+        if not rows:
+            defer.returnValue(False)
+
+        user_id = rows[0][0]
+        if get_domain_from_id(user_id) != host:
+            # This can only happen if the host name has something funky in it
+            raise Exception("Invalid host name")
+
+        defer.returnValue(True)
+
     def get_joined_hosts(self, room_id, state_entry):
         state_group = state_entry.state_group
         if not state_group:
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1453faf0ef..bb252f75d7 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -19,7 +19,7 @@ from twisted.internet import defer, reactor
 from .logcontext import (
     PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
 )
-from synapse.util import unwrapFirstError
+from synapse.util import logcontext, unwrapFirstError
 
 from contextlib import contextmanager
 
@@ -155,7 +155,7 @@ def concurrently_execute(func, args, limit):
         except StopIteration:
             pass
 
-    return preserve_context_over_deferred(defer.gatherResults([
+    return logcontext.make_deferred_yieldable(defer.gatherResults([
         preserve_fn(_concurrently_execute_inner)()
         for _ in xrange(limit)
     ], consumeErrors=True)).addErrback(unwrapFirstError)