summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-05-23 10:54:14 +0100
committerErik Johnston <erik@matrix.org>2018-05-23 10:54:14 +0100
commit6e11803ed3669fb897fe61da168eb033ad495198 (patch)
tree936c98227c65d687318c3d573a4c4a5a5fdb1e33
parentMerge pull request #3226 from matrix-org/erikj/chunk_base (diff)
parentMerge pull request #3265 from matrix-org/erikj/limit_pagination (diff)
downloadsynapse-6e11803ed3669fb897fe61da168eb033ad495198.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/room_chunks
-rw-r--r--CHANGES.rst10
-rw-r--r--Dockerfile4
-rw-r--r--README.rst5
-rw-r--r--contrib/docker/README.md17
-rw-r--r--docs/postgres.rst32
-rw-r--r--docs/privacy_policy_templates/README.md2
-rw-r--r--docs/privacy_policy_templates/en/1.0.html6
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/errors.py31
-rw-r--r--synapse/api/urls.py50
-rwxr-xr-xsynapse/app/homeserver.py2
-rw-r--r--synapse/config/consent_config.py44
-rw-r--r--synapse/config/homeserver.py5
-rw-r--r--synapse/config/server_notices_config.py77
-rw-r--r--synapse/handlers/__init__.py5
-rw-r--r--synapse/handlers/deactivate_account.py2
-rw-r--r--synapse/handlers/events.py5
-rw-r--r--synapse/handlers/federation.py8
-rw-r--r--synapse/handlers/message.py98
-rw-r--r--synapse/handlers/presence.py61
-rw-r--r--synapse/handlers/register.py14
-rw-r--r--synapse/handlers/room.py28
-rw-r--r--synapse/handlers/room_member.py49
-rw-r--r--synapse/http/request_metrics.py160
-rw-r--r--synapse/http/site.py4
-rw-r--r--synapse/replication/tcp/resource.py2
-rw-r--r--synapse/rest/client/transactions.py42
-rw-r--r--synapse/rest/client/v1/admin.py21
-rw-r--r--synapse/rest/client/v1/room.py5
-rw-r--r--synapse/rest/client/v2_alpha/sync.py4
-rw-r--r--synapse/rest/consent/consent_resource.py26
-rw-r--r--synapse/server.py22
-rw-r--r--synapse/server.pyi19
-rw-r--r--synapse/server_notices/__init__.py0
-rw-r--r--synapse/server_notices/consent_server_notices.py95
-rw-r--r--synapse/server_notices/server_notices_manager.py131
-rw-r--r--synapse/server_notices/server_notices_sender.py58
-rw-r--r--synapse/server_notices/worker_server_notices_sender.py46
-rw-r--r--synapse/storage/__init__.py8
-rw-r--r--synapse/storage/events.py77
-rw-r--r--synapse/storage/receipts.py18
-rw-r--r--synapse/storage/registration.py46
-rw-r--r--synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql20
-rw-r--r--synapse/storage/stream.py15
-rw-r--r--synapse/util/logcontext.py38
-rw-r--r--tests/rest/client/test_transactions.py75
-rw-r--r--tests/storage/test_registration.py11
-rw-r--r--tests/utils.py2
48 files changed, 1290 insertions, 212 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index ea532c6cca..f7a086d912 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -6,9 +6,15 @@ startup there will be an inceased amount of IO until the index is created, and
 an increase in disk usage.
 
 
-Changes in synapse v0.29.0 (2018-05-16)
-=======================================
+Changes in synapse v0.29.1 (2018-05-17)
+==========================================
+Changes:
 
+* Update docker documentation (PR #3222)
+
+Changes in synapse v0.29.0 (2018-05-16)
+===========================================
+Not changes since v0.29.0-rc1
 
 Changes in synapse v0.29.0-rc1 (2018-05-14)
 ===========================================
diff --git a/Dockerfile b/Dockerfile
index 8085f3d354..565341fee3 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,12 +1,12 @@
 FROM docker.io/python:2-alpine3.7
 
-RUN apk add --no-cache --virtual .nacl_deps su-exec build-base libffi-dev zlib-dev libressl-dev libjpeg-turbo-dev linux-headers postgresql-dev
+RUN apk add --no-cache --virtual .nacl_deps su-exec build-base libffi-dev zlib-dev libressl-dev libjpeg-turbo-dev linux-headers postgresql-dev libxslt-dev
 
 COPY . /synapse
 
 # A wheel cache may be provided in ./cache for faster build
 RUN cd /synapse \
- && pip install --upgrade pip setuptools psycopg2 \
+ && pip install --upgrade pip setuptools psycopg2 lxml \
  && mkdir -p /synapse/cache \
  && pip install -f /synapse/cache --upgrade --process-dependency-links . \
  && mv /synapse/contrib/docker/start.py /synapse/contrib/docker/conf / \
diff --git a/README.rst b/README.rst
index 28fbe45de6..4fe54b0c90 100644
--- a/README.rst
+++ b/README.rst
@@ -157,8 +157,9 @@ if you prefer.
 
 In case of problems, please see the _`Troubleshooting` section below.
 
-Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate the
-above in Docker at https://hub.docker.com/r/avhost/docker-matrix/tags/
+There is an offical synapse image available at https://hub.docker.com/r/matrixdotorg/synapse/tags/ which can be used with the docker-compose file available at `contrib/docker`. Further information on this including configuration options is available in `contrib/docker/README.md`.
+
+Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate a synapse server in a single Docker image, at https://hub.docker.com/r/avhost/docker-matrix/tags/
 
 Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
 tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy
diff --git a/contrib/docker/README.md b/contrib/docker/README.md
index aed56646c2..61592109cb 100644
--- a/contrib/docker/README.md
+++ b/contrib/docker/README.md
@@ -1,9 +1,9 @@
 # Synapse Docker
 
-This Docker image will run Synapse as a single process. It does not provide any
-database server or TURN server that you should run separately.
+The `matrixdotorg/synapse` Docker image will run Synapse as a single process. It does not provide a
+database server or a TURN server, you should run these separately.
 
-If you run a Postgres server, you should simply have it in the same Compose
+If you run a Postgres server, you should simply include it in the same Compose
 project or set the proper environment variables and the image will automatically
 use that server.
 
@@ -37,10 +37,15 @@ then run the server:
 docker-compose up -d
 ```
 
+If secrets are not specified in the environment variables, they will be generated
+as part of the startup. Please ensure these secrets are kept between launches of the
+Docker container, as their loss may require users to log in again.
+
 ### Manual configuration
 
 A sample ``docker-compose.yml`` is provided, including example labels for
-reverse proxying and other artifacts.
+reverse proxying and other artifacts. The docker-compose file is an example,
+please comment/uncomment sections that are not suitable for your usecase.
 
 Specify a ``SYNAPSE_CONFIG_PATH``, preferably to a persistent path,
 to use manual configuration. To generate a fresh ``homeserver.yaml``, simply run:
@@ -111,8 +116,6 @@ variables are available for configuration:
 * ``SYNAPSE_SERVER_NAME`` (mandatory), the current server public hostname.
 * ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous
   statistics reporting back to the Matrix project which helps us to get funding.
-* ``SYNAPSE_MACAROON_SECRET_KEY`` (mandatory) secret for signing access tokens
-  to the server, set this to a proper random key.
 * ``SYNAPSE_NO_TLS``, set this variable to disable TLS in Synapse (use this if
   you run your own TLS-capable reverse proxy).
 * ``SYNAPSE_ENABLE_REGISTRATION``, set this variable to enable registration on
@@ -132,6 +135,8 @@ Shared secrets, that will be initialized to random values if not set:
 
 * ``SYNAPSE_REGISTRATION_SHARED_SECRET``, secret for registrering users if
   registration is disable.
+* ``SYNAPSE_MACAROON_SECRET_KEY`` secret for signing access tokens
+  to the server.
 
 Database specific values (will use SQLite if not set):
 
diff --git a/docs/postgres.rst b/docs/postgres.rst
index 904942ec74..296293e859 100644
--- a/docs/postgres.rst
+++ b/docs/postgres.rst
@@ -6,7 +6,13 @@ Postgres version 9.4 or later is known to work.
 Set up database
 ===============
 
-The PostgreSQL database used *must* have the correct encoding set, otherwise
+Assuming your PostgreSQL database user is called ``postgres``, create a user
+``synapse_user`` with::
+
+ su - postgres
+ createuser --pwprompt synapse_user
+
+The PostgreSQL database used *must* have the correct encoding set, otherwise it
 would not be able to store UTF8 strings. To create a database with the correct
 encoding use, e.g.::
 
@@ -46,8 +52,8 @@ As with Debian/Ubuntu, postgres support depends on the postgres python connector
 Synapse config
 ==============
 
-When you are ready to start using PostgreSQL, add the following line to your
-config file::
+When you are ready to start using PostgreSQL, edit the ``database`` section in
+your config file to match the following lines::
 
     database:
         name: psycopg2
@@ -96,9 +102,12 @@ complete, restart synapse.  For instance::
     cp homeserver.db homeserver.db.snapshot
     ./synctl start
 
-Assuming your new config file (as described in the section *Synapse config*)
-is named ``homeserver-postgres.yaml`` and the SQLite snapshot is at
-``homeserver.db.snapshot`` then simply run::
+Copy the old config file into a new config file::
+
+    cp homeserver.yaml homeserver-postgres.yaml
+
+Edit the database section as described in the section *Synapse config* above
+and with the SQLite snapshot located at ``homeserver.db.snapshot`` simply run::
 
     synapse_port_db --sqlite-database homeserver.db.snapshot \
         --postgres-config homeserver-postgres.yaml
@@ -117,6 +126,11 @@ run::
         --postgres-config homeserver-postgres.yaml
 
 Once that has completed, change the synapse config to point at the PostgreSQL
-database configuration file ``homeserver-postgres.yaml`` (i.e. rename it to 
-``homeserver.yaml``) and restart synapse. Synapse should now be running against
-PostgreSQL.
+database configuration file ``homeserver-postgres.yaml``:
+
+    ./synctl stop
+    mv homeserver.yaml homeserver-old-sqlite.yaml 
+    mv homeserver-postgres.yaml homeserver.yaml 
+    ./synctl start
+
+Synapse should now be running against PostgreSQL.
diff --git a/docs/privacy_policy_templates/README.md b/docs/privacy_policy_templates/README.md
index 8e91c516b3..a3e6fc0986 100644
--- a/docs/privacy_policy_templates/README.md
+++ b/docs/privacy_policy_templates/README.md
@@ -9,7 +9,7 @@ form_secret: <unique but arbitrary secret>
 
 user_consent:
   template_dir: docs/privacy_policy_templates
-  default_version: 1.0
+  version: 1.0
 ```
 
 You should then be able to enable the `consent` resource under a `listener`
diff --git a/docs/privacy_policy_templates/en/1.0.html b/docs/privacy_policy_templates/en/1.0.html
index ab8666f0c3..55c5e4b612 100644
--- a/docs/privacy_policy_templates/en/1.0.html
+++ b/docs/privacy_policy_templates/en/1.0.html
@@ -4,6 +4,11 @@
     <title>Matrix.org Privacy policy</title>
   </head>
   <body>
+  {% if has_consented %}
+    <p>
+      Your base already belong to us.
+    </p>
+  {% else %}
     <p>
       All your base are belong to us.
     </p>
@@ -13,5 +18,6 @@
       <input type="hidden" name="h" value="{{userhmac}}"/>
       <input type="submit" value="Sure thing!"/>
     </form>
+  {% endif %}
   </body>
 </html>
diff --git a/synapse/__init__.py b/synapse/__init__.py
index d94c20505e..b11a1d3ea3 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.29.0"
+__version__ = "0.29.1"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index a9ff5576f3..e6ad3768f0 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -19,6 +19,7 @@ import logging
 
 import simplejson as json
 from six import iteritems
+from six.moves import http_client
 
 logger = logging.getLogger(__name__)
 
@@ -51,6 +52,8 @@ class Codes(object):
     THREEPID_DENIED = "M_THREEPID_DENIED"
     INVALID_USERNAME = "M_INVALID_USERNAME"
     SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
+    CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
+    CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
 
 
 class CodeMessageException(RuntimeError):
@@ -138,6 +141,32 @@ class SynapseError(CodeMessageException):
         return res
 
 
+class ConsentNotGivenError(SynapseError):
+    """The error returned to the client when the user has not consented to the
+    privacy policy.
+    """
+    def __init__(self, msg, consent_uri):
+        """Constructs a ConsentNotGivenError
+
+        Args:
+            msg (str): The human-readable error message
+            consent_url (str): The URL where the user can give their consent
+        """
+        super(ConsentNotGivenError, self).__init__(
+            code=http_client.FORBIDDEN,
+            msg=msg,
+            errcode=Codes.CONSENT_NOT_GIVEN
+        )
+        self._consent_uri = consent_uri
+
+    def error_dict(self):
+        return cs_error(
+            self.msg,
+            self.errcode,
+            consent_uri=self._consent_uri
+        )
+
+
 class RegistrationError(SynapseError):
     """An error raised when a registration event fails."""
     pass
@@ -292,7 +321,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
 
     Args:
         msg (str): The error message.
-        code (int): The error code.
+        code (str): The error code.
         kwargs : Additional keys to add to the response.
     Returns:
         A dict representing the error response JSON.
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 91a33a3402..bb46b5da8a 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,6 +15,12 @@
 # limitations under the License.
 
 """Contains the URL paths to prefix various aspects of the server with. """
+from hashlib import sha256
+import hmac
+
+from six.moves.urllib.parse import urlencode
+
+from synapse.config import ConfigError
 
 CLIENT_PREFIX = "/_matrix/client/api/v1"
 CLIENT_V2_ALPHA_PREFIX = "/_matrix/client/v2_alpha"
@@ -25,3 +32,46 @@ SERVER_KEY_PREFIX = "/_matrix/key/v1"
 SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
 MEDIA_PREFIX = "/_matrix/media/r0"
 LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"
+
+
+class ConsentURIBuilder(object):
+    def __init__(self, hs_config):
+        """
+        Args:
+            hs_config (synapse.config.homeserver.HomeServerConfig):
+        """
+        if hs_config.form_secret is None:
+            raise ConfigError(
+                "form_secret not set in config",
+            )
+        if hs_config.public_baseurl is None:
+            raise ConfigError(
+                "public_baseurl not set in config",
+            )
+
+        self._hmac_secret = hs_config.form_secret.encode("utf-8")
+        self._public_baseurl = hs_config.public_baseurl
+
+    def build_user_consent_uri(self, user_id):
+        """Build a URI which we can give to the user to do their privacy
+        policy consent
+
+        Args:
+            user_id (str): mxid or username of user
+
+        Returns
+            (str) the URI where the user can do consent
+        """
+        mac = hmac.new(
+            key=self._hmac_secret,
+            msg=user_id,
+            digestmod=sha256,
+        ).hexdigest()
+        consent_uri = "%s_matrix/consent?%s" % (
+            self._public_baseurl,
+            urlencode({
+                "u": user_id,
+                "h": mac
+            }),
+        )
+        return consent_uri
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index bceb21a8d5..caccbaa814 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -41,7 +41,6 @@ from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
 from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest import ClientRestResource
-from synapse.rest.consent.consent_resource import ConsentResource
 from synapse.rest.key.v1.server_key_resource import LocalKey
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.media.v0.content_repository import ContentRepoResource
@@ -186,6 +185,7 @@ class SynapseHomeServer(HomeServer):
             })
 
         if name == "consent":
