summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-04 14:04:35 +0100
committerErik Johnston <erik@matrix.org>2016-08-04 14:04:35 +0100
commitb4e2290d8951822bb3309b99830a775d9e5826e2 (patch)
tree5333716daa806065b5a59c4815a022b855032e3e
parentTidy up get_events (diff)
parentMerge pull request #982 from matrix-org/erikj/fix_port_script (diff)
downloadsynapse-b4e2290d8951822bb3309b99830a775d9e5826e2.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/state_ids_api
-rwxr-xr-xjenkins-dendron-postgres.sh63
-rwxr-xr-xjenkins-postgres.sh44
-rwxr-xr-xjenkins-sqlite.sh39
-rwxr-xr-xjenkins/clone.sh36
-rwxr-xr-xscripts/synapse_port_db175
-rw-r--r--synapse/federation/federation_server.py20
-rw-r--r--synapse/federation/transport/server.py4
-rw-r--r--synapse/handlers/device.py2
-rw-r--r--synapse/handlers/e2e_keys.py139
-rw-r--r--synapse/rest/client/v2_alpha/keys.py50
-rw-r--r--synapse/server.py45
-rw-r--r--synapse/server.pyi4
-rw-r--r--synapse/storage/end_to_end_keys.py60
-rw-r--r--synapse/storage/events.py38
-rw-r--r--synapse/storage/schema/delta/33/devices_for_e2e_keys.sql2
-rw-r--r--synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql20
-rw-r--r--tests/handlers/test_e2e_keys.py46
-rw-r--r--tests/storage/test_end_to_end_keys.py90
18 files changed, 572 insertions, 305 deletions
diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
index e6e94cc8bc..68912a8967 100755
--- a/jenkins-dendron-postgres.sh
+++ b/jenkins-dendron-postgres.sh
@@ -4,62 +4,19 @@ set -eux
 
 : ${WORKSPACE:="$(pwd)"}
 
+export WORKSPACE
 export PYTHONDONTWRITEBYTECODE=yep
 export SYNAPSE_CACHE_FACTOR=1
 
-# Output test results as junit xml
-export TRIAL_FLAGS="--reporter=subunit"
-export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
-# Write coverage reports to a separate file for each process
-export COVERAGE_OPTS="-p"
-export DUMP_COVERAGE_COMMAND="coverage help"
-
-# Output flake8 violations to violations.flake8.log
-# Don't exit with non-0 status code on Jenkins,
-# so that the build steps continue and a later step can decided whether to
-# UNSTABLE or FAILURE this build.
-export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
-
-rm .coverage* || echo "No coverage files to remove"
-
 ./jenkins/prepare_synapse.sh
-
 ./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
 ./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git
-
-: ${GOPATH:=${WORKSPACE}/.gopath}
-if [[ "${GOPATH}" != *:* ]]; then
-  mkdir -p "${GOPATH}"
-  export PATH="${GOPATH}/bin:${PATH}"
-fi
-export GOPATH
-
-cd dendron
-
-go get github.com/constabulary/gb/...
-gb generate
-gb build
-
-cd ../sytest
-
-: ${PORT_BASE:=20000}
-: ${PORT_COUNT=100}
-export PORT_BASE
-export PORT_COUNT
-
-./jenkins/prep_sytest_for_postgres.sh
-
-mkdir -p var
-
-echo >&2 "Running sytest with PostgreSQL";
-
-TOX_BIN=$WORKSPACE/.tox/py27/bin
-./jenkins/install_and_run.sh --python $TOX_BIN/python \
-                             --synapse-directory $WORKSPACE \
-                             --dendron $WORKSPACE/dendron/bin/dendron \
-                             --pusher \
-                             --synchrotron \
-                             --federation-reader \
-                             --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1))
-
-cd ..
+./dendron/jenkins/build_dendron.sh
+./sytest/jenkins/prep_sytest_for_postgres.sh
+
+./sytest/jenkins/install_and_run.sh \
+    --synapse-directory $WORKSPACE \
+    --dendron $WORKSPACE/dendron/bin/dendron \
+    --pusher \
+    --synchrotron \
+    --federation-reader \
diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh
index edf61a45bb..f2ca8ccdff 100755
--- a/jenkins-postgres.sh
+++ b/jenkins-postgres.sh
@@ -4,50 +4,14 @@ set -eux
 
 : ${WORKSPACE:="$(pwd)"}
 
+export WORKSPACE
 export PYTHONDONTWRITEBYTECODE=yep
 export SYNAPSE_CACHE_FACTOR=1
 
-# Output test results as junit xml
-export TRIAL_FLAGS="--reporter=subunit"
-export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
-# Write coverage reports to a separate file for each process
-export COVERAGE_OPTS="-p"
-export DUMP_COVERAGE_COMMAND="coverage help"
-
-# Output flake8 violations to violations.flake8.log
-# Don't exit with non-0 status code on Jenkins,
-# so that the build steps continue and a later step can decided whether to
-# UNSTABLE or FAILURE this build.
-export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
-
-rm .coverage* || echo "No coverage files to remove"
-
 ./jenkins/prepare_synapse.sh
-
 ./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
 
