summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.dockerignore5
-rw-r--r--.gitignore1
-rw-r--r--AUTHORS.rst3
-rw-r--r--Dockerfile19
-rw-r--r--MANIFEST.in2
-rw-r--r--contrib/docker/README.md148
-rw-r--r--contrib/docker/conf/homeserver.yaml219
-rw-r--r--contrib/docker/conf/log.config29
-rw-r--r--contrib/docker/docker-compose.yml49
-rwxr-xr-xcontrib/docker/start.py66
-rwxr-xr-xscripts-dev/nuke-room-from-db.sh14
-rw-r--r--synapse/app/appservice.py1
-rw-r--r--synapse/app/client_reader.py1
-rw-r--r--synapse/app/event_creator.py1
-rw-r--r--synapse/app/federation_reader.py1
-rw-r--r--synapse/app/federation_sender.py1
-rw-r--r--synapse/app/frontend_proxy.py1
-rwxr-xr-xsynapse/app/homeserver.py2
-rw-r--r--synapse/app/media_repository.py1
-rw-r--r--synapse/app/pusher.py1
-rw-r--r--synapse/app/synchrotron.py1
-rw-r--r--synapse/app/user_dir.py1
-rw-r--r--synapse/federation/units.py2
-rw-r--r--synapse/handlers/initial_sync.py12
-rw-r--r--synapse/handlers/sync.py26
-rw-r--r--synapse/http/additional_resource.py7
-rw-r--r--synapse/http/request_metrics.py149
-rw-r--r--synapse/http/server.py298
-rw-r--r--synapse/http/site.py89
-rw-r--r--synapse/rest/client/v1/pusher.py1
-rw-r--r--synapse/rest/client/v2_alpha/auth.py2
-rw-r--r--synapse/rest/key/v1/server_key_resource.py2
-rw-r--r--synapse/rest/key/v2/local_key_resource.py2
-rw-r--r--synapse/rest/key/v2/remote_key_resource.py10
-rw-r--r--synapse/rest/media/v1/download_resource.py19
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py6
-rw-r--r--synapse/rest/media/v1/thumbnail_resource.py24
-rw-r--r--synapse/rest/media/v1/upload_resource.py16
-rw-r--r--synapse/storage/stream.py449
39 files changed, 1157 insertions, 524 deletions
diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000000..f36f86fbb7
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,5 @@
+Dockerfile
+.travis.yml
+.gitignore
+demo/etc
+tox.ini
diff --git a/.gitignore b/.gitignore
index c8901eb206..6b45e62157 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,6 +32,7 @@ demo/media_store.*
 demo/etc
 
 uploads
+cache
 
 .idea/
 media_store/
diff --git a/AUTHORS.rst b/AUTHORS.rst
index 3dcb1c2a89..e13ac5ad34 100644
--- a/AUTHORS.rst
+++ b/AUTHORS.rst
@@ -60,3 +60,6 @@ Niklas Riekenbrauck <nikriek at gmail dot.com>
 
 Christoph Witzany <christoph at web.crofting.com>
  * Add LDAP support for authentication
+
+Pierre Jaury <pierre at jaury.eu>
+* Docker packaging
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000000..8085f3d354
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,19 @@
+FROM docker.io/python:2-alpine3.7
+
+RUN apk add --no-cache --virtual .nacl_deps su-exec build-base libffi-dev zlib-dev libressl-dev libjpeg-turbo-dev linux-headers postgresql-dev
+
+COPY . /synapse
+
+# A wheel cache may be provided in ./cache for faster build
+RUN cd /synapse \
+ && pip install --upgrade pip setuptools psycopg2 \
+ && mkdir -p /synapse/cache \
+ && pip install -f /synapse/cache --upgrade --process-dependency-links . \
+ && mv /synapse/contrib/docker/start.py /synapse/contrib/docker/conf / \
+ && rm -rf setup.py setup.cfg synapse
+
+VOLUME ["/data"]
+
+EXPOSE 8008/tcp 8448/tcp
+
+ENTRYPOINT ["/start.py"]
diff --git a/MANIFEST.in b/MANIFEST.in
index afb60e12ee..e2a6623a63 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -25,6 +25,8 @@ recursive-include synapse/static *.js
 exclude jenkins.sh
 exclude jenkins*.sh
 exclude jenkins*
+exclude Dockerfile
+exclude .dockerignore
 recursive-exclude jenkins *.sh
 
 prune .github