+            from synapse.rest.consent.consent_resource import ConsentResource
             consent_resource = ConsentResource(self)
             if compress:
                 consent_resource = gz_wrap(consent_resource)
diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py
index 675fce0911..44c4711e6c 100644
--- a/synapse/config/consent_config.py
+++ b/synapse/config/consent_config.py
@@ -18,25 +18,59 @@ from ._base import Config
 DEFAULT_CONFIG = """\
 # User Consent configuration
 #
-# uncomment and configure if enabling the 'consent' resource under 'listeners'.
+# Parts of this section are required if enabling the 'consent' resource under
+# 'listeners', in particular 'template_dir' and 'version'.
 #
 # 'template_dir' gives the location of the templates for the HTML forms.
 # This directory should contain one subdirectory per language (eg, 'en', 'fr'),
 # and each language directory should contain the policy document (named as
 # '<version>.html') and a success page (success.html).
 #
-# 'default_version' gives the version of the policy document to serve up if
-# there is no 'v' parameter.
+# 'version' specifies the 'current' version of the policy document. It defines
+# the version to be served by the consent resource if there is no 'v'
+# parameter.
+#
+# 'server_notice_content', if enabled, will send a user a "Server Notice"
+# asking them to consent to the privacy policy. The 'server_notices' section
+# must also be configured for this to work.
+#
+# 'block_events_error', if set, will block any attempts to send events
+# until the user consents to the privacy policy. The value of the setting is
+# used as the text of the error.
 #
 # user_consent:
 #   template_dir: res/templates/privacy
-#   default_version: 1.0
+#   version: 1.0
+#   server_notice_content:
+#     msgtype: m.text
+#     body: |
+#       Pls do consent kthx
+#   block_events_error: |
+#     You can't send any messages until you consent to the privacy policy.
 """
 
 
 class ConsentConfig(Config):
+    def __init__(self):
+        super(ConsentConfig, self).__init__()
+
+        self.user_consent_version = None
+        self.user_consent_template_dir = None
+        self.user_consent_server_notice_content = None
+        self.block_events_without_consent_error = None
+
     def read_config(self, config):
-        self.consent_config = config.get("user_consent")
+        consent_config = config.get("user_consent")
+        if consent_config is None:
+            return
+        self.user_consent_version = str(consent_config["version"])
+        self.user_consent_template_dir = consent_config["template_dir"]
+        self.user_consent_server_notice_content = consent_config.get(
+            "server_notice_content",
+        )
+        self.block_events_without_consent_error = consent_config.get(
+            "block_events_error",
+        )
 
     def default_config(self, **kwargs):
         return DEFAULT_CONFIG
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index fb6bd3b421..1dea2ad024 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -38,6 +38,7 @@ from .spam_checker import SpamCheckerConfig
 from .groups import GroupsConfig
 from .user_directory import UserDirectoryConfig
 from .consent_config import ConsentConfig
+from .server_notices_config import ServerNoticesConfig
 
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
@@ -47,7 +48,9 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
                        JWTConfig, PasswordConfig, EmailConfig,
                        WorkerConfig, PasswordAuthProviderConfig, PushConfig,
                        SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,
-                       ConsentConfig):
+                       ConsentConfig,
+                       ServerNoticesConfig,
+                       ):
     pass
 
 
diff --git a/synapse/config/server_notices_config.py b/synapse/config/server_notices_config.py
new file mode 100644
index 0000000000..ccef8d2ec5
--- /dev/null
+++ b/synapse/config/server_notices_config.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ._base import Config
+from synapse.types import UserID
+
+DEFAULT_CONFIG = """\
+# Server Notices room configuration
+#
+# Uncomment this section to enable a room which can be used to send notices
+# from the server to users. It is a special room which cannot be left; notices
+# come from a special "notices" user id.
+#
+# If you uncomment this section, you *must* define the system_mxid_localpart
+# setting, which defines the id of the user which will be used to send the
+# notices.
+#
+# It's also possible to override the room name, or the display name of the
+# "notices" user.
+#
+# server_notices:
+#   system_mxid_localpart: notices
+#   system_mxid_display_name: "Server Notices"
+#   room_name: "Server Notices"
+"""
+
+
+class ServerNoticesConfig(Config):
+    """Configuration for the server notices room.
+
+    Attributes:
+        server_notices_mxid (str|None):
+            The MXID to use for server notices.
+            None if server notices are not enabled.
+
+        server_notices_mxid_display_name (str|None):
+            The display name to use for the server notices user.
+            None if server notices are not enabled.
+
+        server_notices_room_name (str|None):
+            The name to use for the server notices room.
+            None if server notices are not enabled.
+    """
+    def __init__(self):
+        super(ServerNoticesConfig, self).__init__()
+        self.server_notices_mxid = None
+        self.server_notices_mxid_display_name = None
+        self.server_notices_room_name = None
+
+    def read_config(self, config):
+        c = config.get("server_notices")
+        if c is None:
+            return
+
+        mxid_localpart = c['system_mxid_localpart']
+        self.server_notices_mxid = UserID(
+            mxid_localpart, self.server_name,
+        ).to_string()
+        self.server_notices_mxid_display_name = c.get(
+            'system_mxid_display_name', 'Server Notices',
+        )
+        # todo: i18n
+        self.server_notices_room_name = c.get('room_name', "Server Notices")
+
+    def default_config(self, **kwargs):
+        return DEFAULT_CONFIG
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 8f8fd82eb0..d358842b3e 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -14,9 +14,7 @@
 # limitations under the License.
 
 from .register import RegistrationHandler