-: ${PORT_BASE:=20000}
-: ${PORT_COUNT=100}
-export PORT_BASE
-export PORT_COUNT
-
-cd sytest
-
-./jenkins/prep_sytest_for_postgres.sh
-
-echo >&2 "Running sytest with PostgreSQL";
-
-TOX_BIN=$WORKSPACE/.tox/py27/bin
-./jenkins/install_and_run.sh --coverage \
-                             --python $TOX_BIN/python \
-                             --synapse-directory $WORKSPACE \
-                             --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1))
-
-cd ..
-cp sytest/.coverage.* .
+./sytest/jenkins/prep_sytest_for_postgres.sh
 
-# Combine the coverage reports
-echo "Combining:" .coverage.*
-$TOX_BIN/python -m coverage combine
-# Output coverage to coverage.xml
-$TOX_BIN/coverage xml -o coverage.xml
+./sytest/jenkins/install_and_run.sh \
+    --synapse-directory $WORKSPACE \
diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh
index 1c3530ebb3..84613d979c 100755
--- a/jenkins-sqlite.sh
+++ b/jenkins-sqlite.sh
@@ -8,43 +8,8 @@ export WORKSPACE
 export PYTHONDONTWRITEBYTECODE=yep
 export SYNAPSE_CACHE_FACTOR=1
 
-# Output test results as junit xml
-export TRIAL_FLAGS="--reporter=subunit"
-export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
-# Write coverage reports to a separate file for each process
-export COVERAGE_OPTS="-p"
-export DUMP_COVERAGE_COMMAND="coverage help"
-
-# Output flake8 violations to violations.flake8.log
-# Don't exit with non-0 status code on Jenkins,
-# so that the build steps continue and a later step can decided whether to
-# UNSTABLE or FAILURE this build.
-export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
-
-rm .coverage* || echo "No coverage files to remove"
-
 ./jenkins/prepare_synapse.sh
-
 ./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
 
-: ${PORT_BASE:=20000}
-: ${PORT_COUNT=100}
-export PORT_BASE
-export PORT_COUNT
-
-cd sytest
-
-TOX_BIN=$WORKSPACE/.tox/py27/bin
-./jenkins/install_and_run.sh --coverage \
-                             --python $TOX_BIN/python \
-                             --synapse-directory $WORKSPACE \
-                             --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) \
-
-cd ..
-cp sytest/.coverage.* .
-
-# Combine the coverage reports
-echo "Combining:" .coverage.*
-$TOX_BIN/python -m coverage combine
-# Output coverage to coverage.xml
-$TOX_BIN/coverage xml -o coverage.xml
+./sytest/jenkins/install_and_run.sh \
+    --synapse-directory $WORKSPACE \
diff --git a/jenkins/clone.sh b/jenkins/clone.sh
index f56d076ea9..ab30ac7782 100755
--- a/jenkins/clone.sh
+++ b/jenkins/clone.sh
@@ -1,24 +1,44 @@
 #! /bin/bash
 
+# This clones a project from github into a named subdirectory
+# If the project has a branch with the same name as this branch
+# then it will checkout that branch after cloning.
+# Otherwise it will checkout "origin/develop."
+# The first argument is the name of the directory to checkout
+# the branch into.
+# The second argument is the URL of the remote repository to checkout.
+# Usually something like https://github.com/matrix-org/sytest.git
+
+set -eux
+
 NAME=$1
 PROJECT=$2
 BASE=".$NAME-base"
 
-# update our clone
-if [ ! -d .$NAME-base ]; then
-  git clone $PROJECT $BASE --mirror
+# Update our mirror.
+if [ ! -d ".$NAME-base" ]; then
+  # Create a local mirror of the source repository.
+  # This saves us from having to download the entire repository
+  # when this script is next run.
+  git clone "$PROJECT" "$BASE" --mirror
 else
-  (cd $BASE; git fetch -p)
+  # Fetch any updates from the source repository.
+  (cd "$BASE"; git fetch -p)
 fi
 
-rm -rf $NAME
-git clone $BASE $NAME --shared
+# Remove the existing repository so that we have a clean copy
+rm -rf "$NAME"
+# Cloning with --shared means that we will share portions of the
+# .git directory with our local mirror.
+git clone "$BASE" "$NAME" --shared
 
+# Jenkins may have supplied us with the name of the branch in the
+# environment. Otherwise we will have to guess based on the current
+# commit.
 : ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"}
-cd $NAME
+cd "$NAME"
 # check out the relevant branch
 git checkout "${GIT_BRANCH}" || (
     echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop"
     git checkout "origin/develop"
 )