diff --git a/contrib/docker/README.md b/contrib/docker/README.md
new file mode 100644
index 0000000000..aed56646c2
--- /dev/null
+++ b/contrib/docker/README.md
@@ -0,0 +1,148 @@
+# Synapse Docker
+
+This Docker image will run Synapse as a single process. It does not provide any
+database server or TURN server that you should run separately.
+
+If you run a Postgres server, you should simply have it in the same Compose
+project or set the proper environment variables and the image will automatically
+use that server.
+
+## Build
+
+Build the docker image with the `docker build` command from the root of the synapse repository.
+
+```
+docker build -t docker.io/matrixdotorg/synapse .
+```
+
+The `-t` option sets the image tag. Official images are tagged `matrixdotorg/synapse:<version>` where `<version>` is the same as the release tag in the synapse git repository.
+
+You may have a local Python wheel cache available, in which case copy the relevant packages in the ``cache/`` directory at the root of the project.
+
+## Run
+
+This image is designed to run either with an automatically generated configuration
+file or with a custom configuration that requires manual edition.
+
+### Automated configuration
+
+It is recommended that you use Docker Compose to run your containers, including
+this image and a Postgres server. A sample ``docker-compose.yml`` is provided,
+including example labels for reverse proxying and other artifacts.
+
+Read the section about environment variables and set at least mandatory variables,
+then run the server:
+
+```
+docker-compose up -d
+```
+
+### Manual configuration
+
+A sample ``docker-compose.yml`` is provided, including example labels for
+reverse proxying and other artifacts.
+
+Specify a ``SYNAPSE_CONFIG_PATH``, preferably to a persistent path,
+to use manual configuration. To generate a fresh ``homeserver.yaml``, simply run:
+
+```
+docker-compose run --rm -e SYNAPSE_SERVER_NAME=my.matrix.host synapse generate
+```
+
+Then, customize your configuration and run the server:
+
+```
+docker-compose up -d
+```
+
+### Without Compose
+
+If you do not wish to use Compose, you may still run this image using plain
+Docker commands. Note that the following is just a guideline and you may need
+to add parameters to the docker run command to account for the network situation
+with your postgres database.
+
+```
+docker run \
+    -d \
+    --name synapse \
+    -v ${DATA_PATH}:/data \
+    -e SYNAPSE_SERVER_NAME=my.matrix.host \
+    -e SYNAPSE_REPORT_STATS=yes \
+    docker.io/matrixdotorg/synapse:latest
+```
+
+## Volumes
+
+The image expects a single volume, located at ``/data``, that will hold:
+
+* temporary files during uploads;
+* uploaded media and thumbnails;
+* the SQLite database if you do not configure postgres;
+* the appservices configuration.
+
+You are free to use separate volumes depending on storage endpoints at your
+disposal. For instance, ``/data/media`` coud be stored on a large but low
+performance hdd storage while other files could be stored on high performance
+endpoints.
+
+In order to setup an application service, simply create an ``appservices``
+directory in the data volume and write the application service Yaml
+configuration file there. Multiple application services are supported.
+
+## Environment
+
+Unless you specify a custom path for the configuration file, a very generic
+file will be generated, based on the following environment settings.
+These are a good starting point for setting up your own deployment.
+
+Global settings:
+
+* ``UID``, the user id Synapse will run as [default 991]
+* ``GID``, the group id Synapse will run as [default 991]
+* ``SYNAPSE_CONFIG_PATH``, path to a custom config file
+
+If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file
+then customize it manually. No other environment variable is required.
+
+Otherwise, a dynamic configuration file will be used. The following environment
+variables are available for configuration:
+
+* ``SYNAPSE_SERVER_NAME`` (mandatory), the current server public hostname.
+* ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous
+  statistics reporting back to the Matrix project which helps us to get funding.
+* ``SYNAPSE_MACAROON_SECRET_KEY`` (mandatory) secret for signing access tokens
+  to the server, set this to a proper random key.
+* ``SYNAPSE_NO_TLS``, set this variable to disable TLS in Synapse (use this if
+  you run your own TLS-capable reverse proxy).
+* ``SYNAPSE_ENABLE_REGISTRATION``, set this variable to enable registration on
+  the Synapse instance.
+* ``SYNAPSE_ALLOW_GUEST``, set this variable to allow guest joining this server.
+* ``SYNAPSE_EVENT_CACHE_SIZE``, the event cache size [default `10K`].
+* ``SYNAPSE_CACHE_FACTOR``, the cache factor [default `0.5`].
+* ``SYNAPSE_RECAPTCHA_PUBLIC_KEY``, set this variable to the recaptcha public
+  key in order to enable recaptcha upon registration.
+* ``SYNAPSE_RECAPTCHA_PRIVATE_KEY``, set this variable to the recaptcha private
+  key in order to enable recaptcha upon registration.
+* ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN
+  uris to enable TURN for this homeserver.
+* ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
+
+Shared secrets, that will be initialized to random values if not set:
+
+* ``SYNAPSE_REGISTRATION_SHARED_SECRET``, secret for registrering users if
+  registration is disable.
+
+Database specific values (will use SQLite if not set):
+
+* `POSTGRES_DB` - The database name for the synapse postgres database. [default: `synapse`]
+* `POSTGRES_HOST` - The host of the postgres database if you wish to use postgresql instead of sqlite3. [default: `db` which is useful when using a container on the same docker network in a compose file where the postgres service is called `db`]
+* `POSTGRES_PASSWORD` - The password for the synapse postgres database. **If this is set then postgres will be used instead of sqlite3.** [default: none] **NOTE**: You are highly encouraged to use postgresql! Please use the compose file to make it easier to deploy.
+* `POSTGRES_USER` - The user for the synapse postgres database. [default: `matrix`]
+
+Mail server specific values (will not send emails if not set):
+
+* ``SYNAPSE_SMTP_HOST``, hostname to the mail server.
+* ``SYNAPSE_SMTP_PORT``, TCP port for accessing the mail server [default ``25``].
+* ``SYNAPSE_SMTP_USER``, username for authenticating against the mail server if any.
+* ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail server if any.
diff --git a/contrib/docker/conf/homeserver.yaml b/contrib/docker/conf/homeserver.yaml
new file mode 100644
index 0000000000..6bc25bb45f
--- /dev/null
+++ b/contrib/docker/conf/homeserver.yaml
@@ -0,0 +1,219 @@
+# vim:ft=yaml
+
+## TLS ##
+
+tls_certificate_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.crt"
+tls_private_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.key"
+tls_dh_params_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.dh"
+no_tls: {{ "True" if SYNAPSE_NO_TLS else "False" }}
+tls_fingerprints: []
+
+## Server ##
+
+server_name: "{{ SYNAPSE_SERVER_NAME }}"
+pid_file: /homeserver.pid
+web_client: False
+soft_file_limit: 0
+
+## Ports ##
+
+listeners:
+  {% if not SYNAPSE_NO_TLS %}
+  -
+    port: 8448
+    bind_addresses: ['0.0.0.0']
+    type: http
+    tls: true
+    x_forwarded: false
+    resources:
+      - names: [client]
+        compress: true
+      - names: [federation]  # Federation APIs
+        compress: false
+  {% endif %}
+
+  - port: 8008
+    tls: false
+    bind_addresses: ['0.0.0.0']
+    type: http
+    x_forwarded: false
+
+    resources:
+      - names: [client]
+        compress: true
+      - names: [federation]
+        compress: false
+
+## Database ##
+
+{% if POSTGRES_PASSWORD %}
+database:
+  name: "psycopg2"
+  args:
+    user: "{{ POSTGRES_USER or "synapse" }}"
+    password: "{{ POSTGRES_PASSWORD }}"
+    database: "{{ POSTGRES_DB or "synapse" }}"
+    host: "{{ POSTGRES_HOST or "db" }}"
+    port: "{{ POSTGRES_PORT or "5432" }}"
+    cp_min: 5
+    cp_max: 10
+{% else %}
+database:
+  name: "sqlite3"
+  args:
+    database: "/data/homeserver.db"
+{% endif %}
+
+## Performance ##
+
+event_cache_size: "{{ SYNAPSE_EVENT_CACHE_SIZE or "10K" }}"
+verbose: 0
+log_file: "/data/homeserver.log"
+log_config: "/compiled/log.config"
+
+## Ratelimiting ##
+
+rc_messages_per_second: 0.2
+rc_message_burst_count: 10.0
+federation_rc_window_size: 1000
+federation_rc_sleep_limit: 10
+federation_rc_sleep_delay: 500
+federation_rc_reject_limit: 50
+federation_rc_concurrent: 3
+
+## Files ##
+
+media_store_path: "/data/media"
+uploads_path: "/data/uploads"
+max_upload_size: "10M"
+max_image_pixels: "32M"
+dynamic_thumbnails: false
+
+# List of thumbnail to precalculate when an image is uploaded.
+thumbnail_sizes:
+- width: 32
+  height: 32
+  method: crop
+- width: 96
+  height: 96
+  method: crop
+- width: 320
+  height: 240
+  method: scale
+- width: 640
+  height: 480
+  method: scale
+- width: 800
+  height: 600
+  method: scale
+
+url_preview_enabled: False
+max_spider_size: "10M"
+
+## Captcha ##
+
+{% if SYNAPSE_RECAPTCHA_PUBLIC_KEY %}
+recaptcha_public_key: "{{ SYNAPSE_RECAPTCHA_PUBLIC_KEY }}"
+recaptcha_private_key: "{{ SYNAPSE_RECAPTCHA_PRIVATE_KEY }}"
+enable_registration_captcha: True
+recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
+{% else %}
+recaptcha_public_key: "YOUR_PUBLIC_KEY"
+recaptcha_private_key: "YOUR_PRIVATE_KEY"
+enable_registration_captcha: False
+recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
+{% endif %}
+
+## Turn ##
+
+{% if SYNAPSE_TURN_URIS %}
+turn_uris:
+{% for uri in SYNAPSE_TURN_URIS.split(',') %}    - "{{ uri }}"
+{% endfor %}
+turn_shared_secret: "{{ SYNAPSE_TURN_SECRET }}"
+turn_user_lifetime: "1h"
+turn_allow_guests: True
+{% else %}
+turn_uris: []
+turn_shared_secret: "YOUR_SHARED_SECRET"
+turn_user_lifetime: "1h"
+turn_allow_guests: True
+{% endif %}
+
+## Registration ##
+
+enable_registration: {{ "True" if SYNAPSE_ENABLE_REGISTRATION else "False" }}
+registration_shared_secret: "{{ SYNAPSE_REGISTRATION_SHARED_SECRET }}"
+bcrypt_rounds: 12
+allow_guest_access: {{ "True" if SYNAPSE_ALLOW_GUEST else "False" }}
+enable_group_creation: true
+
+# The list of identity servers trusted to verify third party
+# identifiers by this server.
+trusted_third_party_id_servers:
+    - matrix.org
+    - vector.im
+    - riot.im
+
+## Metrics ###
+
+{% if SYNAPSE_REPORT_STATS.lower() == "yes" %}
+enable_metrics: True
+report_stats: True
+{% else %}
+enable_metrics: False
+report_stats: False
+{% endif %}
+
+## API Configuration ##
+
+room_invite_state_types:
+    - "m.room.join_rules"
+    - "m.room.canonical_alias"
+    - "m.room.avatar"
+    - "m.room.name"
+
+{% if SYNAPSE_APPSERVICES %}
+app_service_config_files:
+{% for appservice in SYNAPSE_APPSERVICES %}    - "{{ appservice }}"
+{% endfor %}
+{% else %}
+app_service_config_files: []
+{% endif %}
+
+macaroon_secret_key: "{{ SYNAPSE_MACAROON_SECRET_KEY }}"
+expire_access_token: False
+
+## Signing Keys ##
+
+signing_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.signing.key"
+old_signing_keys: {}
+key_refresh_interval: "1d" # 1 Day.
+
+# The trusted servers to download signing keys from.
+perspectives:
+  servers:
+    "matrix.org":
+      verify_keys:
+        "ed25519:auto":
+          key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
+
+password_config:
+   enabled: true
+
+{% if SYNAPSE_SMTP_HOST %}
+email:
+   enable_notifs: false
+   smtp_host: "{{ SYNAPSE_SMTP_HOST }}"
+   smtp_port: {{ SYNAPSE_SMTP_PORT or "25" }}
+   smtp_user: "{{ SYNAPSE_SMTP_USER }}"
+   smtp_pass: "{{ SYNAPSE_SMTP_PASSWORD }}"
+   require_transport_security: False
+   notif_from: "{{ SYNAPSE_SMTP_FROM or "hostmaster@" + SYNAPSE_SERVER_NAME }}"
+   app_name: Matrix
+   template_dir: res/templates
+   notif_template_html: notif_mail.html
+   notif_template_text: notif_mail.txt
+   notif_for_new_users: True
+   riot_base_url: "https://{{ SYNAPSE_SERVER_NAME }}"
+{% endif %}
diff --git a/contrib/docker/conf/log.config b/contrib/docker/conf/log.config
new file mode 100644
index 0000000000..1851995802
--- /dev/null
+++ b/contrib/docker/conf/log.config
@@ -0,0 +1,29 @@
+version: 1
+
+formatters:
+  precise:
+   format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s'
+
+filters:
+  context:
+    (): synapse.util.logcontext.LoggingContextFilter
+    request: ""
+
+handlers:
+  console:
+    class: logging.StreamHandler
+    formatter: precise
+    filters: [context]
+
+loggers:
+    synapse:
+        level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
+
+    synapse.storage.SQL:
+        # beware: increasing this to DEBUG will make synapse log sensitive
+        # information such as access tokens.
+        level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
+
+root:
+    level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
+    handlers: [console]
diff --git a/contrib/docker/docker-compose.yml b/contrib/docker/docker-compose.yml
new file mode 100644
index 0000000000..0b531949e0
--- /dev/null
+++ b/contrib/docker/docker-compose.yml
@@ -0,0 +1,49 @@
+# This compose file is compatible with Compose itself, it might need some
+# adjustments to run properly with stack.
+
+version: '3'
+
+services:
+
+  synapse:
+    image: docker.io/matrixdotorg/synapse:latest
+    # Since snyapse does not retry to connect to the database, restart upon
+    # failure
+    restart: unless-stopped
+    # See the readme for a full documentation of the environment settings
+    environment:
+      - SYNAPSE_SERVER_NAME=my.matrix.host
+      - SYNAPSE_REPORT_STATS=no
+      - SYNAPSE_ENABLE_REGISTRATION=yes
+      - SYNAPSE_LOG_LEVEL=INFO
+      - POSTGRES_PASSWORD=changeme
+    volumes:
+      # You may either store all the files in a local folder
+      - ./files:/data
+      # .. or you may split this between different storage points
+      # - ./files:/data
+      # - /path/to/ssd:/data/uploads
+      # - /path/to/large_hdd:/data/media
+    depends_on:
+      - db
+    # In order to expose Synapse, remove one of the following, you might for
+    # instance expose the TLS port directly:
+    ports:
+      - 8448:8448/tcp
+    # ... or use a reverse proxy, here is an example for traefik:
+    labels:
+      - traefik.enable=true
+      - traefik.frontend.rule=Host:my.matrix.Host
+      - traefik.port=8448
+
+  db:
+    image: docker.io/postgres:10-alpine
+    # Change that password, of course!
+    environment:
+      - POSTGRES_USER=synapse
+      - POSTGRES_PASSWORD=changeme
+    volumes:
+      # You may store the database tables in a local folder..
+      - ./schemas:/var/lib/postgresql/data
+      # .. or store them on some high performance storage for better results
+      # - /path/to/ssd/storage:/var/lib/postfesql/data
diff --git a/contrib/docker/start.py b/contrib/docker/start.py
new file mode 100755
index 0000000000..90e8b9c51a
--- /dev/null
+++ b/contrib/docker/start.py
@@ -0,0 +1,66 @@
+#!/usr/local/bin/python
+
+import jinja2
+import os
+import sys
+import subprocess
+import glob
+
+# Utility functions
+convert = lambda src, dst, environ: open(dst, "w").write(jinja2.Template(open(src).read()).render(**environ))
+
+def check_arguments(environ, args):
+    for argument in args:
+        if argument not in environ:
+            print("Environment variable %s is mandatory, exiting." % argument)
+            sys.exit(2)
+
+def generate_secrets(environ, secrets):
+    for name, secret in secrets.items():
+        if secret not in environ:
+            filename = "/data/%s.%s.key" % (environ["SYNAPSE_SERVER_NAME"], name)
+            if os.path.exists(filename):
+                with open(filename) as handle: value = handle.read()
+            else:
+                print("Generating a random secret for {}".format(name))
+                value = os.urandom(32).encode("hex")
+                with open(filename, "w") as handle: handle.write(value)
+            environ[secret] = value
+
+# Prepare the configuration
+mode = sys.argv[1] if len(sys.argv) > 1 else None
+environ = os.environ.copy()
+ownership = "{}:{}".format(environ.get("UID", 991), environ.get("GID", 991))
+args = ["python", "-m", "synapse.app.homeserver"]
+
+# In generate mode, generate a configuration, missing keys, then exit
+if mode == "generate":
+    check_arguments(environ, ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS", "SYNAPSE_CONFIG_PATH"))
+    args += [
+        "--server-name", environ["SYNAPSE_SERVER_NAME"],
+        "--report-stats", environ["SYNAPSE_REPORT_STATS"],
+        "--config-path", environ["SYNAPSE_CONFIG_PATH"],
+        "--generate-config"
+    ]
+    os.execv("/usr/local/bin/python", args)
+
+# In normal mode, generate missing keys if any, then run synapse
+else:
+    # Parse the configuration file
+    if "SYNAPSE_CONFIG_PATH" in environ:
+        args += ["--config-path", environ["SYNAPSE_CONFIG_PATH"]]
+    else:
+        check_arguments(environ, ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"))
+        generate_secrets(environ, {
+            "registration": "SYNAPSE_REGISTRATION_SHARED_SECRET",
+            "macaroon": "SYNAPSE_MACAROON_SECRET_KEY"
+        })
+        environ["SYNAPSE_APPSERVICES"] = glob.glob("/data/appservices/*.yaml")
+        if not os.path.exists("/compiled"): os.mkdir("/compiled")
+        convert("/conf/homeserver.yaml", "/compiled/homeserver.yaml", environ)
+        convert("/conf/log.config", "/compiled/log.config", environ)
+        subprocess.check_output(["chown", "-R", ownership, "/data"])
+        args += ["--config-path", "/compiled/homeserver.yaml"]
+    # Generate missing keys and start synapse
+    subprocess.check_output(args + ["--generate-keys"])
+    os.execv("/sbin/su-exec", ["su-exec", ownership] + args)
diff --git a/scripts-dev/nuke-room-from-db.sh b/scripts-dev/nuke-room-from-db.sh
index 1201d176c2..c62928afdb 100755
--- a/scripts-dev/nuke-room-from-db.sh
+++ b/scripts-dev/nuke-room-from-db.sh
@@ -6,9 +6,19 @@
 
 ## Do not run it lightly.
 
+set -e
+
+if [ "$1" == "-h" ] || [ "$1" == "" ]; then
+  echo "Call with ROOM_ID as first option and then pipe it into the database. So for instance you might run"
+  echo " nuke-room-from-db.sh <room_id> | sqlite3 homeserver.db"
+  echo "or"
+  echo " nuke-room-from-db.sh <room_id> | psql --dbname=synapse"
+  exit
+fi
+
 ROOMID="$1"
 
-sqlite3 homeserver.db <<EOF
+cat <<EOF
 DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
 DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
 DELETE FROM event_edges WHERE room_id = '$ROOMID';
@@ -29,7 +39,7 @@ DELETE FROM state_groups WHERE room_id = '$ROOMID';
 DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
 DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
 DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
-DELETE FROM event_search_content WHERE c1room_id = '$ROOMID';
+DELETE FROM event_search WHERE room_id = '$ROOMID'; 
 DELETE FROM guest_access WHERE room_id = '$ROOMID';
 DELETE FROM history_visibility WHERE room_id = '$ROOMID';
 DELETE FROM room_tags WHERE room_id = '$ROOMID';
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 58f2c9d68c..b1efacc9f8 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -74,6 +74,7 @@ class AppserviceServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 267d34c881..38b98382c6 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -98,6 +98,7 @@ class ClientReaderServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index b915d12d53..bd7f3d5679 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -114,6 +114,7 @@ class EventCreatorServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index c1dc66dd17..6e10b27b9e 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -87,6 +87,7 @@ class FederationReaderServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a08af83a4c..6f24e32d6d 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -101,6 +101,7 @@ class FederationSenderServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index b349e3e3ce..0f700ee786 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -152,6 +152,7 @@ class FrontendProxyServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index a0e465d644..75f40fd5a4 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -140,6 +140,7 @@ class SynapseHomeServer(HomeServer):
                     site_tag,
                     listener_config,
                     root_resource,
+                    self.version_string,
                 ),
                 self.tls_server_context_factory,
             )