-from .room import (
-    RoomCreationHandler, RoomContextHandler,
-)
+from .room import RoomContextHandler
 from .message import MessageHandler
 from .federation import FederationHandler
 from .directory import DirectoryHandler
@@ -47,7 +45,6 @@ class Handlers(object):
     def __init__(self, hs):
         self.registration_handler = RegistrationHandler(hs)
         self.message_handler = MessageHandler(hs)
-        self.room_creation_handler = RoomCreationHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 4eb18775e8..d58ea6c650 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -61,7 +61,7 @@ class DeactivateAccountHandler(BaseHandler):
         yield self.store.user_delete_threepids(user_id)
         yield self.store.user_set_password_hash(user_id, None)
 
-        # Add the user to a table of users penpding deactivation (ie.
+        # Add the user to a table of users pending deactivation (ie.
         # removal from all the rooms they're a member of)
         yield self.store.add_user_pending_deactivation(user_id)
 
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d3685fb12a..8bc642675f 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -48,6 +48,7 @@ class EventStreamHandler(BaseHandler):
 
         self.notifier = hs.get_notifier()
         self.state = hs.get_state_handler()
+        self._server_notices_sender = hs.get_server_notices_sender()
 
     @defer.inlineCallbacks
     @log_function
@@ -58,6 +59,10 @@ class EventStreamHandler(BaseHandler):
 
         If `only_keys` is not None, events from keys will be sent down.
         """
+
+        # send any outstanding server notices to the user.
+        yield self._server_notices_sender.on_user_syncing(auth_user_id)
+
         auth_user = UserID.from_string(auth_user_id)
         presence_handler = self.hs.get_presence_handler()
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f39233d846..ba3ede8024 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -81,6 +81,7 @@ class FederationHandler(BaseHandler):
         self.pusher_pool = hs.get_pusherpool()
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self._server_notices_mxid = hs.config.server_notices_mxid
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
@@ -1180,6 +1181,13 @@ class FederationHandler(BaseHandler):
         if not self.is_mine_id(event.state_key):
             raise SynapseError(400, "The invite event must be for this server")
 
+        # block any attempts to invite the server notices mxid
+        if event.state_key == self._server_notices_mxid:
+            raise SynapseError(
+                http_client.FORBIDDEN,
+                "Cannot invite this user",
+            )
+
         event.internal_metadata.outlier = True
         event.internal_metadata.invite_from_remote = True
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b793fc4df7..c3adbc6c95 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,10 +20,15 @@ import sys
 from canonicaljson import encode_canonical_json
 import six
 from twisted.internet import defer, reactor
+from twisted.internet.defer import succeed
 from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.errors import (
+    AuthError, Codes, SynapseError,
+    ConsentNotGivenError,
+)
+from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
@@ -86,14 +91,14 @@ class MessageHandler(BaseHandler):
         # map from purge id to PurgeStatus
         self._purges_by_id = {}
 
-    def start_purge_history(self, room_id, topological_ordering,
+    def start_purge_history(self, room_id, token,
                             delete_local_events=False):
         """Start off a history purge on a room.
 
         Args:
             room_id (str): The room to purge from
 
-            topological_ordering (int): minimum topo ordering to preserve
+            token (str): topological token to delete events before
             delete_local_events (bool): True to delete local events as well as
                 remote ones
 
@@ -115,19 +120,19 @@ class MessageHandler(BaseHandler):
         self._purges_by_id[purge_id] = PurgeStatus()
         run_in_background(
             self._purge_history,
-            purge_id, room_id, topological_ordering, delete_local_events,
+            purge_id, room_id, token, delete_local_events,
         )
         return purge_id
 
     @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, topological_ordering,
+    def _purge_history(self, purge_id, room_id, token,
                        delete_local_events):
         """Carry out a history purge on a room.
 
         Args:
             purge_id (str): The id for this purge
             room_id (str): The room to purge from
-            topological_ordering (int): minimum topo ordering to preserve
+            token (str): topological token to delete events before
             delete_local_events (bool): True to delete local events as well as
                 remote ones
 
@@ -138,7 +143,7 @@ class MessageHandler(BaseHandler):
         try:
             with (yield self.pagination_lock.write(room_id)):
                 yield self.store.purge_history(
-                    room_id, topological_ordering, delete_local_events,
+                    room_id, token, delete_local_events,
                 )
             logger.info("[purge] complete")
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
@@ -431,6 +436,9 @@ class EventCreationHandler(object):
 
         self.spam_checker = hs.get_spam_checker()
 
+        if self.config.block_events_without_consent_error is not None:
+            self._consent_uri_builder = ConsentURIBuilder(self.config)
+
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
                      prev_events_and_hashes=None):
@@ -482,6 +490,10 @@ class EventCreationHandler(object):
                         target, e
                     )
 
+        is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+        if not is_exempt:
+            yield self.assert_accepted_privacy_policy(requester)
+
         if token_id is not None:
             builder.internal_metadata.token_id = token_id
 
@@ -496,6 +508,78 @@ class EventCreationHandler(object):
 
         defer.returnValue((event, context))
 
+    def _is_exempt_from_privacy_policy(self, builder):
+        """"Determine if an event to be sent is exempt from having to consent
+        to the privacy policy
+
+        Args:
+            builder (synapse.events.builder.EventBuilder): event being created
+
+        Returns:
+            Deferred[bool]: true if the event can be sent without the user
+                consenting
+        """
+        # the only thing the user can do is join the server notices room.
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            if membership == Membership.JOIN:
+                return self._is_server_notices_room(builder.room_id)
+        return succeed(False)
+
+    @defer.inlineCallbacks
+    def _is_server_notices_room(self, room_id):
+        if self.config.server_notices_mxid is None:
+            defer.returnValue(False)
+        user_ids = yield self.store.get_users_in_room(room_id)
+        defer.returnValue(self.config.server_notices_mxid in user_ids)
+
+    @defer.inlineCallbacks
+    def assert_accepted_privacy_policy(self, requester):
+        """Check if a user has accepted the privacy policy
+
+        Called when the given user is about to do something that requires
+        privacy consent. We see if the user is exempt and otherwise check that
+        they have given consent. If they have not, a ConsentNotGiven error is
+        raised.
+
+        Args:
+            requester (synapse.types.Requester):
+                The user making the request
+
+        Returns:
+            Deferred[None]: returns normally if the user has consented or is
+                exempt
+
+        Raises:
+            ConsentNotGivenError: if the user has not given consent yet
+        """
+        if self.config.block_events_without_consent_error is None:
+            return
+
+        # exempt AS users from needing consent
+        if requester.app_service is not None:
+            return
+
+        user_id = requester.user.to_string()
+
+        # exempt the system notices user
+        if (
+            self.config.server_notices_mxid is not None and
+            user_id == self.config.server_notices_mxid
+        ):
+            return
+
+        u = yield self.store.get_user_by_id(user_id)
+        assert u is not None
+        if u["consent_version"] == self.config.user_consent_version:
+            return
+
+        consent_uri = self._consent_uri_builder.build_user_consent_uri(user_id)
+        raise ConsentNotGivenError(
+            msg=self.config.block_events_without_consent_error,
+            consent_uri=consent_uri,
+        )
+
     @defer.inlineCallbacks
     def send_nonmember_event(self, requester, event, context, ratelimit=True):
         """
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 585f3e4da2..500a131874 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -87,6 +87,11 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 class PresenceHandler(object):
 
     def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer):
+        """
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
         self.clock = hs.get_clock()
@@ -94,7 +99,6 @@ class PresenceHandler(object):
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
         self.federation = hs.get_federation_sender()
-
         self.state = hs.get_state_handler()
 
         federation_registry = hs.get_federation_registry()
@@ -464,61 +468,6 @@ class PresenceHandler(object):
         return syncing_user_ids
 
     @defer.inlineCallbacks
-    def update_external_syncs(self, process_id, syncing_user_ids):
-        """Update the syncing users for an external process
-
-        Args:
-            process_id(str): An identifier for the process the users are
-                syncing against. This allows synapse to process updates
-                as user start and stop syncing against a given process.
-            syncing_user_ids(set(str)): The set of user_ids that are
-                currently syncing on that server.
-        """
-
-        # Grab the previous list of user_ids that were syncing on that process
-        prev_syncing_user_ids = (
-            self.external_process_to_current_syncs.get(process_id, set())
-        )
-        # Grab the current presence state for both the users that are syncing
-        # now and the users that were syncing before this update.
-        prev_states = yield self.current_state_for_users(
-            syncing_user_ids | prev_syncing_user_ids
-        )
-        updates = []
-        time_now_ms = self.clock.time_msec()
-
-        # For each new user that is syncing check if we need to mark them as
-        # being online.
-        for new_user_id in syncing_user_ids - prev_syncing_user_ids:
-            prev_state = prev_states[new_user_id]
-            if prev_state.state == PresenceState.OFFLINE:
-                updates.append(prev_state.copy_and_replace(
-                    state=PresenceState.ONLINE,
-                    last_active_ts=time_now_ms,
-                    last_user_sync_ts=time_now_ms,
-                ))
-            else:
-                updates.append(prev_state.copy_and_replace(
-                    last_user_sync_ts=time_now_ms,
-                ))
-
-        # For each user that is still syncing or stopped syncing update the
-        # last sync time so that we will correctly apply the grace period when
-        # they stop syncing.
-        for old_user_id in prev_syncing_user_ids:
-            prev_state = prev_states[old_user_id]
-            updates.append(prev_state.copy_and_replace(
-                last_user_sync_ts=time_now_ms,
-            ))
-
-        yield self._update_states(updates)
-
-        # Update the last updated time for the process. We expire the entries
-        # if we don't receive an update in the given timeframe.
-        self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
-        self.external_process_to_current_syncs[process_id] = syncing_user_ids
-
-    @defer.inlineCallbacks
     def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
         """Update the syncing users for an external process as a delta.
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f83c6b3cf8..7e52adda3c 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -34,6 +34,11 @@ logger = logging.getLogger(__name__)
 class RegistrationHandler(BaseHandler):
 
     def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer):