-git clean -df .
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index efd04da2d6..66c61b0198 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -34,7 +34,7 @@ logger = logging.getLogger("synapse_port_db")
 
 
 BOOLEAN_COLUMNS = {
-    "events": ["processed", "outlier"],
+    "events": ["processed", "outlier", "contains_url"],
     "rooms": ["is_public"],
     "event_edges": ["is_state"],
     "presence_list": ["accepted"],
@@ -92,8 +92,12 @@ class Store(object):
 
     _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
     _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
+    _simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
+    _simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
     _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
-    _simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"]
+    _simple_select_one_onecol_txn = SQLBaseStore.__dict__[
+        "_simple_select_one_onecol_txn"
+    ]
 
     _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
     _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
@@ -158,31 +162,40 @@ class Porter(object):
     def setup_table(self, table):
         if table in APPEND_ONLY_TABLES:
             # It's safe to just carry on inserting.
-            next_chunk = yield self.postgres_store._simple_select_one_onecol(
+            row = yield self.postgres_store._simple_select_one(
                 table="port_from_sqlite3",
                 keyvalues={"table_name": table},
-                retcol="rowid",
+                retcols=("forward_rowid", "backward_rowid"),
                 allow_none=True,
             )
 
             total_to_port = None
-            if next_chunk is None:
+            if row is None:
                 if table == "sent_transactions":
-                    next_chunk, already_ported, total_to_port = (
+                    forward_chunk, already_ported, total_to_port = (
                         yield self._setup_sent_transactions()
                     )
+                    backward_chunk = 0
                 else:
                     yield self.postgres_store._simple_insert(
                         table="port_from_sqlite3",
-                        values={"table_name": table, "rowid": 1}
+                        values={
+                            "table_name": table,
+                            "forward_rowid": 1,
+                            "backward_rowid": 0,
+                        }
                     )
 
-                    next_chunk = 1
+                    forward_chunk = 1
+                    backward_chunk = 0
                     already_ported = 0
+            else:
+                forward_chunk = row["forward_rowid"]
+                backward_chunk = row["backward_rowid"]
 
             if total_to_port is None:
                 already_ported, total_to_port = yield self._get_total_count_to_port(
-                    table, next_chunk
+                    table, forward_chunk, backward_chunk
                 )
         else:
             def delete_all(txn):
@@ -196,46 +209,85 @@ class Porter(object):
 
             yield self.postgres_store._simple_insert(
                 table="port_from_sqlite3",
-                values={"table_name": table, "rowid": 0}
+                values={
+                    "table_name": table,
+                    "forward_rowid": 1,
+                    "backward_rowid": 0,
+                }
             )
 
-            next_chunk = 1
+            forward_chunk = 1
+            backward_chunk = 0
 
             already_ported, total_to_port = yield self._get_total_count_to_port(
-                table, next_chunk
+                table, forward_chunk, backward_chunk
             )
 
-        defer.returnValue((table, already_ported, total_to_port, next_chunk))
+        defer.returnValue(
+            (table, already_ported, total_to_port, forward_chunk, backward_chunk)
+        )
 
     @defer.inlineCallbacks
-    def handle_table(self, table, postgres_size, table_size, next_chunk):
+    def handle_table(self, table, postgres_size, table_size, forward_chunk,
+                     backward_chunk):
         if not table_size:
             return
 
         self.progress.add_table(table, postgres_size, table_size)
 
         if table == "event_search":
-            yield self.handle_search_table(postgres_size, table_size, next_chunk)
+            yield self.handle_search_table(
+                postgres_size, table_size, forward_chunk, backward_chunk
+            )
             return
 
-        select = (
+        forward_select = (
             "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
             % (table,)
         )
 
+        backward_select = (
+            "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?"
+            % (table,)
+        )
+
+        do_forward = [True]
+        do_backward = [True]
+
         while True:
             def r(txn):
-                txn.execute(select, (next_chunk, self.batch_size,))
-                rows = txn.fetchall()
-                headers = [column[0] for column in txn.description]
+                forward_rows = []
+                backward_rows = []
+                if do_forward[0]:
+                    txn.execute(forward_select, (forward_chunk, self.batch_size,))
+                    forward_rows = txn.fetchall()
+                    if not forward_rows:
+                        do_forward[0] = False
+
+                if do_backward[0]:
+                    txn.execute(backward_select, (backward_chunk, self.batch_size,))
+                    backward_rows = txn.fetchall()
+                    if not backward_rows:
+                        do_backward[0] = False
+
+                if forward_rows or backward_rows:
+                    headers = [column[0] for column in txn.description]
+                else:
+                    headers = None
 
-                return headers, rows
+                return headers, forward_rows, backward_rows
 
-            headers, rows = yield self.sqlite_store.runInteraction("select", r)
+            headers, frows, brows = yield self.sqlite_store.runInteraction(
+                "select", r
+            )
 
-            if rows:
-                next_chunk = rows[-1][0] + 1
+            if frows or brows:
+                if frows:
+                    forward_chunk = max(row[0] for row in frows) + 1
+                if brows:
+                    backward_chunk = min(row[0] for row in brows) - 1
 
+                rows = frows + brows
                 self._convert_rows(table, headers, rows)
 
                 def insert(txn):
@@ -247,7 +299,10 @@ class Porter(object):
                         txn,
                         table="port_from_sqlite3",
                         keyvalues={"table_name": table},
-                        updatevalues={"rowid": next_chunk},
+                        updatevalues={
+                            "forward_rowid": forward_chunk,
+                            "backward_rowid": backward_chunk,
+                        },
                     )
 
                 yield self.postgres_store.execute(insert)
@@ -259,7 +314,8 @@ class Porter(object):
                 return
 
     @defer.inlineCallbacks
-    def handle_search_table(self, postgres_size, table_size, next_chunk):
+    def handle_search_table(self, postgres_size, table_size, forward_chunk,
+                            backward_chunk):
         select = (
             "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
             " FROM event_search as es"
@@ -270,7 +326,7 @@ class Porter(object):
 
         while True:
             def r(txn):
-                txn.execute(select, (next_chunk, self.batch_size,))
+                txn.execute(select, (forward_chunk, self.batch_size,))
                 rows = txn.fetchall()
                 headers = [column[0] for column in txn.description]
 
@@ -279,7 +335,7 @@ class Porter(object):
             headers, rows = yield self.sqlite_store.runInteraction("select", r)
 
             if rows:
-                next_chunk = rows[-1][0] + 1
+                forward_chunk = rows[-1][0] + 1
 
                 # We have to treat event_search differently since it has a
                 # different structure in the two different databases.
@@ -312,7 +368,10 @@ class Porter(object):
                         txn,
                         table="port_from_sqlite3",
                         keyvalues={"table_name": "event_search"},
-                        updatevalues={"rowid": next_chunk},
+                        updatevalues={
+                            "forward_rowid": forward_chunk,
+                            "backward_rowid": backward_chunk,
+                        },
                     )
 
                 yield self.postgres_store.execute(insert)
@@ -324,7 +383,6 @@ class Porter(object):
             else:
                 return
 
-
     def setup_db(self, db_config, database_engine):
         db_conn = database_engine.module.connect(
             **{
@@ -395,10 +453,32 @@ class Porter(object):
                 txn.execute(
                     "CREATE TABLE port_from_sqlite3 ("
                     " table_name varchar(100) NOT NULL UNIQUE,"
-                    " rowid bigint NOT NULL"
+                    " forward_rowid bigint NOT NULL,"
+                    " backward_rowid bigint NOT NULL"
                     ")"
                 )
 
+            # The old port script created a table with just a "rowid" column.
+            # We want people to be able to rerun this script from an old port
+            # so that they can pick up any missing events that were not
+            # ported across.
+            def alter_table(txn):
+                txn.execute(
+                    "ALTER TABLE IF EXISTS port_from_sqlite3"
+                    " RENAME rowid TO forward_rowid"
+                )
+                txn.execute(
+                    "ALTER TABLE IF EXISTS port_from_sqlite3"
+                    " ADD backward_rowid bigint NOT NULL DEFAULT 0"
+                )
+
+            try:
+                yield self.postgres_store.runInteraction(
+                    "alter_table", alter_table
+                )
+            except Exception as e:
+                logger.info("Failed to create port table: %s", e)
+
             try:
                 yield self.postgres_store.runInteraction(
                     "create_port_table", create_port_table
@@ -458,7 +538,7 @@ class Porter(object):
     @defer.inlineCallbacks
     def _setup_sent_transactions(self):
         # Only save things from the last day
-        yesterday = int(time.time()*1000) - 86400000
+        yesterday = int(time.time() * 1000) - 86400000
 
         # And save the max transaction id from each destination
         select = (
@@ -514,7 +594,11 @@ class Porter(object):
 
         yield self.postgres_store._simple_insert(
             table="port_from_sqlite3",
-            values={"table_name": "sent_transactions", "rowid": next_chunk}
+            values={
+                "table_name": "sent_transactions",
+                "forward_rowid": next_chunk,
+                "backward_rowid": 0,
+            }
         )
 
         def get_sent_table_size(txn):
@@ -535,13 +619,18 @@ class Porter(object):
         defer.returnValue((next_chunk, inserted_rows, total_count))
 
     @defer.inlineCallbacks
-    def _get_remaining_count_to_port(self, table, next_chunk):
-        rows = yield self.sqlite_store.execute_sql(
+    def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
+        frows = yield self.sqlite_store.execute_sql(
             "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,),
-            next_chunk,
+            forward_chunk,
         )
 
-        defer.returnValue(rows[0][0])
+        brows = yield self.sqlite_store.execute_sql(
+            "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,),
+            backward_chunk,
+        )
+
+        defer.returnValue(frows[0][0] + brows[0][0])
 
     @defer.inlineCallbacks
     def _get_already_ported_count(self, table):
@@ -552,10 +641,10 @@ class Porter(object):
         defer.returnValue(rows[0][0])
 
     @defer.inlineCallbacks
-    def _get_total_count_to_port(self, table, next_chunk):
+    def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
         remaining, done = yield defer.gatherResults(
             [
-                self._get_remaining_count_to_port(table, next_chunk),
+                self._get_remaining_count_to_port(table, forward_chunk, backward_chunk),
                 self._get_already_ported_count(table),
             ],
             consumeErrors=True,
@@ -686,7 +775,7 @@ class CursesProgress(Progress):
             color = curses.color_pair(2) if perc == 100 else curses.color_pair(1)
 
             self.stdscr.addstr(
-                i+2, left_margin + max_len - len(table),
+                i + 2, left_margin + max_len - len(table),
                 table,
                 curses.A_BOLD | color,
             )
@@ -694,18 +783,18 @@ class CursesProgress(Progress):
             size = 20
 
             progress = "[%s%s]" % (
-                "#" * int(perc*size/100),
-                " " * (size - int(perc*size/100)),
+                "#" * int(perc * size / 100),
+                " " * (size - int(perc * size / 100)),
             )
 
             self.stdscr.addstr(
-                i+2, left_margin + max_len + middle_space,
+                i + 2, left_margin + max_len + middle_space,
                 "%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]),
             )
 
         if self.finished:
             self.stdscr.addstr(
-                rows-1, 0,
+                rows - 1, 0,
                 "Press any key to exit...",
             )
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2b91f93e09..aba19639c7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -393,27 +393,9 @@ class FederationServer(FederationBase):
             (200, send_content)
         )
 
-    @defer.inlineCallbacks
     @log_function
     def on_query_client_keys(self, origin, content):
-        query = []
-        for user_id, device_ids in content.get("device_keys", {}).items():
-            if not device_ids:
-                query.append((user_id, None))
-            else:
-                for device_id in device_ids:
-                    query.append((user_id, device_id))
-
-        results = yield self.store.get_e2e_device_keys(query)
-
-        json_result = {}
-        for user_id, device_keys in results.items():
-            for device_id, json_bytes in device_keys.items():
-                json_result.setdefault(user_id, {})[device_id] = json.loads(
-                    json_bytes
-                )
-
-        defer.returnValue({"device_keys": json_result})
+        return self.on_query_request("client_keys", content)
 
     @defer.inlineCallbacks
     @log_function
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 3ae7c48457..0bc6e0801d 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -378,10 +378,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
 class FederationClientKeysQueryServlet(BaseFederationServlet):
     PATH = "/user/keys/query"
 
-    @defer.inlineCallbacks
     def on_POST(self, origin, content, query):
-        response = yield self.handler.on_query_client_keys(origin, content)
-        defer.returnValue((200, response))
+        return self.handler.on_query_client_keys(origin, content)
 
 
 class FederationClientKeysClaimServlet(BaseFederationServlet):
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f4bf159bb5..8d630c6b1a 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,7 +29,7 @@ class DeviceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def check_device_registered(self, user_id, device_id,
-                                initial_device_display_name):
+                                initial_device_display_name=None):
         """
         If the given device has not been registered, register it with the
         supplied display name.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
new file mode 100644
index 0000000000..2c7bfd91ed
--- /dev/null
+++ b/synapse/handlers/e2e_keys.py
@@ -0,0 +1,139 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import json
+import logging
+
+from twisted.internet import defer
+
+from synapse.api import errors
+import synapse.types
+
+logger = logging.getLogger(__name__)
+
+
+class E2eKeysHandler(object):
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.federation = hs.get_replication_layer()
+        self.is_mine_id = hs.is_mine_id
+        self.server_name = hs.hostname
+
+        # doesn't really work as part of the generic query API, because the
+        # query request requires an object POST, but we abuse the
+        # "query handler" interface.
+        self.federation.register_query_handler(
+            "client_keys", self.on_federation_query_client_keys
+        )
+
+    @defer.inlineCallbacks
+    def query_devices(self, query_body):
+        """ Handle a device key query from a client
+
+        {
+            "device_keys": {
+                "<user_id>": ["<device_id>"]
+            }
+        }
+        ->
+        {
+            "device_keys": {
+                "<user_id>": {
+                    "<device_id>": {
+                        ...
+                    }
+                }
+            }
+        }
+        """
+        device_keys_query = query_body.get("device_keys", {})
+
+        # separate users by domain.
+        # make a map from domain to user_id to device_ids
+        queries_by_domain = collections.defaultdict(dict)
+        for user_id, device_ids in device_keys_query.items():
+            user = synapse.types.UserID.from_string(user_id)
+            queries_by_domain[user.domain][user_id] = device_ids
+
+        # do the queries
+        # TODO: do these in parallel
+        results = {}
+        for destination, destination_query in queries_by_domain.items():
+            if destination == self.server_name:
+                res = yield self.query_local_devices(destination_query)
+            else:
+                res = yield self.federation.query_client_keys(
+                    destination, {"device_keys": destination_query}
+                )
+                res = res["device_keys"]
+            for user_id, keys in res.items():
+                if user_id in destination_query:
+                    results[user_id] = keys
+
+        defer.returnValue((200, {"device_keys": results}))
+
+    @defer.inlineCallbacks
+    def query_local_devices(self, query):
+        """Get E2E device keys for local users
+
+        Args:
+            query (dict[string, list[string]|None): map from user_id to a list
+                 of devices to query (None for all devices)
+
+        Returns:
+            defer.Deferred: (resolves to dict[string, dict[string, dict]]):
+                 map from user_id -> device_id -> device details
+        """
+        local_query = []
+
+        result_dict = {}
+        for user_id, device_ids in query.items():
+            if not self.is_mine_id(user_id):
+                logger.warning("Request for keys for non-local user %s",
+                               user_id)
+                raise errors.SynapseError(400, "Not a user here")
+
+            if not device_ids:
+                local_query.append((user_id, None))
+            else:
+                for device_id in device_ids:
+                    local_query.append((user_id, device_id))
+
+            # make sure that each queried user appears in the result dict
+            result_dict[user_id] = {}
+
+        results = yield self.store.get_e2e_device_keys(local_query)
+
+        # Build the result structure, un-jsonify the results, and add the
+        # "unsigned" section
+        for user_id, device_keys in results.items():
+            for device_id, device_info in device_keys.items():
+                r = json.loads(device_info["key_json"])
+                r["unsigned"] = {}
+                display_name = device_info["device_display_name"]
+                if display_name is not None:
+                    r["unsigned"]["device_display_name"] = display_name
+                result_dict[user_id][device_id] = r
+
+        defer.returnValue(result_dict)
+
+    @defer.inlineCallbacks
+    def on_federation_query_client_keys(self, query_body):
+        """ Handle a device key query from a federated server
+        """
+        device_keys_query = query_body.get("device_keys", {})
+        res = yield self.query_local_devices(device_keys_query)
+        defer.returnValue({"device_keys": res})
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index dc1d4d8fc6..c5ff16adf3 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -130,9 +130,7 @@ class KeyUploadServlet(RestServlet):
         # old access_token without an associated device_id. Either way, we
         # need to double-check the device is registered to avoid ending up with
         # keys without a corresponding device.
-        self.device_handler.check_device_registered(
-            user_id, device_id, "unknown device"
-        )
+        self.device_handler.check_device_registered(user_id, device_id)
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
         defer.returnValue((200, {"one_time_key_counts": result}))
@@ -186,17 +184,19 @@ class KeyQueryServlet(RestServlet):
     )
 
     def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer):
+        """
         super(KeyQueryServlet, self).__init__()
-        self.store = hs.get_datastore()
         self.auth = hs.get_auth()
-        self.federation = hs.get_replication_layer()
-        self.is_mine = hs.is_mine
+        self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, user_id, device_id):
         yield self.auth.get_user_by_req(request)
         body = parse_json_object_from_request(request)
-        result = yield self.handle_request(body)
+        result = yield self.e2e_keys_handler.query_devices(body)
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -205,45 +205,11 @@ class KeyQueryServlet(RestServlet):
         auth_user_id = requester.user.to_string()
         user_id = user_id if user_id else auth_user_id
         device_ids = [device_id] if device_id else []
-        result = yield self.handle_request(
+        result = yield self.e2e_keys_handler.query_devices(
             {"device_keys": {user_id: device_ids}}
         )
         defer.returnValue(result)
 
-    @defer.inlineCallbacks
-    def handle_request(self, body):
-        local_query = []
-        remote_queries = {}
-        for user_id, device_ids in body.get("device_keys", {}).items():
-            user = UserID.from_string(user_id)
-            if self.is_mine(user):
-                if not device_ids:
-                    local_query.append((user_id, None))
-                else:
-                    for device_id in device_ids:
-                        local_query.append((user_id, device_id))
-            else:
-                remote_queries.setdefault(user.domain, {})[user_id] = list(
-                    device_ids
-                )
-        results = yield self.store.get_e2e_device_keys(local_query)
-
-        json_result = {}
-        for user_id, device_keys in results.items():
-            for device_id, json_bytes in device_keys.items():
-                json_result.setdefault(user_id, {})[device_id] = json.loads(
-                    json_bytes
-                )
-
-        for destination, device_keys in remote_queries.items():
-            remote_result = yield self.federation.query_client_keys(
-                destination, {"device_keys": device_keys}
-            )
-            for user_id, keys in remote_result["device_keys"].items():
-                if user_id in device_keys:
-                    json_result[user_id] = keys
-        defer.returnValue((200, {"device_keys": json_result}))
-
 
 class OneTimeKeyServlet(RestServlet):
     """
diff --git a/synapse/server.py b/synapse/server.py
index e8b166990d..6bb4988309 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -19,39 +19,38 @@
 # partial one for unit test mocking.
 
 # Imports required for the default HomeServer() implementation
-from twisted.web.client import BrowserLikePolicyForHTTPS
+import logging
+
 from twisted.enterprise import adbapi
+from twisted.web.client import BrowserLikePolicyForHTTPS
 
-from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.api.auth import Auth
+from synapse.api.filtering import Filtering
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.appservice.api import ApplicationServiceApi
+from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.crypto.keyring import Keyring
+from synapse.events.builder import EventBuilderFactory
 from synapse.federation import initialize_http_replication
-from synapse.handlers.device import DeviceHandler
-from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
-from synapse.notifier import Notifier
-from synapse.api.auth import Auth
 from synapse.handlers import Handlers
+from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.handlers.auth import AuthHandler
+from synapse.handlers.device import DeviceHandler
+from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
+from synapse.handlers.room import RoomListHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
-from synapse.handlers.room import RoomListHandler
-from synapse.handlers.auth import AuthHandler
-from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
+from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.notifier import Notifier
+from synapse.push.pusherpool import PusherPool
+from synapse.rest.media.v1.media_repository import MediaRepository
 from synapse.state import StateHandler
 from synapse.storage import DataStore
+from synapse.streams.events import EventSources
 from synapse.util import Clock
 from synapse.util.distributor import Distributor
-from synapse.streams.events import EventSources
-from synapse.api.ratelimiting import Ratelimiter
-from synapse.crypto.keyring import Keyring
-from synapse.push.pusherpool import PusherPool
-from synapse.events.builder import EventBuilderFactory
-from synapse.api.filtering import Filtering
-from synapse.rest.media.v1.media_repository import MediaRepository
-
-from synapse.http.matrixfederationclient import MatrixFederationHttpClient
-
-import logging
-
 
 logger = logging.getLogger(__name__)
 
@@ -94,6 +93,7 @@ class HomeServer(object):
         'room_list_handler',
         'auth_handler',
         'device_handler',
+        'e2e_keys_handler',
         'application_service_api',
         'application_service_scheduler',
         'application_service_handler',
@@ -202,6 +202,9 @@ class HomeServer(object):
     def build_device_handler(self):
         return DeviceHandler(self)
 
+    def build_e2e_keys_handler(self):
+        return E2eKeysHandler(self)
+
     def build_application_service_api(self):
         return ApplicationServiceApi(self)
 
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 902f725c06..c0aa868c4f 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,6 +1,7 @@
 import synapse.handlers
 import synapse.handlers.auth
 import synapse.handlers.device
+import synapse.handlers.e2e_keys
 import synapse.storage
 import synapse.state
 
@@ -14,6 +15,9 @@ class HomeServer(object):
     def get_device_handler(self) -> synapse.handlers.device.DeviceHandler:
         pass
 
+    def get_e2e_keys_handler(self) -> synapse.handlers.e2e_keys.E2eKeysHandler:
+        pass
+
     def get_handlers(self) -> synapse.handlers.Handlers:
         pass
 
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 62b7790e91..385d607056 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import collections
 
 import twisted.internet.defer
 
@@ -38,24 +39,49 @@ class EndToEndKeyStore(SQLBaseStore):
             query_list(list): List of pairs of user_ids and device_ids.
         Returns:
             Dict mapping from user-id to dict mapping from device_id to
-            key json byte strings.
+            dict containing "key_json", "device_display_name".
         """
-        def _get_e2e_device_keys(txn):
-            result = {}
-            for user_id, device_id in query_list:
-                user_result = result.setdefault(user_id, {})
-                keyvalues = {"user_id": user_id}
-                if device_id:
-                    keyvalues["device_id"] = device_id
-                rows = self._simple_select_list_txn(
-                    txn, table="e2e_device_keys_json",
-                    keyvalues=keyvalues,
-                    retcols=["device_id", "key_json"]
-                )
-                for row in rows:
-                    user_result[row["device_id"]] = row["key_json"]
-            return result
-        return self.runInteraction("get_e2e_device_keys", _get_e2e_device_keys)
+        if not query_list:
+            return {}
+
+        return self.runInteraction(
+            "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list
+        )
+
+    def _get_e2e_device_keys_txn(self, txn, query_list):
+        query_clauses = []
+        query_params = []
+
+        for (user_id, device_id) in query_list:
+            query_clause = "k.user_id = ?"
+            query_params.append(user_id)
+
+            if device_id:
+                query_clause += " AND k.device_id = ?"
+                query_params.append(device_id)
+
+            query_clauses.append(query_clause)
+
+        sql = (
+            "SELECT k.user_id, k.device_id, "
+            "    d.display_name AS device_display_name, "
+            "    k.key_json"
+            " FROM e2e_device_keys_json k"
+            "    LEFT JOIN devices d ON d.user_id = k.user_id"
+            "      AND d.device_id = k.device_id"
+            " WHERE %s"
+        ) % (
+            " OR ".join("(" + q + ")" for q in query_clauses)
+        )
+
+        txn.execute(sql, query_params)
+        rows = self.cursor_to_dict(txn)
+
+        result = collections.defaultdict(dict)
+        for row in rows:
+            result[row["user_id"]][row["device_id"]] = row
+
+        return result
 
     def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list):
         def _add_e2e_one_time_keys(txn):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c63ca36df6..4664cfe6d9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -26,7 +26,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 
 from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple
+from collections import deque, namedtuple, OrderedDict
 
 import synapse
 import synapse.metrics
@@ -403,6 +403,23 @@ class EventsStore(SQLBaseStore):
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.
         """
+        # Ensure that we don't have the same event twice.
+        # Pick the earliest non-outlier if there is one, else the earliest one.
+        new_events_and_contexts = OrderedDict()
+        for event, context in events_and_contexts:
+            prev_event_context = new_events_and_contexts.get(event.event_id)
+            if prev_event_context:
+                if not event.internal_metadata.is_outlier():
+                    if prev_event_context[0].internal_metadata.is_outlier():
+                        # To ensure correct ordering we pop, as OrderedDict is
+                        # ordered by first insertion.
+                        new_events_and_contexts.pop(event.event_id, None)
+                        new_events_and_contexts[event.event_id] = (event, context)
+            else:
+                new_events_and_contexts[event.event_id] = (event, context)
+
+        events_and_contexts = new_events_and_contexts.values()
+
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -433,8 +450,6 @@ class EventsStore(SQLBaseStore):
             for event_id, outlier in txn.fetchall()
         }
 
-        # Remove the events that we've seen before.
-        event_map = {}
         to_remove = set()
         for event, context in events_and_contexts:
             if context.rejected:
@@ -445,23 +460,6 @@ class EventsStore(SQLBaseStore):
                     to_remove.add(event)
                 continue
 
-            # Handle the case of the list including the same event multiple
-            # times. The tricky thing here is when they differ by whether
-            # they are an outlier.
-            if event.event_id in event_map:
-                other = event_map[event.event_id]
-
-                if not other.internal_metadata.is_outlier():
-                    to_remove.add(event)
-                    continue
-                elif not event.internal_metadata.is_outlier():
-                    to_remove.add(event)
-                    continue
-                else:
-                    to_remove.add(other)
-
-            event_map[event.event_id] = event
-
             if event.event_id not in have_persisted:
                 continue
 
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
index 140f2b63e0..aa4a3b9f2f 100644
--- a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
@@ -16,4 +16,4 @@
 -- make sure that we have a device record for each set of E2E keys, so that the
 -- user can delete them if they like.
 INSERT INTO devices
-    SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json;
+    SELECT user_id, device_id, NULL FROM e2e_device_keys_json;
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql
new file mode 100644
index 0000000000..6671573398
--- /dev/null
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql
@@ -0,0 +1,20 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- a previous version of the "devices_for_e2e_keys" delta set all the device
+-- names to "unknown device". This wasn't terribly helpful
+UPDATE devices
+    SET display_name = NULL
+    WHERE display_name = 'unknown device';
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
new file mode 100644
index 0000000000..878a54dc34
--- /dev/null
+++ b/tests/handlers/test_e2e_keys.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+from twisted.internet import defer
+
+import synapse.api.errors
+import synapse.handlers.e2e_keys
+
+import synapse.storage
+from tests import unittest, utils
+
+
+class E2eKeysHandlerTestCase(unittest.TestCase):
+    def __init__(self, *args, **kwargs):
+        super(E2eKeysHandlerTestCase, self).__init__(*args, **kwargs)
+        self.hs = None       # type: synapse.server.HomeServer
+        self.handler = None  # type: synapse.handlers.e2e_keys.E2eKeysHandler
+
+    @defer.inlineCallbacks
+    def setUp(self):
+        self.hs = yield utils.setup_test_homeserver(
+            handlers=None,
+            replication_layer=mock.Mock(),
+        )
+        self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs)
+
+    @defer.inlineCallbacks
+    def test_query_local_devices_no_devices(self):
+        """If the user has no devices, we expect an empty list.
+        """
+        local_user = "@boris:" + self.hs.hostname
+        res = yield self.handler.query_local_devices({local_user: None})
+        self.assertDictEqual(res, {local_user: {}})
diff --git a/tests/storage/test_end_to_end_keys.py b/tests/storage/test_end_to_end_keys.py
new file mode 100644
index 0000000000..453bc61438
--- /dev/null
+++ b/tests/storage/test_end_to_end_keys.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+import tests.unittest
+import tests.utils
+
+
+class EndToEndKeyStoreTestCase(tests.unittest.TestCase):
+    def __init__(self, *args, **kwargs):
+        super(EndToEndKeyStoreTestCase, self).__init__(*args, **kwargs)
+        self.store = None  # type: synapse.storage.DataStore
+
+    @defer.inlineCallbacks
+    def setUp(self):
+        hs = yield tests.utils.setup_test_homeserver()
+
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def test_key_without_device_name(self):
+        now = 1470174257070
+        json = '{ "key": "value" }'
+
+        yield self.store.set_e2e_device_keys(
+            "user", "device", now, json)
+
+        res = yield self.store.get_e2e_device_keys((("user", "device"),))
+        self.assertIn("user", res)
+        self.assertIn("device", res["user"])
+        dev = res["user"]["device"]
+        self.assertDictContainsSubset({
+            "key_json": json,
+            "device_display_name": None,
+        }, dev)
+
+    @defer.inlineCallbacks
+    def test_get_key_with_device_name(self):
+        now = 1470174257070
+        json = '{ "key": "value" }'
+
+        yield self.store.set_e2e_device_keys(
+            "user", "device", now, json)
+        yield self.store.store_device(
+            "user", "device", "display_name"
+        )
+
+        res = yield self.store.get_e2e_device_keys((("user", "device"),))
+        self.assertIn("user", res)
+        self.assertIn("device", res["user"])
+        dev = res["user"]["device"]
+        self.assertDictContainsSubset({
+            "key_json": json,
+            "device_display_name": "display_name",
+        }, dev)
+
+    @defer.inlineCallbacks
+    def test_multiple_devices(self):
+        now = 1470174257070
+
+        yield self.store.set_e2e_device_keys(
+            "user1", "device1", now, 'json11')
+        yield self.store.set_e2e_device_keys(
+            "user1", "device2", now, 'json12')
+        yield self.store.set_e2e_device_keys(
+            "user2", "device1", now, 'json21')
+        yield self.store.set_e2e_device_keys(
+            "user2", "device2", now, 'json22')
+
+        res = yield self.store.get_e2e_device_keys((("user1", "device1"),
+                                                    ("user2", "device2")))
+        self.assertIn("user1", res)
+        self.assertIn("device1", res["user1"])
+        self.assertNotIn("device2", res["user1"])
+        self.assertIn("user2", res)
+        self.assertNotIn("device1", res["user2"])
+        self.assertIn("device2", res["user2"])