@@ -153,6 +154,7 @@ class SynapseHomeServer(HomeServer):
                     site_tag,
                     listener_config,
                     root_resource,
+                    self.version_string,
                 )
             )
         logger.info("Synapse now listening on port %d", port)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index fc8282bbc1..9c93195f0a 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -94,6 +94,7 @@ class MediaRepositoryServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 26930d1b3b..3912eae48c 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -104,6 +104,7 @@ class PusherServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 7152b1deb4..c6294a7a0c 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -281,6 +281,7 @@ class SynchrotronServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 5ba7e9b416..53eb3474da 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -126,6 +126,7 @@ class UserDirectoryServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 3f645acc43..01c5b8fe17 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -74,8 +74,6 @@ class Transaction(JsonEncodedObject):
         "previous_ids",
         "pdus",
         "edus",
-        "transaction_id",
-        "destination",
         "pdu_failures",
     ]
 
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index cd33a86599..71af86fe21 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -181,8 +181,8 @@ class InitialSyncHandler(BaseHandler):
                     self.store, user_id, messages
                 )
 
-                start_token = now_token.copy_and_replace("room_key", token[0])
-                end_token = now_token.copy_and_replace("room_key", token[1])
+                start_token = now_token.copy_and_replace("room_key", token)
+                end_token = now_token.copy_and_replace("room_key", room_end_token)
                 time_now = self.clock.time_msec()
 
                 d["messages"] = {
@@ -325,8 +325,8 @@ class InitialSyncHandler(BaseHandler):
             self.store, user_id, messages, is_peeking=is_peeking
         )
 
-        start_token = StreamToken.START.copy_and_replace("room_key", token[0])
-        end_token = StreamToken.START.copy_and_replace("room_key", token[1])
+        start_token = StreamToken.START.copy_and_replace("room_key", token)
+        end_token = StreamToken.START.copy_and_replace("room_key", stream_token)
 
         time_now = self.clock.time_msec()
 
@@ -408,8 +408,8 @@ class InitialSyncHandler(BaseHandler):
             self.store, user_id, messages, is_peeking=is_peeking,
         )
 
-        start_token = now_token.copy_and_replace("room_key", token[0])
-        end_token = now_token.copy_and_replace("room_key", token[1])
+        start_token = now_token.copy_and_replace("room_key", token)
+        end_token = now_token
 
         time_now = self.clock.time_msec()
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b52e4c2aff..263e42dded 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -354,12 +354,24 @@ class SyncHandler(object):
                 since_key = since_token.room_key
 
             while limited and len(recents) < timeline_limit and max_repeat:
-                events, end_key = yield self.store.get_room_events_stream_for_room(
-                    room_id,
-                    limit=load_limit + 1,
-                    from_key=since_key,
-                    to_key=end_key,
-                )
+                # If we have a since_key then we are trying to get any events
+                # that have happened since `since_key` up to `end_key`, so we
+                # can just use `get_room_events_stream_for_room`.
+                # Otherwise, we want to return the last N events in the room
+                # in toplogical ordering.
+                if since_key:
+                    events, end_key = yield self.store.get_room_events_stream_for_room(
+                        room_id,
+                        limit=load_limit + 1,
+                        from_key=since_key,
+                        to_key=end_key,
+                    )
+                else:
+                    events, end_key = yield self.store.get_recent_events_for_room(
+                        room_id,
+                        limit=load_limit + 1,
+                        end_token=end_key,
+                    )
                 loaded_recents = sync_config.filter_collection.filter_room_timeline(
                     events
                 )