+        """
         super(RegistrationHandler, self).__init__(hs)
 
         self.auth = hs.get_auth()
@@ -49,6 +54,7 @@ class RegistrationHandler(BaseHandler):
         self._generate_user_id_linearizer = Linearizer(
             name="_generate_user_id_linearizer",
         )
+        self._server_notices_mxid = hs.config.server_notices_mxid
 
     @defer.inlineCallbacks
     def check_username(self, localpart, guest_access_token=None,
@@ -338,6 +344,14 @@ class RegistrationHandler(BaseHandler):
             yield identity_handler.bind_threepid(c, user_id)
 
     def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
+        # don't allow people to register the server notices mxid
+        if self._server_notices_mxid is not None:
+            if user_id == self._server_notices_mxid:
+                raise SynapseError(
+                    400, "This user ID is reserved.",
+                    errcode=Codes.EXCLUSIVE
+                )
+
         # valid user IDs must not clash with any user ID namespaces claimed by
         # application services.
         services = self.store.get_app_services()
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 8df8fcbbad..b5850db42f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -68,14 +68,27 @@ class RoomCreationHandler(BaseHandler):
         self.event_creation_handler = hs.get_event_creation_handler()
 
     @defer.inlineCallbacks
-    def create_room(self, requester, config, ratelimit=True):
+    def create_room(self, requester, config, ratelimit=True,
+                    creator_join_profile=None):
         """ Creates a new room.
 
         Args:
-            requester (Requester): The user who requested the room creation.
+            requester (synapse.types.Requester):
+                The user who requested the room creation.
             config (dict) : A dict of configuration options.
+            ratelimit (bool): set to False to disable the rate limiter
+
+            creator_join_profile (dict|None):
+                Set to override the displayname and avatar for the creating
+                user in this room. If unset, displayname and avatar will be
+                derived from the user's profile. If set, should contain the
+                values to go in the body of the 'join' event (typically
+                `avatar_url` and/or `displayname`.
+
         Returns:
-            The new room ID.
+            Deferred[dict]:
+                a dict containing the keys `room_id` and, if an alias was
+                requested, `room_alias`.
         Raises:
             SynapseError if the room ID couldn't be stored, or something went
             horribly wrong.
@@ -113,6 +126,10 @@ class RoomCreationHandler(BaseHandler):
             except Exception:
                 raise SynapseError(400, "Invalid user_id: %s" % (i,))
 
+        yield self.event_creation_handler.assert_accepted_privacy_policy(
+            requester,
+        )
+
         invite_3pid_list = config.get("invite_3pid", [])
 
         visibility = config.get("visibility", None)
@@ -176,7 +193,8 @@ class RoomCreationHandler(BaseHandler):
             initial_state=initial_state,
             creation_content=creation_content,
             room_alias=room_alias,
-            power_level_content_override=config.get("power_level_content_override", {})
+            power_level_content_override=config.get("power_level_content_override", {}),
+            creator_join_profile=creator_join_profile,
         )
 
         if "name" in config:
@@ -256,6 +274,7 @@ class RoomCreationHandler(BaseHandler):
             creation_content,
             room_alias,
             power_level_content_override,
+            creator_join_profile,
     ):
         def create(etype, content, **kwargs):
             e = {
@@ -299,6 +318,7 @@ class RoomCreationHandler(BaseHandler):
             room_id,
             "join",
             ratelimit=False,
+            content=creator_join_profile,
         )
 
         # We treat the power levels override specially as this needs to be one
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 714583f1d5..82adfc8fdf 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,11 +17,14 @@
 import abc
 import logging
 
+from six.moves import http_client
+
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
+import synapse.server
 import synapse.types
 from synapse.api.constants import (
     EventTypes, Membership,
@@ -46,6 +49,11 @@ class RoomMemberHandler(object):
     __metaclass__ = abc.ABCMeta
 
     def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer):
+        """
         self.hs = hs
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
@@ -63,6 +71,7 @@ class RoomMemberHandler(object):
 
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
+        self._server_notices_mxid = self.config.server_notices_mxid
 
     @abc.abstractmethod
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -289,12 +298,37 @@ class RoomMemberHandler(object):
             is_blocked = yield self.store.is_room_blocked(room_id)
             if is_blocked:
                 raise SynapseError(403, "This room has been blocked on this server")
+        else:
+            # we don't allow people to reject invites to, or leave, the
+            # server notice room.
+            is_blocked = yield self._is_server_notice_room(room_id)
+            if is_blocked:
+                raise SynapseError(
+                    http_client.FORBIDDEN,
+                    "You cannot leave this room",
+                    errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
+                )
+
+        if effective_membership_state == Membership.INVITE:
+            # block any attempts to invite the server notices mxid
+            if target.to_string() == self._server_notices_mxid:
+                raise SynapseError(
+                    http_client.FORBIDDEN,
+                    "Cannot invite this user",
+                )
 
-        if effective_membership_state == "invite":
             block_invite = False
-            is_requester_admin = yield self.auth.is_server_admin(
-                requester.user,
-            )
+
+            if (self._server_notices_mxid is not None and
+                    requester.user.to_string() == self._server_notices_mxid):
+                # allow the server notices mxid to send invites
+                is_requester_admin = True
+
+            else:
+                is_requester_admin = yield self.auth.is_server_admin(
+                    requester.user,
+                )
+
             if not is_requester_admin:
                 if self.config.block_non_admin_invites:
                     logger.info(
@@ -844,6 +878,13 @@ class RoomMemberHandler(object):
 
         defer.returnValue(False)
 
+    @defer.inlineCallbacks
+    def _is_server_notice_room(self, room_id):
+        if self._server_notices_mxid is None:
+            defer.returnValue(False)
+        user_ids = yield self.store.get_users_in_room(room_id)
+        defer.returnValue(self._server_notices_mxid in user_ids)
+
 
 class RoomMemberMasterHandler(RoomMemberHandler):
     def __init__(self, hs):
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index 8c850bf23f..5917c83958 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -98,14 +98,87 @@ response_size = metrics.register_counter(
     "response_size", labels=["method", "servlet", "tag"]
 )
 
+# In flight metrics are incremented while the requests are in flight, rather
+# than when the response was written.
+
+in_flight_requests_ru_utime = metrics.register_counter(
+    "in_flight_requests_ru_utime_seconds", labels=["method", "servlet"],
+)
+
+in_flight_requests_ru_stime = metrics.register_counter(
+    "in_flight_requests_ru_stime_seconds", labels=["method", "servlet"],
+)
+
+in_flight_requests_db_txn_count = metrics.register_counter(
+    "in_flight_requests_db_txn_count", labels=["method", "servlet"],
+)
+
+# seconds spent waiting for db txns, excluding scheduling time, when processing
+# this request
+in_flight_requests_db_txn_duration = metrics.register_counter(
+    "in_flight_requests_db_txn_duration_seconds", labels=["method", "servlet"],
+)
+
+# seconds spent waiting for a db connection, when processing this request
+in_flight_requests_db_sched_duration = metrics.register_counter(
+    "in_flight_requests_db_sched_duration_seconds", labels=["method", "servlet"]
+)
+
+
+# The set of all in flight requests, set[RequestMetrics]
+_in_flight_requests = set()
+
+
+def _collect_in_flight():
+    """Called just before metrics are collected, so we use it to update all
+    the in flight request metrics
+    """
+
+    for rm in _in_flight_requests:
+        rm.update_metrics()
+
+
+metrics.register_collector(_collect_in_flight)
+
+
+def _get_in_flight_counts():
+    """Returns a count of all in flight requests by (method, server_name)
+
+    Returns:
+        dict[tuple[str, str], int]
+    """
+
+    # Map from (method, name) -> int, the number of in flight requests of that
+    # type
+    counts = {}
+    for rm in _in_flight_requests:
+        key = (rm.method, rm.name,)
+        counts[key] = counts.get(key, 0) + 1
+
+    return counts
+
+
+metrics.register_callback(
+    "in_flight_requests_count",
+    _get_in_flight_counts,
+    labels=["method", "servlet"]
+)
+
 
 class RequestMetrics(object):
-    def start(self, time_msec, name):
+    def start(self, time_msec, name, method):
         self.start = time_msec
         self.start_context = LoggingContext.current_context()
         self.name = name
+        self.method = method
+
+        self._request_stats = _RequestStats.from_context(self.start_context)
+
+        _in_flight_requests.add(self)
 
     def stop(self, time_msec, request):
+        _in_flight_requests.discard(self)
+
         context = LoggingContext.current_context()
 
         tag = ""
@@ -147,3 +220,88 @@ class RequestMetrics(object):
         )
 
         response_size.inc_by(request.sentLength, request.method, self.name, tag)
+
+        # We always call this at the end to ensure that we update the metrics
+        # regardless of whether a call to /metrics while the request was in
+        # flight.
+        self.update_metrics()
+
+    def update_metrics(self):
+        """Updates the in flight metrics with values from this request.
+        """
+
+        diff = self._request_stats.update(self.start_context)
+
+        in_flight_requests_ru_utime.inc_by(
+            diff.ru_utime, self.method, self.name,
+        )
+
+        in_flight_requests_ru_stime.inc_by(
+            diff.ru_stime, self.method, self.name,
+        )
+
+        in_flight_requests_db_txn_count.inc_by(
+            diff.db_txn_count, self.method, self.name,
+        )
+
+        in_flight_requests_db_txn_duration.inc_by(
+            diff.db_txn_duration_ms / 1000., self.method, self.name,
+        )
+
+        in_flight_requests_db_sched_duration.inc_by(
+            diff.db_sched_duration_ms / 1000., self.method, self.name,
+        )
+
+
+class _RequestStats(object):
+    """Keeps tracks of various metrics for an in flight request.
+    """
+
+    __slots__ = [
+        "ru_utime", "ru_stime",
+        "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
+    ]
+
+    def __init__(self, ru_utime, ru_stime, db_txn_count,
+                 db_txn_duration_ms, db_sched_duration_ms):
+        self.ru_utime = ru_utime
+        self.ru_stime = ru_stime
+        self.db_txn_count = db_txn_count
+        self.db_txn_duration_ms = db_txn_duration_ms
+        self.db_sched_duration_ms = db_sched_duration_ms
+
+    @staticmethod
+    def from_context(context):
+        ru_utime, ru_stime = context.get_resource_usage()
+
+        return _RequestStats(
+            ru_utime, ru_stime,
+            context.db_txn_count,
+            context.db_txn_duration_ms,
+            context.db_sched_duration_ms,
+        )
+
+    def update(self, context):
+        """Updates the current values and returns the difference between the
+        old and new values.
+
+        Returns:
+            _RequestStats: The difference between the old and new values
+        """
+        new = _RequestStats.from_context(context)
+
+        diff = _RequestStats(
+            new.ru_utime - self.ru_utime,
+            new.ru_stime - self.ru_stime,
+            new.db_txn_count - self.db_txn_count,
+            new.db_txn_duration_ms - self.db_txn_duration_ms,
+            new.db_sched_duration_ms - self.db_sched_duration_ms,
+        )
+
+        self.ru_utime = new.ru_utime
+        self.ru_stime = new.ru_stime
+        self.db_txn_count = new.db_txn_count
+        self.db_txn_duration_ms = new.db_txn_duration_ms
+        self.db_sched_duration_ms = new.db_sched_duration_ms
+
+        return diff
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 202a990508..b608504225 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -85,7 +85,9 @@ class SynapseRequest(Request):
     def _started_processing(self, servlet_name):
         self.start_time = int(time.time() * 1000)
         self.request_metrics = RequestMetrics()
-        self.request_metrics.start(self.start_time, name=servlet_name)
+        self.request_metrics.start(
+            self.start_time, name=servlet_name, method=self.method,
+        )
 
         self.site.access_logger.info(
             "%s - %s - Received request: %s %s",
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index a41af4fd6c..a603c520ea 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -69,6 +69,7 @@ class ReplicationStreamer(object):
         self.presence_handler = hs.get_presence_handler()
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
+        self._server_notices_sender = hs.get_server_notices_sender()
 
         # Current connections.
         self.connections = []
@@ -253,6 +254,7 @@ class ReplicationStreamer(object):
         yield self.store.insert_client_ip(
             user_id, access_token, ip, user_agent, device_id, last_seen,
         )
+        yield self._server_notices_sender.on_user_ip(user_id)
 
     def send_sync_to_all_connections(self, data):
         """Sends a SYNC command to all clients.
diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py
index fceca2edeb..20fa6678ef 100644
--- a/synapse/rest/client/transactions.py
+++ b/synapse/rest/client/transactions.py
@@ -19,6 +19,7 @@ import logging
 
 from synapse.api.auth import get_access_token_from_request
 from synapse.util.async import ObservableDeferred
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
 
@@ -80,27 +81,26 @@ class HttpTransactionCache(object):
         Returns:
             Deferred which resolves to a tuple of (response_code, response_dict).
         """
-        try:
-            return self.transactions[txn_key][0].observe()
-        except (KeyError, IndexError):
-            pass  # execute the function instead.
-
-        deferred = fn(*args, **kwargs)
-
-        # if the request fails with a Twisted failure, remove it
-        # from the transaction map. This is done to ensure that we don't
-        # cache transient errors like rate-limiting errors, etc.
-        def remove_from_map(err):
-            self.transactions.pop(txn_key, None)
-            return err
-        deferred.addErrback(remove_from_map)
-
-        # We don't add any other errbacks to the raw deferred, so we ask
-        # ObservableDeferred to swallow the error. This is fine as the error will
-        # still be reported to the observers.
-        observable = ObservableDeferred(deferred, consumeErrors=True)
-        self.transactions[txn_key] = (observable, self.clock.time_msec())
-        return observable.observe()
+        if txn_key in self.transactions:
+            observable = self.transactions[txn_key][0]
+        else:
+            # execute the function instead.
+            deferred = run_in_background(fn, *args, **kwargs)
+
+            observable = ObservableDeferred(deferred)
+            self.transactions[txn_key] = (observable, self.clock.time_msec())
+
+            # if the request fails with an exception, remove it
+            # from the transaction map. This is done to ensure that we don't
+            # cache transient errors like rate-limiting errors, etc.
+            def remove_from_map(err):
+                self.transactions.pop(txn_key, None)
+                # we deliberately do not propagate the error any further, as we
+                # expect the observers to have reported it.
+
+            deferred.addErrback(remove_from_map)
+
+        return make_deferred_yieldable(observable.observe())
 
     def _cleanup(self):
         now = self.clock.time_msec()
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index efd5c9873d..6835a7bba2 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -151,10 +151,11 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
             if event.room_id != room_id:
                 raise SynapseError(400, "Event is for wrong room.")
 
-            depth = event.depth
+            token = yield self.store.get_topological_token_for_event(event_id)
+
             logger.info(
-                "[purge] purging up to depth %i (event_id %s)",
-                depth, event_id,
+                "[purge] purging up to token %s (event_id %s)",
+                token, event_id,
             )
         elif 'purge_up_to_ts' in body:
             ts = body['purge_up_to_ts']
@@ -174,7 +175,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                 )
             )
             if room_event_after_stream_ordering:
-                (_, depth, _) = room_event_after_stream_ordering
+                token = yield self.store.get_topological_token_for_event(
+                    room_event_after_stream_ordering,
+                )
             else:
                 logger.warn(
                     "[purge] purging events not possible: No event found "
@@ -187,9 +190,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                     errcode=Codes.NOT_FOUND,
                 )
             logger.info(
-                "[purge] purging up to depth %i (received_ts %i => "
+                "[purge] purging up to token %d (received_ts %i => "
                 "stream_ordering %i)",
-                depth, ts, stream_ordering,
+                token, ts, stream_ordering,
             )
         else:
             raise SynapseError(
@@ -199,7 +202,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
             )
 
         purge_id = yield self.handlers.message_handler.start_purge_history(
-            room_id, depth,
+            room_id, token,
             delete_local_events=delete_local_events,
         )
 
@@ -273,8 +276,8 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(ShutdownRoomRestServlet, self).__init__(hs)
         self.store = hs.get_datastore()
-        self.handlers = hs.get_handlers()
         self.state = hs.get_state_handler()
+        self._room_creation_handler = hs.get_room_creation_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
 
@@ -296,7 +299,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
         message = content.get("message", self.DEFAULT_MESSAGE)
         room_name = content.get("room_name", "Content Violation Notification")
 
-        info = yield self.handlers.room_creation_handler.create_room(
+        info = yield self._room_creation_handler.create_room(
             room_creator_requester,
             config={
                 "preset": "public_chat",
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index fcf9c9ab44..0b984987ed 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -41,7 +41,7 @@ class RoomCreateRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomCreateRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self._room_creation_handler = hs.get_room_creation_handler()
 
     def register(self, http_server):
         PATTERNS = "/createRoom"
@@ -64,8 +64,7 @@ class RoomCreateRestServlet(ClientV1RestServlet):
     def on_POST(self, request):
         requester = yield self.auth.get_user_by_req(request)
 
-        handler = self.handlers.room_creation_handler
-        info = yield handler.create_room(
+        info = yield self._room_creation_handler.create_room(
             requester, self.get_room_config(request)
         )
 
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index eb91c0b293..a291cffbf1 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -85,6 +85,7 @@ class SyncRestServlet(RestServlet):
         self.clock = hs.get_clock()
         self.filtering = hs.get_filtering()
         self.presence_handler = hs.get_presence_handler()
+        self._server_notices_sender = hs.get_server_notices_sender()
 
     @defer.inlineCallbacks
     def on_GET(self, request):
@@ -149,6 +150,9 @@ class SyncRestServlet(RestServlet):
         else:
             since_token = None
 
+        # send any outstanding server notices to the user.
+        yield self._server_notices_sender.on_user_syncing(user.to_string())
+
         affect_presence = set_presence != PresenceState.OFFLINE
 
         if affect_presence:
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index d791302278..724911d1e6 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -95,8 +95,8 @@ class ConsentResource(Resource):
         # this is required by the request_handler wrapper
         self.clock = hs.get_clock()
 
-        consent_config = hs.config.consent_config
-        if consent_config is None:
+        self._default_consent_version = hs.config.user_consent_version
+        if self._default_consent_version is None:
             raise ConfigError(
                 "Consent resource is enabled but user_consent section is "
                 "missing in config file.",
@@ -104,7 +104,7 @@ class ConsentResource(Resource):
 
         # daemonize changes the cwd to /, so make the path absolute now.
         consent_template_directory = path.abspath(
-            consent_config["template_dir"],
+            hs.config.user_consent_template_dir,
         )
         if not path.isdir(consent_template_directory):
             raise ConfigError(
@@ -114,9 +114,10 @@ class ConsentResource(Resource):
             )
 
         loader = jinja2.FileSystemLoader(consent_template_directory)
-        self._jinja_env = jinja2.Environment(loader=loader)
-
-        self._default_consent_verison = consent_config["default_version"]
+        self._jinja_env = jinja2.Environment(
+            loader=loader,
+            autoescape=jinja2.select_autoescape(['html', 'htm', 'xml']),
+        )
 
         if hs.config.form_secret is None:
             raise ConfigError(
@@ -131,6 +132,7 @@ class ConsentResource(Resource):
         return NOT_DONE_YET
 
     @wrap_html_request_handler
+    @defer.inlineCallbacks
     def _async_render_GET(self, request):
         """
         Args:
@@ -138,16 +140,26 @@ class ConsentResource(Resource):
         """
 
         version = parse_string(request, "v",
-                               default=self._default_consent_verison)
+                               default=self._default_consent_version)
         username = parse_string(request, "u", required=True)
         userhmac = parse_string(request, "h", required=True)
 
         self._check_hash(username, userhmac)
 
+        if username.startswith('@'):
+            qualified_user_id = username
+        else:
+            qualified_user_id = UserID(username, self.hs.hostname).to_string()
+
+        u = yield self.store.get_user_by_id(qualified_user_id)
+        if u is None:
+            raise NotFoundError("Unknown user")
+
         try:
             self._render_template(
                 request, "%s.html" % (version,),
                 user=username, userhmac=userhmac, version=version,
+                has_consented=(u["consent_version"] == version),
             )
         except TemplateNotFound:
             raise NotFoundError("Unknown policy version")
diff --git a/synapse/server.py b/synapse/server.py
index 21cde5b6fc..58dbf78437 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -46,6 +46,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler
 from synapse.handlers.device import DeviceHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
+from synapse.handlers.room import RoomCreationHandler
 from synapse.handlers.room_list import RoomListHandler
 from synapse.handlers.room_member import RoomMemberMasterHandler
 from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
@@ -71,6 +72,11 @@ from synapse.rest.media.v1.media_repository import (
     MediaRepository,
     MediaRepositoryResource,
 )
+from synapse.server_notices.server_notices_manager import ServerNoticesManager
+from synapse.server_notices.server_notices_sender import ServerNoticesSender
+from synapse.server_notices.worker_server_notices_sender import (
+    WorkerServerNoticesSender,
+)
 from synapse.state import StateHandler, StateResolutionHandler
 from synapse.storage import DataStore
 from synapse.streams.events import EventSources
@@ -109,6 +115,7 @@ class HomeServer(object):
         'federation_server',
         'handlers',
         'auth',
+        'room_creation_handler',
         'state_handler',
         'state_resolution_handler',
         'presence_handler',
@@ -154,6 +161,8 @@ class HomeServer(object):
         'spam_checker',
         'room_member_handler',
         'federation_registry',
+        'server_notices_manager',
+        'server_notices_sender',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -227,6 +236,9 @@ class HomeServer(object):
     def build_simple_http_client(self):
         return SimpleHttpClient(self)
 
+    def build_room_creation_handler(self):
+        return RoomCreationHandler(self)
+
     def build_state_handler(self):
         return StateHandler(self)
 
@@ -393,6 +405,16 @@ class HomeServer(object):
     def build_federation_registry(self):
         return FederationHandlerRegistry()
 
+    def build_server_notices_manager(self):
+        if self.config.worker_app:
+            raise Exception("Workers cannot send server notices")
+        return ServerNoticesManager(self)
+
+    def build_server_notices_sender(self):
+        if self.config.worker_app:
+            return WorkerServerNoticesSender(self)
+        return ServerNoticesSender(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
diff --git a/synapse/server.pyi b/synapse/server.pyi
index c3a9a3847b..ce28486233 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,4 +1,5 @@
 import synapse.api.auth
+import synapse.config.homeserver
 import synapse.federation.transaction_queue
 import synapse.federation.transport.client
 import synapse.handlers
@@ -8,11 +9,17 @@ import synapse.handlers.device
 import synapse.handlers.e2e_keys
 import synapse.handlers.set_password
 import synapse.rest.media.v1.media_repository
+import synapse.server_notices.server_notices_manager
+import synapse.server_notices.server_notices_sender
 import synapse.state
 import synapse.storage
 
 
 class HomeServer(object):
+    @property
+    def config(self) -> synapse.config.homeserver.HomeServerConfig:
+        pass
+
     def get_auth(self) -> synapse.api.auth.Auth:
         pass
 
@@ -40,6 +47,12 @@ class HomeServer(object):
     def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler:
         pass
 
+    def get_room_creation_handler(self) -> synapse.handlers.room.RoomCreationHandler:
+        pass
+
+    def get_event_creation_handler(self) -> synapse.handlers.message.EventCreationHandler:
+        pass
+
     def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler:
         pass
 
@@ -54,3 +67,9 @@ class HomeServer(object):
 
     def get_media_repository(self) -> synapse.rest.media.v1.media_repository.MediaRepository:
         pass
+
+    def get_server_notices_manager(self) -> synapse.server_notices.server_notices_manager.ServerNoticesManager:
+        pass
+
+    def get_server_notices_sender(self) -> synapse.server_notices.server_notices_sender.ServerNoticesSender:
+        pass
diff --git a/synapse/server_notices/__init__.py b/synapse/server_notices/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/synapse/server_notices/__init__.py
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
new file mode 100644
index 0000000000..440f6b1cd4
--- /dev/null
+++ b/synapse/server_notices/consent_server_notices.py
@@ -0,0 +1,95 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.config import ConfigError
+
+logger = logging.getLogger(__name__)
+
+
+class ConsentServerNotices(object):
+    """Keeps track of whether we need to send users server_notices about
+    privacy policy consent, and sends one if we do.
+    """
+    def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer):
+        """
+        self._server_notices_manager = hs.get_server_notices_manager()
+        self._store = hs.get_datastore()
+
+        self._users_in_progress = set()
+
+        self._current_consent_version = hs.config.user_consent_version
+        self._server_notice_content = hs.config.user_consent_server_notice_content
+
+        if self._server_notice_content is not None:
+            if not self._server_notices_manager.is_enabled():
+                raise ConfigError(
+                    "user_consent configuration requires server notices, but "
+                    "server notices are not enabled.",
+                )
+            if 'body' not in self._server_notice_content:
+                raise ConfigError(
+                    "user_consent server_notice_consent must contain a 'body' "
+                    "key.",
+                )
+
+    @defer.inlineCallbacks
+    def maybe_send_server_notice_to_user(self, user_id):
+        """Check if we need to send a notice to this user, and does so if so
+
+        Args:
+            user_id (str): user to check
+
+        Returns:
+            Deferred
+        """
+        if self._server_notice_content is None:
+            # not enabled
+            return
+
+        # make sure we don't send two messages to the same user at once
+        if user_id in self._users_in_progress:
+            return
+        self._users_in_progress.add(user_id)
+        try:
+            u = yield self._store.get_user_by_id(user_id)
+
+            if u["consent_version"] == self._current_consent_version:
+                # user has already consented
+                return
+
+            if u["consent_server_notice_sent"] == self._current_consent_version:
+                # we've already sent a notice to the user
+                return
+
+            # need to send a message
+            try:
+                yield self._server_notices_manager.send_notice(
+                    user_id, self._server_notice_content,
+                )
+                yield self._store.user_set_consent_server_notice_sent(
+                    user_id, self._current_consent_version,
+                )
+            except SynapseError as e:
+                logger.error("Error sending server notice about user consent: %s", e)
+        finally:
+            self._users_in_progress.remove(user_id)
diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py
new file mode 100644
index 0000000000..f535b9c9da
--- /dev/null
+++ b/synapse/server_notices/server_notices_manager.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership, RoomCreationPreset
+from synapse.types import create_requester
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
+logger = logging.getLogger(__name__)
+
+
+class ServerNoticesManager(object):
+    def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer):
+        """
+
+        self._store = hs.get_datastore()
+        self._config = hs.config
+        self._room_creation_handler = hs.get_room_creation_handler()
+        self._event_creation_handler = hs.get_event_creation_handler()
+
+    def is_enabled(self):
+        """Checks if server notices are enabled on this server.
+
+        Returns:
+            bool
+        """
+        return self._config.server_notices_mxid is not None
+
+    @defer.inlineCallbacks
+    def send_notice(self, user_id, event_content):
+        """Send a notice to the given user
+
+        Creates the server notices room, if none exists.
+
+        Args:
+            user_id (str): mxid of user to send event to.
+            event_content (dict): content of event to send
+
+        Returns:
+            Deferrred[None]
+        """
+        room_id = yield self.get_notice_room_for_user(user_id)
+
+        system_mxid = self._config.server_notices_mxid
+        requester = create_requester(system_mxid)
+
+        logger.info("Sending server notice to %s", user_id)
+
+        yield self._event_creation_handler.create_and_send_nonmember_event(
+            requester, {
+                "type": EventTypes.Message,
+                "room_id": room_id,
+                "sender": system_mxid,
+                "content": event_content,
+            },
+            ratelimit=False,
+        )
+
+    @cachedInlineCallbacks()
+    def get_notice_room_for_user(self, user_id):
+        """Get the room for notices for a given user
+
+        If we have not yet created a notice room for this user, create it
+
+        Args:
+            user_id (str): complete user id for the user we want a room for
+
+        Returns:
+            str: room id of notice room.
+        """
+        if not self.is_enabled():
+            raise Exception("Server notices not enabled")
+
+        rooms = yield self._store.get_rooms_for_user_where_membership_is(
+            user_id, [Membership.INVITE, Membership.JOIN],
+        )
+        system_mxid = self._config.server_notices_mxid
+        for room in rooms:
+            # it's worth noting that there is an asymmetry here in that we
+            # expect the user to be invited or joined, but the system user must
+            # be joined. This is kinda deliberate, in that if somebody somehow
+            # manages to invite the system user to a room, that doesn't make it
+            # the server notices room.
+            user_ids = yield self._store.get_users_in_room(room.room_id)
+            if system_mxid in user_ids:
+                # we found a room which our user shares with the system notice
+                # user
+                logger.info("Using room %s", room.room_id)
+                defer.returnValue(room.room_id)
+
+        # apparently no existing notice room: create a new one
+        logger.info("Creating server notices room for %s", user_id)
+
+        requester = create_requester(system_mxid)
+        info = yield self._room_creation_handler.create_room(
+            requester,
+            config={
+                "preset": RoomCreationPreset.PRIVATE_CHAT,
+                "name": self._config.server_notices_room_name,
+                "power_level_content_override": {
+                    "users_default": -10,
+                },
+                "invite": (user_id,)
+            },
+            ratelimit=False,
+            creator_join_profile={
+                "displayname": self._config.server_notices_mxid_display_name,
+            },
+        )
+        room_id = info['room_id']
+
+        logger.info("Created server notices room %s for %s", room_id, user_id)
+        defer.returnValue(room_id)
diff --git a/synapse/server_notices/server_notices_sender.py b/synapse/server_notices/server_notices_sender.py
new file mode 100644
index 0000000000..5d23965f34
--- /dev/null
+++ b/synapse/server_notices/server_notices_sender.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.server_notices.consent_server_notices import ConsentServerNotices
+
+
+class ServerNoticesSender(object):
+    """A centralised place which sends server notices automatically when
+    Certain Events take place
+    """
+    def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer):
+        """
+        # todo: it would be nice to make this more dynamic
+        self._consent_server_notices = ConsentServerNotices(hs)
+
+    def on_user_syncing(self, user_id):
+        """Called when the user performs a sync operation.
+
+        Args:
+            user_id (str): mxid of user who synced
+
+        Returns:
+            Deferred
+        """
+        return self._consent_server_notices.maybe_send_server_notice_to_user(
+            user_id,
+        )
+
+    def on_user_ip(self, user_id):
+        """Called on the master when a worker process saw a client request.
+
+        Args:
+            user_id (str): mxid
+
+        Returns:
+            Deferred
+        """
+        # The synchrotrons use a stubbed version of ServerNoticesSender, so
+        # we check for notices to send to the user in on_user_ip as well as
+        # in on_user_syncing
+        return self._consent_server_notices.maybe_send_server_notice_to_user(
+            user_id,
+        )
diff --git a/synapse/server_notices/worker_server_notices_sender.py b/synapse/server_notices/worker_server_notices_sender.py
new file mode 100644
index 0000000000..4a133026c3
--- /dev/null
+++ b/synapse/server_notices/worker_server_notices_sender.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+
+class WorkerServerNoticesSender(object):
+    """Stub impl of ServerNoticesSender which does nothing"""
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer):
+        """
+
+    def on_user_syncing(self, user_id):
+        """Called when the user performs a sync operation.
+
+        Args:
+            user_id (str): mxid of user who synced
+
+        Returns:
+            Deferred
+        """
+        return defer.succeed(None)
+
+    def on_user_ip(self, user_id):
+        """Called on the master when a worker process saw a client request.
+
+        Args:
+            user_id (str): mxid
+
+        Returns:
+            Deferred
+        """
+        raise AssertionError("on_user_ip unexpectedly called on worker")
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4551cf8774..979fa22438 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -376,10 +376,14 @@ class DataStore(RoomMemberStore, RoomStore,
                     FROM user_ips AS u
                     LEFT JOIN (
                       SELECT user_id, device_id, timestamp FROM user_daily_visits
-                      WHERE timestamp IS ?
+                      WHERE timestamp = ?
                     ) udv
                     ON u.user_id = udv.user_id AND u.device_id=udv.device_id
-                    WHERE last_seen > ? AND last_seen <= ? AND udv.timestamp IS NULL
+                    INNER JOIN users ON users.name=u.user_id
+                    WHERE last_seen > ? AND last_seen <= ?
+                    AND udv.timestamp IS NULL AND users.is_guest=0
+                    AND users.appservice_id IS NULL
+                    GROUP BY u.user_id, u.device_id
             """
 
             # This means that the day has rolled over but there could still
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 70b9041eee..3cb1abd2ab 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -33,7 +33,7 @@ from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id
+from synapse.types import get_domain_from_id, RoomStreamToken
 import synapse.metrics
 
 # these are only included to make the type annotations work