@@ -429,7 +441,7 @@ class SyncHandler(object):
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        last_events, token = yield self.store.get_recent_events_for_room(
+        last_events, _ = yield self.store.get_recent_events_for_room(
             room_id, end_token=stream_position.room_key, limit=1,
         )
 
diff --git a/synapse/http/additional_resource.py b/synapse/http/additional_resource.py
index 343e932cb1..a797396ade 100644
--- a/synapse/http/additional_resource.py
+++ b/synapse/http/additional_resource.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.http.server import wrap_request_handler
+from synapse.http.server import wrap_json_request_handler
 from twisted.web.resource import Resource
 from twisted.web.server import NOT_DONE_YET
 
@@ -42,14 +42,13 @@ class AdditionalResource(Resource):
         Resource.__init__(self)
         self._handler = handler
 
-        # these are required by the request_handler wrapper
-        self.version_string = hs.version_string
+        # required by the request_handler wrapper
         self.clock = hs.get_clock()
 
     def render(self, request):
         self._async_render(request)
         return NOT_DONE_YET
 
-    @wrap_request_handler
+    @wrap_json_request_handler
     def _async_render(self, request):
         return self._handler(request)
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
new file mode 100644
index 0000000000..8c850bf23f
--- /dev/null
+++ b/synapse/http/request_metrics.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import synapse.metrics
+from synapse.util.logcontext import LoggingContext
+
+logger = logging.getLogger(__name__)
+
+metrics = synapse.metrics.get_metrics_for("synapse.http.server")
+
+# total number of responses served, split by method/servlet/tag
+response_count = metrics.register_counter(
+    "response_count",
+    labels=["method", "servlet", "tag"],
+    alternative_names=(
+        # the following are all deprecated aliases for the same metric
+        metrics.name_prefix + x for x in (
+            "_requests",
+            "_response_time:count",
+            "_response_ru_utime:count",
+            "_response_ru_stime:count",
+            "_response_db_txn_count:count",
+            "_response_db_txn_duration:count",
+        )
+    )
+)
+
+requests_counter = metrics.register_counter(
+    "requests_received",
+    labels=["method", "servlet", ],
+)
+
+outgoing_responses_counter = metrics.register_counter(
+    "responses",
+    labels=["method", "code"],
+)
+
+response_timer = metrics.register_counter(
+    "response_time_seconds",
+    labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_time:total",
+    ),
+)
+
+response_ru_utime = metrics.register_counter(
+    "response_ru_utime_seconds", labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_ru_utime:total",
+    ),
+)
+
+response_ru_stime = metrics.register_counter(
+    "response_ru_stime_seconds", labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_ru_stime:total",
+    ),
+)
+
+response_db_txn_count = metrics.register_counter(
+    "response_db_txn_count", labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_db_txn_count:total",
+    ),
+)
+
+# seconds spent waiting for db txns, excluding scheduling time, when processing
+# this request
+response_db_txn_duration = metrics.register_counter(
+    "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_db_txn_duration:total",
+    ),
+)
+
+# seconds spent waiting for a db connection, when processing this request
+response_db_sched_duration = metrics.register_counter(
+    "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
+)
+
+# size in bytes of the response written
+response_size = metrics.register_counter(
+    "response_size", labels=["method", "servlet", "tag"]
+)
+
+
+class RequestMetrics(object):
+    def start(self, time_msec, name):
+        self.start = time_msec
+        self.start_context = LoggingContext.current_context()
+        self.name = name
+
+    def stop(self, time_msec, request):
+        context = LoggingContext.current_context()
+
+        tag = ""
+        if context:
+            tag = context.tag
+
+            if context != self.start_context:
+                logger.warn(
+                    "Context have unexpectedly changed %r, %r",
+                    context, self.start_context
+                )
+                return
+
+        outgoing_responses_counter.inc(request.method, str(request.code))
+
+        response_count.inc(request.method, self.name, tag)
+
+        response_timer.inc_by(
+            time_msec - self.start, request.method,
+            self.name, tag
+        )
+
+        ru_utime, ru_stime = context.get_resource_usage()
+
+        response_ru_utime.inc_by(
+            ru_utime, request.method, self.name, tag
+        )
+        response_ru_stime.inc_by(
+            ru_stime, request.method, self.name, tag
+        )
+        response_db_txn_count.inc_by(
+            context.db_txn_count, request.method, self.name, tag
+        )
+        response_db_txn_duration.inc_by(
+            context.db_txn_duration_ms / 1000., request.method, self.name, tag
+        )
+        response_db_sched_duration.inc_by(
+            context.db_sched_duration_ms / 1000., request.method, self.name, tag
+        )
+
+        response_size.inc_by(request.sentLength, request.method, self.name, tag)
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 55b9ad5251..b6e2ae14a2 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -18,6 +18,9 @@
 from synapse.api.errors import (
     cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
 )
+from synapse.http.request_metrics import (
+    requests_counter,
+)
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.caches import intern_dict
 from synapse.util.metrics import Measure
@@ -41,178 +44,103 @@ import simplejson
 
 logger = logging.getLogger(__name__)
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-# total number of responses served, split by method/servlet/tag
-response_count = metrics.register_counter(
-    "response_count",
-    labels=["method", "servlet", "tag"],
-    alternative_names=(
-        # the following are all deprecated aliases for the same metric
-        metrics.name_prefix + x for x in (
-            "_requests",
-            "_response_time:count",
-            "_response_ru_utime:count",
-            "_response_ru_stime:count",
-            "_response_db_txn_count:count",
-            "_response_db_txn_duration:count",
-        )
-    )
-)
-
-requests_counter = metrics.register_counter(
-    "requests_received",
-    labels=["method", "servlet", ],
-)
-
-outgoing_responses_counter = metrics.register_counter(
-    "responses",
-    labels=["method", "code"],
-)
-
-response_timer = metrics.register_counter(
-    "response_time_seconds",
-    labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_time:total",
-    ),
-)
-
-response_ru_utime = metrics.register_counter(
-    "response_ru_utime_seconds", labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_ru_utime:total",
-    ),
-)
-
-response_ru_stime = metrics.register_counter(
-    "response_ru_stime_seconds", labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_ru_stime:total",
-    ),
-)
-
-response_db_txn_count = metrics.register_counter(
-    "response_db_txn_count", labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_db_txn_count:total",
-    ),
-)
 
-# seconds spent waiting for db txns, excluding scheduling time, when processing
-# this request
-response_db_txn_duration = metrics.register_counter(
-    "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_db_txn_duration:total",
-    ),
-)
+def wrap_json_request_handler(h):
+    """Wraps a request handler method with exception handling.
 
-# seconds spent waiting for a db connection, when processing this request
-response_db_sched_duration = metrics.register_counter(
-    "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
-)
+    Also adds logging as per wrap_request_handler_with_logging.
 
-# size in bytes of the response written
-response_size = metrics.register_counter(
-    "response_size", labels=["method", "servlet", "tag"]
-)
+    The handler method must have a signature of "handle_foo(self, request)",
+    where "self" must have a "clock" attribute (and "request" must be a
+    SynapseRequest).
 
-_next_request_id = 0
+    The handler must return a deferred. If the deferred succeeds we assume that
+    a response has been sent. If the deferred fails with a SynapseError we use
+    it to send a JSON response with the appropriate HTTP reponse code. If the
+    deferred fails with any other type of error we send a 500 reponse.
+    """
 
+    @defer.inlineCallbacks
+    def wrapped_request_handler(self, request):
+        try:
+            yield h(self, request)
+        except CodeMessageException as e:
+            code = e.code
+            if isinstance(e, SynapseError):
+                logger.info(
+                    "%s SynapseError: %s - %s", request, code, e.msg
+                )
+            else:
+                logger.exception(e)
+            respond_with_json(
+                request, code, cs_exception(e), send_cors=True,
+                pretty_print=_request_user_agent_is_curl(request),
+            )
 
-def request_handler(include_metrics=False):
-    """Decorator for ``wrap_request_handler``"""
-    return lambda request_handler: wrap_request_handler(request_handler, include_metrics)
+        except Exception:
+            # failure.Failure() fishes the original Failure out
+            # of our stack, and thus gives us a sensible stack
+            # trace.
+            f = failure.Failure()
+            logger.error(
+                "Failed handle request via %r: %r: %s",
+                h,
+                request,
+                f.getTraceback().rstrip(),
+            )
+            respond_with_json(
+                request,
+                500,
+                {
+                    "error": "Internal server error",
+                    "errcode": Codes.UNKNOWN,
+                },
+                send_cors=True,
+                pretty_print=_request_user_agent_is_curl(request),
+            )
 
+    return wrap_request_handler_with_logging(wrapped_request_handler)
 
-def wrap_request_handler(request_handler, include_metrics=False):
-    """Wraps a method that acts as a request handler with the necessary logging
-    and exception handling.
 
-    The method must have a signature of "handle_foo(self, request)". The
-    argument "self" must have "version_string" and "clock" attributes. The
-    argument "request" must be a twisted HTTP request.
+def wrap_request_handler_with_logging(h):
+    """Wraps a request handler to provide logging and metrics
 
-    The method must return a deferred. If the deferred succeeds we assume that
-    a response has been sent. If the deferred fails with a SynapseError we use
-    it to send a JSON response with the appropriate HTTP reponse code. If the
-    deferred fails with any other type of error we send a 500 reponse.
+    The handler method must have a signature of "handle_foo(self, request)",
+    where "self" must have a "clock" attribute (and "request" must be a
+    SynapseRequest).
 
-    We insert a unique request-id into the logging context for this request and
-    log the response and duration for this request.
+    As well as calling `request.processing` (which will log the response and
+    duration for this request), the wrapped request handler will insert the
+    request id into the logging context.
     """
-
     @defer.inlineCallbacks
     def wrapped_request_handler(self, request):
-        global _next_request_id
-        request_id = "%s-%s" % (request.method, _next_request_id)
-        _next_request_id += 1
+        """
+        Args:
+            self:
+            request (synapse.http.site.SynapseRequest):
+        """
 
+        request_id = request.get_request_id()
         with LoggingContext(request_id) as request_context:
+            request_context.request = request_id
             with Measure(self.clock, "wrapped_request_handler"):
-                request_metrics = RequestMetrics()
                 # we start the request metrics timer here with an initial stab
                 # at the servlet name. For most requests that name will be
                 # JsonResource (or a subclass), and JsonResource._async_render
                 # will update it once it picks a servlet.
                 servlet_name = self.__class__.__name__
-                request_metrics.start(self.clock, name=servlet_name)
-
-                request_context.request = request_id
-                with request.processing():
-                    try:
-                        with PreserveLoggingContext(request_context):
-                            if include_metrics:
-                                yield request_handler(self, request, request_metrics)
-                            else:
-                                requests_counter.inc(request.method, servlet_name)
-                                yield request_handler(self, request)
-                    except CodeMessageException as e:
-                        code = e.code
-                        if isinstance(e, SynapseError):
-                            logger.info(
-                                "%s SynapseError: %s - %s", request, code, e.msg
-                            )
-                        else:
-                            logger.exception(e)
-                        outgoing_responses_counter.inc(request.method, str(code))
-                        respond_with_json(
-                            request, code, cs_exception(e), send_cors=True,
-                            pretty_print=_request_user_agent_is_curl(request),
-                            version_string=self.version_string,
-                        )
-                    except Exception:
-                        # failure.Failure() fishes the original Failure out
-                        # of our stack, and thus gives us a sensible stack
-                        # trace.
-                        f = failure.Failure()
-                        logger.error(
-                            "Failed handle request %s.%s on %r: %r: %s",
-                            request_handler.__module__,
-                            request_handler.__name__,
-                            self,
-                            request,
-                            f.getTraceback().rstrip(),
-                        )
-                        respond_with_json(
-                            request,
-                            500,
-                            {
-                                "error": "Internal server error",
-                                "errcode": Codes.UNKNOWN,
-                            },
-                            send_cors=True,
-                            pretty_print=_request_user_agent_is_curl(request),
-                            version_string=self.version_string,
-                        )
-                    finally:
-                        try:
-                            request_metrics.stop(
-                                self.clock, request
-                            )
-                        except Exception as e:
-                            logger.warn("Failed to stop metrics: %r", e)
+                with request.processing(servlet_name):
+                    with PreserveLoggingContext(request_context):
+                        d = h(self, request)
+
+                        # record the arrival of the request *after*
+                        # dispatching to the handler, so that the handler
+                        # can update the servlet name in the request
+                        # metrics
+                        requests_counter.inc(request.method,
+                                             request.request_metrics.name)
+                        yield d
     return wrapped_request_handler
 
 
@@ -262,7 +190,6 @@ class JsonResource(HttpServer, resource.Resource):
         self.canonical_json = canonical_json
         self.clock = hs.get_clock()
         self.path_regexs = {}
-        self.version_string = hs.version_string
         self.hs = hs
 
     def register_paths(self, method, path_patterns, callback):
@@ -278,13 +205,9 @@ class JsonResource(HttpServer, resource.Resource):
         self._async_render(request)
         return server.NOT_DONE_YET
 
-    # Disable metric reporting because _async_render does its own metrics.
-    # It does its own metric reporting because _async_render dispatches to
-    # a callback and it's the class name of that callback we want to report
-    # against rather than the JsonResource itself.
-    @request_handler(include_metrics=True)
+    @wrap_json_request_handler
     @defer.inlineCallbacks
-    def _async_render(self, request, request_metrics):
+    def _async_render(self, request):
         """ This gets called from render() every time someone sends us a request.
             This checks if anyone has registered a callback for that method and
             path.
@@ -296,9 +219,7 @@ class JsonResource(HttpServer, resource.Resource):
             servlet_classname = servlet_instance.__class__.__name__
         else:
             servlet_classname = "%r" % callback
-
-        request_metrics.name = servlet_classname
-        requests_counter.inc(request.method, servlet_classname)
+        request.request_metrics.name = servlet_classname
 
         # Now trigger the callback. If it returns a response, we send it
         # here. If it throws an exception, that is handled by the wrapper
@@ -345,15 +266,12 @@ class JsonResource(HttpServer, resource.Resource):
 
     def _send_response(self, request, code, response_json_object,
                        response_code_message=None):
-        outgoing_responses_counter.inc(request.method, str(code))
-
         # TODO: Only enable CORS for the requests that need it.
         respond_with_json(
             request, code, response_json_object,
             send_cors=True,
             response_code_message=response_code_message,
             pretty_print=_request_user_agent_is_curl(request),
-            version_string=self.version_string,
             canonical_json=self.canonical_json,
         )
 
@@ -386,54 +304,6 @@ def _unrecognised_request_handler(request):
     raise UnrecognizedRequestError()
 
 
-class RequestMetrics(object):
-    def start(self, clock, name):
-        self.start = clock.time_msec()
-        self.start_context = LoggingContext.current_context()
-        self.name = name
-
-    def stop(self, clock, request):
-        context = LoggingContext.current_context()
-
-        tag = ""
-        if context:
-            tag = context.tag
-
-            if context != self.start_context:
-                logger.warn(
-                    "Context have unexpectedly changed %r, %r",
-                    context, self.start_context
-                )
-                return
-
-        response_count.inc(request.method, self.name, tag)
-
-        response_timer.inc_by(
-            clock.time_msec() - self.start, request.method,
-            self.name, tag
-        )
-
-        ru_utime, ru_stime = context.get_resource_usage()
-
-        response_ru_utime.inc_by(
-            ru_utime, request.method, self.name, tag
-        )
-        response_ru_stime.inc_by(
-            ru_stime, request.method, self.name, tag
-        )
-        response_db_txn_count.inc_by(
-            context.db_txn_count, request.method, self.name, tag
-        )
-        response_db_txn_duration.inc_by(
-            context.db_txn_duration_ms / 1000., request.method, self.name, tag
-        )
-        response_db_sched_duration.inc_by(
-            context.db_sched_duration_ms / 1000., request.method, self.name, tag
-        )
-
-        response_size.inc_by(request.sentLength, request.method, self.name, tag)
-
-
 class RootRedirect(resource.Resource):
     """Redirects the root '/' path to another path."""
 
@@ -452,7 +322,7 @@ class RootRedirect(resource.Resource):
 
 def respond_with_json(request, code, json_object, send_cors=False,
                       response_code_message=None, pretty_print=False,
-                      version_string="", canonical_json=True):
+                      canonical_json=True):
     # could alternatively use request.notifyFinish() and flip a flag when
     # the Deferred fires, but since the flag is RIGHT THERE it seems like
     # a waste.
@@ -474,12 +344,11 @@ def respond_with_json(request, code, json_object, send_cors=False,
         request, code, json_bytes,
         send_cors=send_cors,
         response_code_message=response_code_message,
-        version_string=version_string
     )
 
 
 def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