@@ -1812,15 +1812,14 @@ class EventsStore(EventsWorkerStore):
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
     def purge_history(
-        self, room_id, topological_ordering, delete_local_events,
+        self, room_id, token, delete_local_events,
     ):
         """Deletes room history before a certain point
 
         Args:
             room_id (str):
 
-            topological_ordering (int):
-                minimum topo ordering to preserve
+            token (str): A topological token to delete events before
 
             delete_local_events (bool):
                 if True, we will delete local events as well as remote ones
@@ -1830,13 +1829,15 @@ class EventsStore(EventsWorkerStore):
 
         return self.runInteraction(
             "purge_history",
-            self._purge_history_txn, room_id, topological_ordering,
+            self._purge_history_txn, room_id, token,
             delete_local_events,
         )
 
     def _purge_history_txn(
-        self, txn, room_id, topological_ordering, delete_local_events,
+        self, txn, room_id, token_str, delete_local_events,
     ):
+        token = RoomStreamToken.parse(token_str)
+
         # Tables that should be pruned:
         #     event_auth
         #     event_backward_extremities
@@ -1881,6 +1882,13 @@ class EventsStore(EventsWorkerStore):
             " ON events_to_purge(should_delete)",
         )
 
+        # We do joins against events_to_purge for e.g. calculating state
+        # groups to purge, etc., so lets make an index.
+        txn.execute(
+            "CREATE INDEX events_to_purge_id"
+            " ON events_to_purge(event_id)",
+        )
+
         # First ensure that we're not about to delete all the forward extremeties
         txn.execute(
             "SELECT e.event_id, e.depth FROM events as e "
@@ -1893,7 +1901,7 @@ class EventsStore(EventsWorkerStore):
         rows = txn.fetchall()
         max_depth = max(row[0] for row in rows)
 
-        if max_depth <= topological_ordering:
+        if max_depth <= token.topological:
             # We need to ensure we don't delete all the events from the datanase
             # otherwise we wouldn't be able to send any events (due to not
             # having any backwards extremeties)
@@ -1909,7 +1917,7 @@ class EventsStore(EventsWorkerStore):
             should_delete_expr += " AND event_id NOT LIKE ?"
             should_delete_params += ("%:" + self.hs.hostname, )
 
-        should_delete_params += (room_id, topological_ordering)
+        should_delete_params += (room_id, token.topological)
 
         txn.execute(
             "INSERT INTO events_to_purge"
@@ -1932,13 +1940,13 @@ class EventsStore(EventsWorkerStore):
         logger.info("[purge] Finding new backward extremities")
 
         # We calculate the new entries for the backward extremeties by finding
-        # all events that point to events that are to be purged
+        # events to be purged that are pointed to by events we're not going to
+        # purge.
         txn.execute(
             "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
             " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
-            " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
-            " WHERE e2.topological_ordering >= ?",
-            (topological_ordering, )
+            " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
+            " WHERE ep2.event_id IS NULL",
         )
         new_backwards_extrems = txn.fetchall()
 
@@ -1962,16 +1970,22 @@ class EventsStore(EventsWorkerStore):
 
         # Get all state groups that are only referenced by events that are
         # to be deleted.
-        txn.execute(
-            "SELECT state_group FROM event_to_state_groups"
-            " INNER JOIN events USING (event_id)"
-            " WHERE state_group IN ("
-            "   SELECT DISTINCT state_group FROM events_to_purge"
-            "   INNER JOIN event_to_state_groups USING (event_id)"
-            " )"
-            " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
-            (topological_ordering, )
-        )
+        # This works by first getting state groups that we may want to delete,
+        # joining against event_to_state_groups to get events that use that
+        # state group, then left joining against events_to_purge again. Any
+        # state group where the left join produce *no nulls* are referenced
+        # only by events that are going to be purged.
+        txn.execute("""
+            SELECT state_group FROM
+            (
+                SELECT DISTINCT state_group FROM events_to_purge
+                INNER JOIN event_to_state_groups USING (event_id)
+            ) AS sp
+            INNER JOIN event_to_state_groups USING (state_group)
+            LEFT JOIN events_to_purge AS ep USING (event_id)
+            GROUP BY state_group
+            HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+        """)
 
         state_rows = txn.fetchall()
         logger.info("[purge] found %i redundant state groups", len(state_rows))
@@ -2118,10 +2132,25 @@ class EventsStore(EventsWorkerStore):
         #
         # So, let's stick it at the end so that we don't block event
         # persistence.
-        logger.info("[purge] updating room_depth")
+        #
+        # We do this by calculating the minimum depth of the backwards
+        # extremities. However, the events in event_backward_extremities
+        # are ones we don't have yet so we need to look at the events that
+        # point to it via event_edges table.
+        txn.execute("""
+            SELECT COALESCE(MIN(depth), 0)
+            FROM event_backward_extremities AS eb
+            INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
+            INNER JOIN events AS e ON e.event_id = eg.event_id
+            WHERE eb.room_id = ?
+        """, (room_id,))
+        min_depth, = txn.fetchone()
+
+        logger.info("[purge] updating room_depth to %d", min_depth)
+
         txn.execute(
             "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
-            (topological_ordering, room_id,)
+            (min_depth, room_id,)
         )
 
         # finally, drop the temp table. this will commit the txn in sqlite,
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 2f95e7e82a..709c69a926 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -297,18 +297,22 @@ class ReceiptsWorkerStore(SQLBaseStore):
         if receipt_type != "m.read":
             return
 
-        # Returns an ObservableDeferred
+        # Returns either an ObservableDeferred or the raw result
         res = self.get_users_with_read_receipts_in_room.cache.get(
             room_id, None, update_metrics=False,
         )
 
-        if res:
-            if isinstance(res, defer.Deferred) and res.called:
+        # first handle the Deferred case
+        if isinstance(res, defer.Deferred):
+            if res.called:
                 res = res.result
-            if user_id in res:
-                # We'd only be adding to the set, so no point invalidating if the
-                # user is already there
-                return
+            else:
+                res = None
+
+        if res and user_id in res:
+            # We'd only be adding to the set, so no point invalidating if the
+            # user is already there
+            return
 
         self.get_users_with_read_receipts_in_room.invalidate((room_id,))
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 8d1a01f1ee..a530e29f43 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -33,7 +33,10 @@ class RegistrationWorkerStore(SQLBaseStore):
             keyvalues={
                 "name": user_id,
             },
-            retcols=["name", "password_hash", "is_guest"],
+            retcols=[
+                "name", "password_hash", "is_guest",
+                "consent_version", "consent_server_notice_sent",
+            ],
             allow_none=True,
             desc="get_user_by_id",
         )
@@ -297,12 +300,41 @@ class RegistrationStore(RegistrationWorkerStore,
         Raises:
             StoreError(404) if user not found
         """