-                            version_string="", response_code_message=None):
+                            response_code_message=None):
     """Sends encoded JSON in response to the given request.
 
     Args:
@@ -493,7 +362,6 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
 
     request.setResponseCode(code, message=response_code_message)
     request.setHeader(b"Content-Type", b"application/json")
-    request.setHeader(b"Server", version_string)
     request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),))
     request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index c8b46e1af2..202a990508 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -12,24 +12,48 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.logcontext import LoggingContext
-from twisted.web.server import Site, Request
-
 import contextlib
 import logging
 import re
 import time
 
+from twisted.web.server import Site, Request
+
+from synapse.http.request_metrics import RequestMetrics
+from synapse.util.logcontext import LoggingContext
+
+logger = logging.getLogger(__name__)
+
 ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
 
+_next_request_seq = 0
+
 
 class SynapseRequest(Request):
+    """Class which encapsulates an HTTP request to synapse.
+
+    All of the requests processed in synapse are of this type.
+
+    It extends twisted's twisted.web.server.Request, and adds:
+     * Unique request ID
+     * Redaction of access_token query-params in __repr__
+     * Logging at start and end
+     * Metrics to record CPU, wallclock and DB time by endpoint.
+
+    It provides a method `processing` which should be called by the Resource
+    which is handling the request, and returns a context manager.
+
+    """
     def __init__(self, site, *args, **kw):
         Request.__init__(self, *args, **kw)
         self.site = site
         self.authenticated_entity = None
         self.start_time = 0
 
+        global _next_request_seq
+        self.request_seq = _next_request_seq
+        _next_request_seq += 1
+
     def __repr__(self):
         # We overwrite this so that we don't log ``access_token``
         return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
@@ -41,6 +65,9 @@ class SynapseRequest(Request):
             self.site.site_tag,
         )
 
+    def get_request_id(self):
+        return "%s-%i" % (self.method, self.request_seq)
+
     def get_redacted_uri(self):
         return ACCESS_TOKEN_RE.sub(
             br'\1<redacted>\3',
@@ -50,7 +77,16 @@ class SynapseRequest(Request):
     def get_user_agent(self):
         return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
 
-    def started_processing(self):
+    def render(self, resrc):
+        # override the Server header which is set by twisted
+        self.setHeader("Server", self.site.server_version_string)
+        return Request.render(self, resrc)
+
+    def _started_processing(self, servlet_name):
+        self.start_time = int(time.time() * 1000)
+        self.request_metrics = RequestMetrics()
+        self.request_metrics.start(self.start_time, name=servlet_name)
+
         self.site.access_logger.info(
             "%s - %s - Received request: %s %s",
             self.getClientIP(),
@@ -58,10 +94,8 @@ class SynapseRequest(Request):
             self.method,
             self.get_redacted_uri()
         )
-        self.start_time = int(time.time() * 1000)
-
-    def finished_processing(self):
 
+    def _finished_processing(self):
         try:
             context = LoggingContext.current_context()
             ru_utime, ru_stime = context.get_resource_usage()
@@ -72,6 +106,8 @@ class SynapseRequest(Request):
             ru_utime, ru_stime = (0, 0)
             db_txn_count, db_txn_duration_ms = (0, 0)
 
+        end_time = int(time.time() * 1000)
+
         self.site.access_logger.info(
             "%s - %s - {%s}"
             " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
@@ -79,7 +115,7 @@ class SynapseRequest(Request):
             self.getClientIP(),
             self.site.site_tag,
             self.authenticated_entity,
-            int(time.time() * 1000) - self.start_time,
+            end_time - self.start_time,
             int(ru_utime * 1000),
             int(ru_stime * 1000),
             db_sched_duration_ms,
@@ -93,11 +129,38 @@ class SynapseRequest(Request):
             self.get_user_agent(),
         )
 
+        try:
+            self.request_metrics.stop(end_time, self)
+        except Exception as e:
+            logger.warn("Failed to stop metrics: %r", e)
+
     @contextlib.contextmanager
-    def processing(self):
-        self.started_processing()
+    def processing(self, servlet_name):
+        """Record the fact that we are processing this request.
+
+        Returns a context manager; the correct way to use this is:
+
+        @defer.inlineCallbacks
+        def handle_request(request):
+            with request.processing("FooServlet"):
+                yield really_handle_the_request()
+
+        This will log the request's arrival. Once the context manager is
+        closed, the completion of the request will be logged, and the various
+        metrics will be updated.
+
+        Args:
+            servlet_name (str): the name of the servlet which will be
+                processing this request. This is used in the metrics.
+
+                It is possible to update this afterwards by updating
+                self.request_metrics.servlet_name.
+        """
+        # TODO: we should probably just move this into render() and finish(),
+        # to save having to call a separate method.
+        self._started_processing(servlet_name)
         yield
-        self.finished_processing()
+        self._finished_processing()
 
 
 class XForwardedForRequest(SynapseRequest):
@@ -135,7 +198,8 @@ class SynapseSite(Site):
     Subclass of a twisted http Site that does access logging with python's
     standard logging
     """
-    def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
+    def __init__(self, logger_name, site_tag, config, resource,
+                 server_version_string, *args, **kwargs):
         Site.__init__(self, resource, *args, **kwargs)
 
         self.site_tag = site_tag
@@ -143,6 +207,7 @@ class SynapseSite(Site):
         proxied = config.get("x_forwarded", False)
         self.requestFactory = SynapseRequestFactory(self, proxied)
         self.access_logger = logging.getLogger(logger_name)
+        self.server_version_string = server_version_string
 
     def log(self, request):
         pass
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index 0206e664c1..40e523cc5f 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -176,7 +176,6 @@ class PushersRemoveRestServlet(RestServlet):
 
         request.setResponseCode(200)
         request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
-        request.setHeader(b"Server", self.hs.version_string)
         request.setHeader(b"Content-Length", b"%d" % (
             len(PushersRemoveRestServlet.SUCCESS_HTML),
         ))
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index 8e5577148f..d6f3a19648 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -129,7 +129,6 @@ class AuthRestServlet(RestServlet):
             html_bytes = html.encode("utf8")
             request.setResponseCode(200)
             request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
-            request.setHeader(b"Server", self.hs.version_string)
             request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
 
             request.write(html_bytes)
@@ -175,7 +174,6 @@ class AuthRestServlet(RestServlet):
             html_bytes = html.encode("utf8")
             request.setResponseCode(200)
             request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
-            request.setHeader(b"Server", self.hs.version_string)
             request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
 
             request.write(html_bytes)