-        return self._simple_update_one(
-            table='users',
-            keyvalues={'name': user_id, },
-            updatevalues={'consent_version': consent_version, },
-            desc="user_set_consent_version"
-        )
+        def f(txn):
+            self._simple_update_one_txn(
+                txn,
+                table='users',
+                keyvalues={'name': user_id, },
+                updatevalues={'consent_version': consent_version, },
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_user_by_id, (user_id,)
+            )
+        return self.runInteraction("user_set_consent_version", f)
+
+    def user_set_consent_server_notice_sent(self, user_id, consent_version):
+        """Updates the user table to record that we have sent the user a server
+        notice about privacy policy consent
+
+        Args:
+            user_id (str): full mxid of the user to update
+            consent_version (str): version of the policy we have notified the
+                user about
+
+        Raises:
+            StoreError(404) if user not found
+        """
+        def f(txn):
+            self._simple_update_one_txn(
+                txn,
+                table='users',
+                keyvalues={'name': user_id, },
+                updatevalues={'consent_server_notice_sent': consent_version, },
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_user_by_id, (user_id,)
+            )
+        return self.runInteraction("user_set_consent_server_notice_sent", f)
 
     def user_delete_access_tokens(self, user_id, except_token_id=None,
                                   device_id=None):
diff --git a/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql
new file mode 100644
index 0000000000..14dcf18d73
--- /dev/null
+++ b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql
@@ -0,0 +1,20 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* record whether we have sent a server notice about consenting to the
+ * privacy policy. Specifically records the version of the policy we sent
+ * a message about.
+ */
+ALTER TABLE users ADD COLUMN consent_server_notice_sent TEXT;
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index ea24710ad8..fb463c525a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -684,8 +684,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 results to only those before
             direction(char): Either 'b' or 'f' to indicate whether we are
                 paginating forwards or backwards from `from_key`.
-            limit (int): The maximum number of events to return. Zero or less
-                means no limit.
+            limit (int): The maximum number of events to return.
             event_filter (Filter|None): If provided filters the events to
                 those that match the filter.
 
@@ -694,6 +693,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             as a list of _EventDictReturn and a token that points to the end
             of the result set.
         """
+
+        assert int(limit) >= 0
+
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -723,22 +725,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             bounds += " AND " + filter_clause
             args.extend(filter_args)
 
-        if int(limit) > 0:
-            args.append(int(limit))
-            limit_str = " LIMIT ?"
-        else:
-            limit_str = ""
+        args.append(int(limit))
 
         sql = (
             "SELECT event_id, topological_ordering, stream_ordering"
             " FROM events"
             " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
-            " stream_ordering %(order)s %(limit)s"
+            " stream_ordering %(order)s LIMIT ?"
         ) % {
             "bounds": bounds,
             "order": order,
-            "limit": limit_str
         }
 
         txn.execute(sql, args)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index eab9d57650..914f616312 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -60,7 +60,7 @@ class LoggingContext(object):
     __slots__ = [
         "previous_context", "name", "ru_stime", "ru_utime",
         "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
-        "usage_start", "usage_end",
+        "usage_start",
         "main_thread", "alive",
         "request", "tag",
     ]
@@ -109,8 +109,10 @@ class LoggingContext(object):
         # ms spent waiting for db txns to be scheduled
         self.db_sched_duration_ms = 0
 
+        # If alive has the thread resource usage when the logcontext last
+        # became active.
         self.usage_start = None
-        self.usage_end = None
+
         self.main_thread = threading.current_thread()
         self.request = None
         self.tag = ""
@@ -159,7 +161,7 @@ class LoggingContext(object):
         """Restore the logging context in thread local storage to the state it
         was before this context was entered.
         Returns:
-            None to avoid suppressing any exeptions that were thrown.
+            None to avoid suppressing any exceptions that were thrown.
         """
         current = self.set_current_context(self.previous_context)
         if current is not self:
@@ -185,29 +187,43 @@ class LoggingContext(object):
 
     def start(self):
         if threading.current_thread() is not self.main_thread:
+            logger.warning("Started logcontext %s on different thread", self)
             return
 
-        if self.usage_start and self.usage_end:
-            self.ru_utime += self.usage_end.ru_utime - self.usage_start.ru_utime
-            self.ru_stime += self.usage_end.ru_stime - self.usage_start.ru_stime
-            self.usage_start = None
-            self.usage_end = None
-
+        # If we haven't already started record the thread resource usage so
+        # far
         if not self.usage_start:
             self.usage_start = get_thread_resource_usage()
 
     def stop(self):
         if threading.current_thread() is not self.main_thread:
+            logger.warning("Stopped logcontext %s on different thread", self)
             return
 
+        # When we stop, let's record the resource used since we started
         if self.usage_start:
-            self.usage_end = get_thread_resource_usage()
+            usage_end = get_thread_resource_usage()
+
+            self.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime
+            self.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime
+
+            self.usage_start = None
+        else:
+            logger.warning("Called stop on logcontext %s without calling start", self)
 
     def get_resource_usage(self):
+        """Get CPU time used by this logcontext so far.
+
+        Returns:
+            tuple[float, float]: The user and system CPU usage in seconds
+        """
         ru_utime = self.ru_utime
         ru_stime = self.ru_stime
 
-        if self.usage_start and threading.current_thread() is self.main_thread:
+        # If we are on the correct thread and we're currently running then we
+        # can include resource usage so far.
+        is_main_thread = threading.current_thread() is self.main_thread
+        if self.alive and self.usage_start and is_main_thread:
             current = get_thread_resource_usage()
             ru_utime += current.ru_utime - self.usage_start.ru_utime
             ru_stime += current.ru_stime - self.usage_start.ru_stime
diff --git a/tests/rest/client/test_transactions.py b/tests/rest/client/test_transactions.py
index d7cea30260..b5bc2fa255 100644
--- a/tests/rest/client/test_transactions.py
+++ b/tests/rest/client/test_transactions.py
@@ -2,6 +2,9 @@ from synapse.rest.client.transactions import HttpTransactionCache
 from synapse.rest.client.transactions import CLEANUP_PERIOD_MS
 from twisted.internet import defer
 from mock import Mock, call
+
+from synapse.util import async
+from synapse.util.logcontext import LoggingContext
 from tests import unittest
 from tests.utils import MockClock
 
@@ -40,6 +43,78 @@ class HttpTransactionCacheTestCase(unittest.TestCase):
         cb.assert_called_once_with("some_arg", keyword="arg", changing_args=0)
 
     @defer.inlineCallbacks
+    def test_logcontexts_with_async_result(self):
+        @defer.inlineCallbacks
+        def cb():
+            yield async.sleep(0)
+            defer.returnValue("yay")
+
+        @defer.inlineCallbacks
+        def test():
+            with LoggingContext("c") as c1:
+                res = yield self.cache.fetch_or_execute(self.mock_key, cb)
+                self.assertIs(LoggingContext.current_context(), c1)
+                self.assertEqual(res, "yay")
+
+        # run the test twice in parallel
+        d = defer.gatherResults([test(), test()])
+        self.assertIs(LoggingContext.current_context(), LoggingContext.sentinel)
+        yield d
+        self.assertIs(LoggingContext.current_context(), LoggingContext.sentinel)
+
+    @defer.inlineCallbacks
+    def test_does_not_cache_exceptions(self):
+        """Checks that, if the callback throws an exception, it is called again
+        for the next request.
+        """
+        called = [False]
+
+        def cb():
+            if called[0]:
+                # return a valid result the second time
+                return defer.succeed(self.mock_http_response)
+
+            called[0] = True
+            raise Exception("boo")
+
+        with LoggingContext("test") as test_context:
+            try:
+                yield self.cache.fetch_or_execute(self.mock_key, cb)
+            except Exception as e:
+                self.assertEqual(e.message, "boo")
+            self.assertIs(LoggingContext.current_context(), test_context)
+
+            res = yield self.cache.fetch_or_execute(self.mock_key, cb)
+            self.assertEqual(res, self.mock_http_response)
+            self.assertIs(LoggingContext.current_context(), test_context)
+
+    @defer.inlineCallbacks
+    def test_does_not_cache_failures(self):
+        """Checks that, if the callback returns a failure, it is called again
+        for the next request.
+        """
+        called = [False]
+
+        def cb():
+            if called[0]:
+                # return a valid result the second time
+                return defer.succeed(self.mock_http_response)
+
+            called[0] = True
+            return defer.fail(Exception("boo"))
+
+        with LoggingContext("test") as test_context:
+            try:
+                yield self.cache.fetch_or_execute(self.mock_key, cb)
+            except Exception as e:
+                self.assertEqual(e.message, "boo")
+            self.assertIs(LoggingContext.current_context(), test_context)
+
+            res = yield self.cache.fetch_or_execute(self.mock_key, cb)
+            self.assertEqual(res, self.mock_http_response)
+            self.assertIs(LoggingContext.current_context(), test_context)
+
+    @defer.inlineCallbacks
     def test_cleans_up(self):
         cb = Mock(
             return_value=defer.succeed(self.mock_http_response)
diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py
index 7c7b164ee6..cc637dda1c 100644
--- a/tests/storage/test_registration.py
+++ b/tests/storage/test_registration.py
@@ -42,9 +42,14 @@ class RegistrationStoreTestCase(unittest.TestCase):
         yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
 
         self.assertEquals(
-            # TODO(paul): Surely this field should be 'user_id', not 'name'
-            #  Additionally surely it shouldn't come in a 1-element list
-            {"name": self.user_id, "password_hash": self.pwhash, "is_guest": 0},
+            {
+                # TODO(paul): Surely this field should be 'user_id', not 'name'
+                "name": self.user_id,
+                "password_hash": self.pwhash,
+                "is_guest": 0,
+                "consent_version": None,
+                "consent_server_notice_sent": None,
+            },
             (yield self.store.get_user_by_id(self.user_id))
         )
 
diff --git a/tests/utils.py b/tests/utils.py
index c2beb5d9f7..262c4a5714 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -63,6 +63,8 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
         config.federation_rc_concurrent = 10
         config.filter_timeline_limit = 5000
         config.user_directory_search_all_users = False
+        config.user_consent_server_notice_content = None
+        config.block_events_without_consent_error = None
 
         # disable user directory updates, because they get done in the
         # background, which upsets the test runner.