diff --git a/synapse/rest/key/v1/server_key_resource.py b/synapse/rest/key/v1/server_key_resource.py
index bd4fea5774..1498d188c1 100644
--- a/synapse/rest/key/v1/server_key_resource.py
+++ b/synapse/rest/key/v1/server_key_resource.py
@@ -49,7 +49,6 @@ class LocalKey(Resource):
     """
 
     def __init__(self, hs):
-        self.version_string = hs.version_string
         self.response_body = encode_canonical_json(
             self.response_json_object(hs.config)
         )
@@ -84,7 +83,6 @@ class LocalKey(Resource):
     def render_GET(self, request):
         return respond_with_json_bytes(
             request, 200, self.response_body,
-            version_string=self.version_string
         )
 
     def getChild(self, name, request):
diff --git a/synapse/rest/key/v2/local_key_resource.py b/synapse/rest/key/v2/local_key_resource.py
index be68d9a096..04775b3c45 100644
--- a/synapse/rest/key/v2/local_key_resource.py
+++ b/synapse/rest/key/v2/local_key_resource.py
@@ -63,7 +63,6 @@ class LocalKey(Resource):
     isLeaf = True
 
     def __init__(self, hs):
-        self.version_string = hs.version_string
         self.config = hs.config
         self.clock = hs.clock
         self.update_response_body(self.clock.time_msec())
@@ -115,5 +114,4 @@ class LocalKey(Resource):
             self.update_response_body(time_now)
         return respond_with_json_bytes(
             request, 200, self.response_body,
-            version_string=self.version_string
         )
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index 17e6079cba..21b4c1175e 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -12,7 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.http.server import request_handler, respond_with_json_bytes
+from synapse.http.server import (
+    respond_with_json_bytes, wrap_json_request_handler,
+)
 from synapse.http.servlet import parse_integer, parse_json_object_from_request
 from synapse.api.errors import SynapseError, Codes
 from synapse.crypto.keyring import KeyLookupError
@@ -91,7 +93,6 @@ class RemoteKey(Resource):
     def __init__(self, hs):
         self.keyring = hs.get_keyring()
         self.store = hs.get_datastore()
-        self.version_string = hs.version_string
         self.clock = hs.get_clock()
         self.federation_domain_whitelist = hs.config.federation_domain_whitelist
 
@@ -99,7 +100,7 @@ class RemoteKey(Resource):
         self.async_render_GET(request)
         return NOT_DONE_YET
 
-    @request_handler()
+    @wrap_json_request_handler
     @defer.inlineCallbacks
     def async_render_GET(self, request):
         if len(request.postpath) == 1:
@@ -124,7 +125,7 @@ class RemoteKey(Resource):
         self.async_render_POST(request)
         return NOT_DONE_YET
 
-    @request_handler()
+    @wrap_json_request_handler
     @defer.inlineCallbacks
     def async_render_POST(self, request):
         content = parse_json_object_from_request(request)
@@ -240,5 +241,4 @@ class RemoteKey(Resource):
 
             respond_with_json_bytes(
                 request, 200, result_io.getvalue(),
-                version_string=self.version_string
             )
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index fe7e17596f..8cf8820c31 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -12,16 +12,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import synapse.http.servlet
+import logging
 
-from ._base import parse_media_id, respond_404
+from twisted.internet import defer
 from twisted.web.resource import Resource
-from synapse.http.server import request_handler, set_cors_headers
-
 from twisted.web.server import NOT_DONE_YET
-from twisted.internet import defer
 
-import logging
+from synapse.http.server import (
+    set_cors_headers,
+    wrap_json_request_handler,
+)
+import synapse.http.servlet
+from ._base import parse_media_id, respond_404
 
 logger = logging.getLogger(__name__)
 
@@ -35,15 +37,14 @@ class DownloadResource(Resource):
         self.media_repo = media_repo
         self.server_name = hs.hostname
 
-        # Both of these are expected by @request_handler()
+        # this is expected by @wrap_json_request_handler
         self.clock = hs.get_clock()
-        self.version_string = hs.version_string
 
     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET
 
-    @request_handler()
+    @wrap_json_request_handler
     @defer.inlineCallbacks
     def _async_render_GET(self, request):
         set_cors_headers(request)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 9290d7946f..2839207abc 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -40,8 +40,9 @@ from synapse.util.stringutils import random_string
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.http.client import SpiderHttpClient
 from synapse.http.server import (
-    request_handler, respond_with_json_bytes,
+    respond_with_json_bytes,
     respond_with_json,
+    wrap_json_request_handler,
 )
 from synapse.util.async import ObservableDeferred
 from synapse.util.stringutils import is_ascii
@@ -57,7 +58,6 @@ class PreviewUrlResource(Resource):
 
         self.auth = hs.get_auth()
         self.clock = hs.get_clock()
-        self.version_string = hs.version_string
         self.filepaths = media_repo.filepaths
         self.max_spider_size = hs.config.max_spider_size
         self.server_name = hs.hostname
@@ -90,7 +90,7 @@ class PreviewUrlResource(Resource):
         self._async_render_GET(request)
         return NOT_DONE_YET
 
-    @request_handler()
+    @wrap_json_request_handler
     @defer.inlineCallbacks
     def _async_render_GET(self, request):
 
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index 58ada49711..aae6e464e8 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -14,18 +14,21 @@
 # limitations under the License.
 
 
-from ._base import (
-    parse_media_id, respond_404, respond_with_file, FileInfo,
-    respond_with_responder,
-)
-from twisted.web.resource import Resource
-from synapse.http.servlet import parse_string, parse_integer
-from synapse.http.server import request_handler, set_cors_headers
+import logging
 
-from twisted.web.server import NOT_DONE_YET
 from twisted.internet import defer
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
 
-import logging
+from synapse.http.server import (
+    set_cors_headers,
+    wrap_json_request_handler,
+)
+from synapse.http.servlet import parse_integer, parse_string
+from ._base import (
+    FileInfo, parse_media_id, respond_404, respond_with_file,
+    respond_with_responder,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -41,14 +44,13 @@ class ThumbnailResource(Resource):
         self.media_storage = media_storage
         self.dynamic_thumbnails = hs.config.dynamic_thumbnails
         self.server_name = hs.hostname
-        self.version_string = hs.version_string
         self.clock = hs.get_clock()
 
     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET
 
-    @request_handler()
+    @wrap_json_request_handler
     @defer.inlineCallbacks
     def _async_render_GET(self, request):
         set_cors_headers(request)
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index a31e75cb46..7567476fce 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -13,16 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.http.server import respond_with_json, request_handler
-
-from synapse.api.errors import SynapseError
+import logging
 
-from twisted.web.server import NOT_DONE_YET
 from twisted.internet import defer
-
 from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
 
-import logging
+from synapse.api.errors import SynapseError
+from synapse.http.server import (
+    respond_with_json,
+    wrap_json_request_handler,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -40,7 +41,6 @@ class UploadResource(Resource):
         self.server_name = hs.hostname
         self.auth = hs.get_auth()
         self.max_upload_size = hs.config.max_upload_size
-        self.version_string = hs.version_string
         self.clock = hs.get_clock()
 
     def render_POST(self, request):
@@ -51,7 +51,7 @@ class UploadResource(Resource):
         respond_with_json(request, 200, {}, send_cors=True)
         return NOT_DONE_YET
 
-    @request_handler()
+    @wrap_json_request_handler
     @defer.inlineCallbacks
     def _async_render_POST(self, request):
         requester = yield self.auth.get_user_by_req(request)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index f0784ba137..ea24710ad8 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -38,16 +38,16 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 
-from synapse.util.caches.descriptors import cached
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.engines import PostgresEngine
 
 import abc
 import logging
 
 from six.moves import range
+from collections import namedtuple
 
 
 logger = logging.getLogger(__name__)
@@ -60,6 +60,12 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
+# Used as return values for pagination APIs
+_EventDictReturn = namedtuple("_EventDictReturn", (
+    "event_id", "topological_ordering", "stream_ordering",
+))
+
+
 def lower_bound(token, engine, inclusive=False):
     inclusive = "=" if inclusive else ""
     if token.topological is None:
@@ -227,54 +233,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     @defer.inlineCallbacks
     def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
                                         order='DESC'):
-        # Note: If from_key is None then we return in topological order. This
-        # is because in that case we're using this as a "get the last few messages
-        # in a room" function, rather than "get new messages since last sync"
-        if from_key is not None:
-            from_id = RoomStreamToken.parse_stream_token(from_key).stream
-        else:
-            from_id = None
-        to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
+        """Get new room events in stream ordering since `from_key`.
+
+        Args:
+            room_id (str)
+            from_key (str): Token from which no events are returned before
+            to_key (str): Token from which no events are returned after. (This
+                is typically the current stream token)
+            limit (int): Maximum number of events to return
+            order (str): Either "DESC" or "ASC". Determines which events are
+                returned when the result is limited. If "DESC" then the most
+                recent `limit` events are returned, otherwise returns the
+                oldest `limit` events.
+
+        Returns:
+            Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
+            events (in ascending order) and the token from the start of
+            the chunk of events returned.
+        """
         if from_key == to_key:
             defer.returnValue(([], from_key))
 
-        if from_id:
-            has_changed = yield self._events_stream_cache.has_entity_changed(
-                room_id, from_id
-            )
-
-            if not has_changed:
-                defer.returnValue(([], from_key))
+        from_id = RoomStreamToken.parse_stream_token(from_key).stream
+        to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
-        def f(txn):
-            if from_id is not None:
-                sql = (
-                    "SELECT event_id, stream_ordering FROM events WHERE"
-                    " room_id = ?"
-                    " AND not outlier"
-                    " AND stream_ordering > ? AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering %s LIMIT ?"
-                ) % (order,)
-                txn.execute(sql, (room_id, from_id, to_id, limit))
-            else:
-                sql = (
-                    "SELECT event_id, stream_ordering FROM events WHERE"
-                    " room_id = ?"
-                    " AND not outlier"
-                    " AND stream_ordering <= ?"
-                    " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
-                ) % (order, order,)
-                txn.execute(sql, (room_id, to_id, limit))
+        has_changed = yield self._events_stream_cache.has_entity_changed(
+            room_id, from_id
+        )
 
-            rows = self.cursor_to_dict(txn)
+        if not has_changed:
+            defer.returnValue(([], from_key))
 
+        def f(txn):
+            sql = (
+                "SELECT event_id, stream_ordering FROM events WHERE"
+                " room_id = ?"
+                " AND not outlier"
+                " AND stream_ordering > ? AND stream_ordering <= ?"
+                " ORDER BY stream_ordering %s LIMIT ?"
+            ) % (order,)
+            txn.execute(sql, (room_id, from_id, to_id, limit))
+
+            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
             return rows
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
         ret = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
@@ -284,7 +291,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ret.reverse()
 
         if rows:
-            key = "s%d" % min(r["stream_ordering"] for r in rows)
+            key = "s%d" % min(r.stream_ordering for r in rows)
         else:
             # Assume we didn't get anything because there was nothing to
             # get.
@@ -294,10 +301,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_membership_changes_for_user(self, user_id, from_key, to_key):
-        if from_key is not None:
-            from_id = RoomStreamToken.parse_stream_token(from_key).stream
-        else:
-            from_id = None
+        from_id = RoomStreamToken.parse_stream_token(from_key).stream
         to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
         if from_key == to_key:
@@ -311,34 +315,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 defer.returnValue([])
 
         def f(txn):
-            if from_id is not None:
-                sql = (
-                    "SELECT m.event_id, stream_ordering FROM events AS e,"
-                    " room_memberships AS m"
-                    " WHERE e.event_id = m.event_id"
-                    " AND m.user_id = ?"
-                    " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
-                    " ORDER BY e.stream_ordering ASC"
-                )
-                txn.execute(sql, (user_id, from_id, to_id,))
-            else:
-                sql = (
-                    "SELECT m.event_id, stream_ordering FROM events AS e,"
-                    " room_memberships AS m"
-                    " WHERE e.event_id = m.event_id"
-                    " AND m.user_id = ?"
-                    " AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering ASC"
-                )
-                txn.execute(sql, (user_id, to_id,))
-            rows = self.cursor_to_dict(txn)
+            sql = (
+                "SELECT m.event_id, stream_ordering FROM events AS e,"
+                " room_memberships AS m"
+                " WHERE e.event_id = m.event_id"
+                " AND m.user_id = ?"
+                " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+            )
+            txn.execute(sql, (user_id, from_id, to_id,))
+
+            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
 
             return rows
 
         rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
         ret = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
@@ -347,14 +341,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
+    def get_recent_events_for_room(self, room_id, limit, end_token):
+        """Get the most recent events in the room in topological ordering.
+
+        Args:
+            room_id (str)
+            limit (int)
+            end_token (str): The stream token representing now.
+
+        Returns:
+            Deferred[tuple[list[FrozenEvent],  str]]: Returns a list of
+            events and a token pointing to the start of the returned
+            events.
+            The events returned are in ascending order.
+        """
+
         rows, token = yield self.get_recent_event_ids_for_room(
-            room_id, limit, end_token, from_token
+            room_id, limit, end_token,
         )
 
         logger.debug("stream before")
         events = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
         logger.debug("stream after")
@@ -363,60 +371,36 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         defer.returnValue((events, token))
 
-    @cached(num_args=4)
-    def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
-        end_token = RoomStreamToken.parse_stream_token(end_token)
-
-        if from_token is None:
-            sql = (
-                "SELECT stream_ordering, topological_ordering, event_id"
-                " FROM events"
-                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC"
-                " LIMIT ?"
-            )
-        else:
-            from_token = RoomStreamToken.parse_stream_token(from_token)
-            sql = (
-                "SELECT stream_ordering, topological_ordering, event_id"
-                " FROM events"
-                " WHERE room_id = ? AND stream_ordering > ?"
-                " AND stream_ordering <= ? AND outlier = ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-        def get_recent_events_for_room_txn(txn):
-            if from_token is None:
-                txn.execute(sql, (room_id, end_token.stream, False, limit,))
-            else:
-                txn.execute(sql, (
-                    room_id, from_token.stream, end_token.stream, False, limit
-                ))
+    @defer.inlineCallbacks
+    def get_recent_event_ids_for_room(self, room_id, limit, end_token):
+        """Get the most recent events in the room in topological ordering.
 
-            rows = self.cursor_to_dict(txn)
+        Args:
+            room_id (str)
+            limit (int)
+            end_token (str): The stream token representing now.
 
-            rows.reverse()  # As we selected with reverse ordering
+        Returns:
+            Deferred[tuple[list[_EventDictReturn],  str]]: Returns a list of
+            _EventDictReturn and a token pointing to the start of the returned
+            events.
+            The events returned are in ascending order.
+        """
+        # Allow a zero limit here, and no-op.
+        if limit == 0:
+            defer.returnValue(([], end_token))
 
-            if rows:
-                # Tokens are positions between events.
-                # This token points *after* the last event in the chunk.
-                # We need it to point to the event before it in the chunk
-                # since we are going backwards so we subtract one from the
-                # stream part.
-                topo = rows[0]["topological_ordering"]
-                toke = rows[0]["stream_ordering"] - 1
-                start_token = str(RoomStreamToken(topo, toke))
+        end_token = RoomStreamToken.parse(end_token)
 
-                token = (start_token, str(end_token))
-            else:
-                token = (str(end_token), str(end_token))
+        rows, token = yield self.runInteraction(
+            "get_recent_event_ids_for_room", self._paginate_room_events_txn,
+            room_id, from_token=end_token, limit=limit,
+        )
 
-            return rows, token
+        # We want to return the results in ascending order.
+        rows.reverse()
 
-        return self.runInteraction(
-            "get_recent_events_for_room", get_recent_events_for_room_txn
-        )
+        defer.returnValue((rows, token))
 
     def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
         """Gets details of the first event in a room at or after a stream ordering
@@ -520,10 +504,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @staticmethod
     def _set_before_and_after(events, rows, topo_order=True):
+        """Inserts ordering information to events' internal metadata from
+        the DB rows.
+
+        Args:
+            events (list[FrozenEvent])
+            rows (list[_EventDictReturn])
+            topo_order (bool): Whether the events were ordered topologically
+                or by stream ordering. If true then all rows should have a non
+                null topological_ordering.
+        """
         for event, row in zip(events, rows):
-            stream = row["stream_ordering"]
-            if topo_order:
-                topo = event.depth
+            stream = row.stream_ordering
+            if topo_order and row.topological_ordering:
+                topo = row.topological_ordering
             else:
                 topo = None
             internal = event.internal_metadata
@@ -595,87 +589,27 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcols=["stream_ordering", "topological_ordering"],
         )
 
-        token = RoomStreamToken(
-            results["topological_ordering"],
+        # Paginating backwards includes the event at the token, but paginating
+        # forward doesn't.
+        before_token = RoomStreamToken(
+            results["topological_ordering"] - 1,
             results["stream_ordering"],
         )
 
-        if isinstance(self.database_engine, Sqlite3Engine):
-            # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
-            # So we give pass it to SQLite3 as the UNION ALL of the two queries.
-
-            query_before = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering < ?"
-                " UNION ALL"
-                " SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
-            )
-            before_args = (
-                room_id, token.topological,
-                room_id, token.topological, token.stream,
-                before_limit,
-            )
-
-            query_after = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering > ?"
-                " UNION ALL"
-                " SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
-                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
-            )
-            after_args = (
-                room_id, token.topological,
-                room_id, token.topological, token.stream,
-                after_limit,
-            )
-        else:
-            query_before = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND %s"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
-            ) % (upper_bound(token, self.database_engine, inclusive=False),)
-
-            before_args = (room_id, before_limit)
-
-            query_after = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND %s"
-                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
-            ) % (lower_bound(token, self.database_engine, inclusive=False),)
-
-            after_args = (room_id, after_limit)
-
-        txn.execute(query_before, before_args)
-
-        rows = self.cursor_to_dict(txn)
-        events_before = [r["event_id"] for r in rows]
-
-        if rows:
-            start_token = str(RoomStreamToken(
-                rows[0]["topological_ordering"],
-                rows[0]["stream_ordering"] - 1,
-            ))
-        else:
-            start_token = str(RoomStreamToken(
-                token.topological,
-                token.stream - 1,
-            ))
-
-        txn.execute(query_after, after_args)
+        after_token = RoomStreamToken(
+            results["topological_ordering"],
+            results["stream_ordering"],
+        )
 
-        rows = self.cursor_to_dict(txn)
-        events_after = [r["event_id"] for r in rows]
+        rows, start_token = self._paginate_room_events_txn(
+            txn, room_id, before_token, direction='b', limit=before_limit,
+        )
+        events_before = [r.event_id for r in rows]
 
-        if rows:
-            end_token = str(RoomStreamToken(
-                rows[-1]["topological_ordering"],
-                rows[-1]["stream_ordering"],
-            ))
-        else:
-            end_token = str(token)
+        rows, end_token = self._paginate_room_events_txn(
+            txn, room_id, after_token, direction='f', limit=after_limit,
+        )
+        events_after = [r.event_id for r in rows]
 
         return {
             "before": {
@@ -738,17 +672,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def has_room_changed_since(self, room_id, stream_id):
         return self._events_stream_cache.has_entity_changed(room_id, stream_id)
 
+    def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
+                                  direction='b', limit=-1, event_filter=None):
+        """Returns list of events before or after a given token.
 
-class StreamStore(StreamWorkerStore):
-    def get_room_max_stream_ordering(self):
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
+        Args:
+            txn
+            room_id (str)
+            from_token (RoomStreamToken): The token used to stream from
+            to_token (RoomStreamToken|None): A token which if given limits the
+                results to only those before
+            direction(char): Either 'b' or 'f' to indicate whether we are
+                paginating forwards or backwards from `from_key`.
+            limit (int): The maximum number of events to return. Zero or less
+                means no limit.
+            event_filter (Filter|None): If provided filters the events to
+                those that match the filter.
 
-    @defer.inlineCallbacks
-    def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1, event_filter=None):
+        Returns:
+            Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
+            as a list of _EventDictReturn and a token that points to the end
+            of the result set.
+        """
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -756,20 +701,20 @@ class StreamStore(StreamWorkerStore):
         if direction == 'b':
             order = "DESC"
             bounds = upper_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
+                from_token, self.database_engine
             )
-            if to_key:
+            if to_token:
                 bounds = "%s AND %s" % (bounds, lower_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
+                    to_token, self.database_engine
                 ))
         else:
             order = "ASC"
             bounds = lower_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
+                from_token, self.database_engine
             )
-            if to_key:
+            if to_token:
                 bounds = "%s AND %s" % (bounds, upper_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
+                    to_token, self.database_engine
                 ))
 
         filter_clause, filter_args = filter_to_clause(event_filter)
@@ -785,7 +730,8 @@ class StreamStore(StreamWorkerStore):
             limit_str = ""
 
         sql = (
-            "SELECT * FROM events"
+            "SELECT event_id, topological_ordering, stream_ordering"
+            " FROM events"
             " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
             " stream_ordering %(order)s %(limit)s"
@@ -795,35 +741,72 @@ class StreamStore(StreamWorkerStore):
             "limit": limit_str
         }
 
-        def f(txn):
-            txn.execute(sql, args)
-
-            rows = self.cursor_to_dict(txn)
-
-            if rows:
-                topo = rows[-1]["topological_ordering"]
-                toke = rows[-1]["stream_ordering"]
-                if direction == 'b':
-                    # Tokens are positions between events.
-                    # This token points *after* the last event in the chunk.
-                    # We need it to point to the event before it in the chunk
-                    # when we are going backwards so we subtract one from the
-                    # stream part.
-                    toke -= 1
-                next_token = str(RoomStreamToken(topo, toke))
-            else:
-                # TODO (erikj): We should work out what to do here instead.
-                next_token = to_key if to_key else from_key
+        txn.execute(sql, args)
+
+        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+
+        if rows:
+            topo = rows[-1].topological_ordering
+            toke = rows[-1].stream_ordering
+            if direction == 'b':
+                # Tokens are positions between events.
+                # This token points *after* the last event in the chunk.
+                # We need it to point to the event before it in the chunk
+                # when we are going backwards so we subtract one from the
+                # stream part.
+                toke -= 1
+            next_token = RoomStreamToken(topo, toke)
+        else:
+            # TODO (erikj): We should work out what to do here instead.
+            next_token = to_token if to_token else from_token
+
+        return rows, str(next_token),
+
+    @defer.inlineCallbacks
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1, event_filter=None):
+        """Returns list of events before or after a given token.
 
-            return rows, next_token,
+        Args:
+            room_id (str)
+            from_key (str): The token used to stream from
+            to_key (str|None): A token which if given limits the results to
+                only those before
+            direction(char): Either 'b' or 'f' to indicate whether we are
+                paginating forwards or backwards from `from_key`.
+            limit (int): The maximum number of events to return. Zero or less
+                means no limit.
+            event_filter (Filter|None): If provided filters the events to
+                those that match the filter.
 
-        rows, token = yield self.runInteraction("paginate_room_events", f)
+        Returns:
+            tuple[list[dict], str]: Returns the results as a list of dicts and
+            a token that points to the end of the result set. The dicts have
+            the keys "event_id", "topological_ordering" and "stream_orderign".
+        """
+
+        from_key = RoomStreamToken.parse(from_key)
+        if to_key:
+            to_key = RoomStreamToken.parse(to_key)
+
+        rows, token = yield self.runInteraction(
+            "paginate_room_events", self._paginate_room_events_txn,
+            room_id, from_key, to_key, direction, limit, event_filter,
+        )
 
         events = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
         self._set_before_and_after(events, rows)
 
         defer.returnValue((events, token))
+
+
+class StreamStore(StreamWorkerStore):
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()