summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/admin_cmd.py5
-rw-r--r--synapse/app/appservice.py5
-rw-r--r--synapse/app/client_reader.py5
-rw-r--r--synapse/app/event_creator.py5
-rw-r--r--synapse/app/federation_reader.py5
-rw-r--r--synapse/app/federation_sender.py5
-rw-r--r--synapse/app/frontend_proxy.py5
-rw-r--r--synapse/app/homeserver.py7
-rw-r--r--synapse/app/media_repository.py5
-rw-r--r--synapse/app/pusher.py5
-rw-r--r--synapse/app/synchrotron.py5
-rw-r--r--synapse/app/user_dir.py5
-rw-r--r--synapse/handlers/e2e_keys.py35
-rw-r--r--synapse/handlers/federation.py74
-rw-r--r--synapse/server.py12
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py217
-rw-r--r--synapse/storage/data_stores/main/search.py11
-rw-r--r--synapse/util/caches/descriptors.py2
18 files changed, 312 insertions, 101 deletions
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 04751a6a5e..51a909419f 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -45,7 +45,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.logcontext import LoggingContext
 from synapse.util.versionstring import get_version_string
 
@@ -229,14 +228,10 @@ def start(config_options):
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = AdminCmdServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 02b900f382..e82e0f11e3 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -34,7 +34,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -143,8 +142,6 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     if config.notify_appservices:
         sys.stderr.write(
             "\nThe appservices must be disabled in the main synapse process"
@@ -159,10 +156,8 @@ def start(config_options):
 
     ps = AppserviceServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ps, config, use_worker_options=True)
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index dadb487d5f..3edfe19567 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -62,7 +62,6 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
 from synapse.rest.client.v2_alpha.register import RegisterRestServlet
 from synapse.rest.client.versions import VersionsRestServlet
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -181,14 +180,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = ClientReaderServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index d110599a35..d0ddbe38fc 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -57,7 +57,6 @@ from synapse.rest.client.v1.room import (
 )
 from synapse.server import HomeServer
 from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -180,14 +179,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = EventCreatorServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 418c086254..311523e0ed 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -46,7 +46,6 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -162,14 +161,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = FederationReaderServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index f24920a7d6..83c436229c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -41,7 +41,6 @@ from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.replication.tcp.streams._base import ReceiptsStream
 from synapse.server import HomeServer
 from synapse.storage.database import Database
-from synapse.storage.engines import create_engine
 from synapse.types import ReadReceipt
 from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
@@ -174,8 +173,6 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     if config.send_federation:
         sys.stderr.write(
             "\nThe send_federation must be disabled in the main synapse process"
@@ -190,10 +187,8 @@ def start(config_options):
 
     ss = FederationSenderServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index e647459d0e..30e435eead 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -39,7 +39,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v2_alpha._base import client_patterns
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -234,14 +233,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = FrontendProxyServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 032010600a..b8661457e2 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -69,7 +69,7 @@ from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.rest.well_known import WellKnownResource
 from synapse.server import HomeServer
 from synapse.storage import DataStore
-from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
+from synapse.storage.engines import IncorrectDatabaseSetup
 from synapse.storage.prepare_database import UpgradeDatabaseException
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.httpresourcetree import create_resource_tree
@@ -328,15 +328,10 @@ def setup(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-    config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
-
     hs = SynapseHomeServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 2c6dd3ef02..4c80f257e2 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -40,7 +40,6 @@ from synapse.rest.admin import register_servlets_for_media_repo
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
 from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -157,14 +156,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = MediaRepositoryServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index dd52a9fc2d..09e639040a 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -37,7 +37,6 @@ from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage import DataStore
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -203,14 +202,10 @@ def start(config_options):
     # Force the pushers to start since they will be disabled in the main config
     config.start_pushers = True
 
-    database_engine = create_engine(config.database_config)
-
     ps = PusherServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ps, config, use_worker_options=True)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 288ee64b42..dd2132e608 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -55,7 +55,6 @@ from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
 from synapse.rest.client.v2_alpha import sync
 from synapse.server import HomeServer
 from synapse.storage.data_stores.main.presence import UserPresenceState
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.stringutils import random_string
@@ -437,14 +436,10 @@ def start(config_options):
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = SynchrotronServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
         application_service_handler=SynchrotronApplicationService(),
     )
 
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index c01fb34a9b..1257098f92 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -44,7 +44,6 @@ from synapse.rest.client.v2_alpha import user_directory
 from synapse.server import HomeServer
 from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
 from synapse.storage.database import Database
-from synapse.storage.engines import create_engine
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
@@ -200,8 +199,6 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     if config.update_user_directory:
         sys.stderr.write(
             "\nThe update_user_directory must be disabled in the main synapse process"
@@ -216,10 +213,8 @@ def start(config_options):
 
     ss = UserDirectoryServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 57a10daefd..2d889364d4 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -264,6 +264,7 @@ class E2eKeysHandler(object):
 
         return ret
 
+    @defer.inlineCallbacks
     def get_cross_signing_keys_from_cache(self, query, from_user_id):
         """Get cross-signing keys for users from the database
 
@@ -283,14 +284,32 @@ class E2eKeysHandler(object):
         self_signing_keys = {}
         user_signing_keys = {}
 
-        # Currently a stub, implementation coming in https://github.com/matrix-org/synapse/pull/6486
-        return defer.succeed(
-            {
-                "master_keys": master_keys,
-                "self_signing_keys": self_signing_keys,
-                "user_signing_keys": user_signing_keys,
-            }
-        )
+        user_ids = list(query)
+
+        keys = yield self.store.get_e2e_cross_signing_keys_bulk(user_ids, from_user_id)
+
+        for user_id, user_info in keys.items():
+            if user_info is None:
+                continue
+            if "master" in user_info:
+                master_keys[user_id] = user_info["master"]
+            if "self_signing" in user_info:
+                self_signing_keys[user_id] = user_info["self_signing"]
+
+        if (
+            from_user_id in keys
+            and keys[from_user_id] is not None
+            and "user_signing" in keys[from_user_id]
+        ):
+            # users can see other users' master and self-signing keys, but can
+            # only see their own user-signing keys
+            user_signing_keys[from_user_id] = keys[from_user_id]["user_signing"]
+
+        return {
+            "master_keys": master_keys,
+            "self_signing_keys": self_signing_keys,
+            "user_signing_keys": user_signing_keys,
+        }
 
     @trace
     @defer.inlineCallbacks
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 62985bab9f..2ea69c5468 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -637,6 +637,10 @@ class FederationHandler(BaseHandler):
             room_id
             event_ids
 
+        If we fail to fetch any of the events, a warning will be logged, and the event
+        will be omitted from the result. Likewise, any events which turn out not to
+        be in the given room.
+
         Returns:
             map from event_id to event
         """
@@ -644,35 +648,59 @@ class FederationHandler(BaseHandler):
 
         missing_events = set(event_ids) - fetched_events.keys()
 
-        if not missing_events:
-            return fetched_events
+        if missing_events:
+            logger.debug(
+                "Fetching unknown state/auth events %s for room %s",
+                missing_events,
+                room_id,
+            )
 
-        logger.debug(
-            "Fetching unknown state/auth events %s for room %s",
-            missing_events,
-            event_ids,
-        )
+            room_version = await self.store.get_room_version(room_id)
 
-        room_version = await self.store.get_room_version(room_id)
+            # XXX 20 requests at once? really?
+            for batch in batch_iter(missing_events, 20):
+                deferreds = [
+                    run_in_background(
+                        self.federation_client.get_pdu,
+                        destinations=[destination],
+                        event_id=e_id,
+                        room_version=room_version,
+                    )
+                    for e_id in batch
+                ]
 
-        # XXX 20 requests at once? really?
-        for batch in batch_iter(missing_events, 20):
-            deferreds = [
-                run_in_background(
-                    self.federation_client.get_pdu,
-                    destinations=[destination],
-                    event_id=e_id,
-                    room_version=room_version,
+                res = await make_deferred_yieldable(
+                    defer.DeferredList(deferreds, consumeErrors=True)
                 )
-                for e_id in batch
-            ]
 
-            res = await make_deferred_yieldable(
-                defer.DeferredList(deferreds, consumeErrors=True)
+                for success, result in res:
+                    if success and result:
+                        fetched_events[result.event_id] = result
+
+        # check for events which were in the wrong room.
+        #
+        # this can happen if a remote server claims that the state or
+        # auth_events at an event in room A are actually events in room B
+
+        bad_events = list(
+            (event_id, event.room_id)
+            for event_id, event in fetched_events.items()
+            if event.room_id != room_id
+        )
+
+        for bad_event_id, bad_room_id in bad_events:
+            # This is a bogus situation, but since we may only discover it a long time
+            # after it happened, we try our best to carry on, by just omitting the
+            # bad events from the returned auth/state set.
+            logger.warning(
+                "Remote server %s claims event %s in room %s is an auth/state "
+                "event in room %s",
+                destination,
+                bad_event_id,
+                bad_room_id,
+                room_id,
             )
-            for success, result in res:
-                if success and result:
-                    fetched_events[result.event_id] = result
+            del fetched_events[bad_event_id]
 
         return fetched_events
 
diff --git a/synapse/server.py b/synapse/server.py
index 2db3dab221..5021068ce0 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -34,6 +34,7 @@ from synapse.api.filtering import Filtering
 from synapse.api.ratelimiting import Ratelimiter
 from synapse.appservice.api import ApplicationServiceApi
 from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
@@ -97,6 +98,7 @@ from synapse.server_notices.worker_server_notices_sender import (
 )
 from synapse.state import StateHandler, StateResolutionHandler
 from synapse.storage import DataStores, Storage
+from synapse.storage.engines import create_engine
 from synapse.streams.events import EventSources
 from synapse.util import Clock
 from synapse.util.distributor import Distributor
@@ -209,16 +211,18 @@ class HomeServer(object):
     # instantiated during setup() for future return by get_datastore()
     DATASTORE_CLASS = abc.abstractproperty()
 
-    def __init__(self, hostname, reactor=None, **kwargs):
+    def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwargs):
         """
         Args:
             hostname : The hostname for the server.
+            config: The full config for the homeserver.
         """
         if not reactor:
             from twisted.internet import reactor
 
         self._reactor = reactor
         self.hostname = hostname
+        self.config = config
         self._building = {}
         self._listening_services = []
         self.start_time = None
@@ -229,6 +233,12 @@ class HomeServer(object):
         self.admin_redaction_ratelimiter = Ratelimiter()
         self.registration_ratelimiter = Ratelimiter()
 
+        self.database_engine = create_engine(config.database_config)
+        config.database_config.setdefault("args", {})[
+            "cp_openfun"
+        ] = self.database_engine.on_new_connection
+        self.db_config = config.database_config
+
         self.datastores = None
 
         # Other kwargs are explicit dependencies
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 38cd0ca9b8..e551606f9d 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -14,15 +14,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import Dict, List
+
 from six import iteritems
 
 from canonicaljson import encode_canonical_json, json
 
+from twisted.enterprise.adbapi import Connection
 from twisted.internet import defer
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.storage._base import SQLBaseStore, db_to_json
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
 
 
 class EndToEndKeyWorkerStore(SQLBaseStore):
@@ -271,7 +274,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         Args:
             txn (twisted.enterprise.adbapi.Connection): db connection
             user_id (str): the user whose key is being requested
-            key_type (str): the type of key that is being set: either 'master'
+            key_type (str): the type of key that is being requested: either 'master'
                 for a master key, 'self_signing' for a self-signing key, or
                 'user_signing' for a user-signing key
             from_user_id (str): if specified, signatures made by this user on
@@ -316,8 +319,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         """Returns a user's cross-signing key.
 
         Args:
-            user_id (str): the user whose self-signing key is being requested
-            key_type (str): the type of cross-signing key to get
+            user_id (str): the user whose key is being requested
+            key_type (str): the type of key that is being requested: either 'master'
+                for a master key, 'self_signing' for a self-signing key, or
+                'user_signing' for a user-signing key
             from_user_id (str): if specified, signatures made by this user on
                 the self-signing key will be included in the result
 
@@ -332,6 +337,206 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             from_user_id,
         )
 
+    @cached(num_args=1)
+    def _get_bare_e2e_cross_signing_keys(self, user_id):
+        """Dummy function.  Only used to make a cache for
+        _get_bare_e2e_cross_signing_keys_bulk.
+        """
+        raise NotImplementedError()
+
+    @cachedList(
+        cached_method_name="_get_bare_e2e_cross_signing_keys",
+        list_name="user_ids",
+        num_args=1,
+    )
+    def _get_bare_e2e_cross_signing_keys_bulk(
+        self, user_ids: List[str]
+    ) -> Dict[str, Dict[str, dict]]:
+        """Returns the cross-signing keys for a set of users.  The output of this
+        function should be passed to _get_e2e_cross_signing_signatures_txn if
+        the signatures for the calling user need to be fetched.
+
+        Args:
+            user_ids (list[str]): the users whose keys are being requested
+
+        Returns:
+            dict[str, dict[str, dict]]: mapping from user ID to key type to key
+                data.  If a user's cross-signing keys were not found, either
+                their user ID will not be in the dict, or their user ID will map
+                to None.
+
+        """
+        return self.db.runInteraction(
+            "get_bare_e2e_cross_signing_keys_bulk",
+            self._get_bare_e2e_cross_signing_keys_bulk_txn,
+            user_ids,
+        )
+
+    def _get_bare_e2e_cross_signing_keys_bulk_txn(
+        self, txn: Connection, user_ids: List[str],
+    ) -> Dict[str, Dict[str, dict]]:
+        """Returns the cross-signing keys for a set of users.  The output of this
+        function should be passed to _get_e2e_cross_signing_signatures_txn if
+        the signatures for the calling user need to be fetched.
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            user_ids (list[str]): the users whose keys are being requested
+
+        Returns:
+            dict[str, dict[str, dict]]: mapping from user ID to key type to key
+                data.  If a user's cross-signing keys were not found, their user
+                ID will not be in the dict.
+
+        """
+        result = {}
+
+        batch_size = 100
+        chunks = [
+            user_ids[i : i + batch_size] for i in range(0, len(user_ids), batch_size)
+        ]
+        for user_chunk in chunks:
+            sql = """
+                SELECT k.user_id, k.keytype, k.keydata, k.stream_id
+                  FROM e2e_cross_signing_keys k
+                  INNER JOIN (SELECT user_id, keytype, MAX(stream_id) AS stream_id
+                                FROM e2e_cross_signing_keys
+                               GROUP BY user_id, keytype) s
+                 USING (user_id, stream_id, keytype)
+                 WHERE k.user_id IN (%s)
+            """ % (
+                ",".join("?" for u in user_chunk),
+            )
+            query_params = []
+            query_params.extend(user_chunk)
+
+            txn.execute(sql, query_params)
+            rows = self.db.cursor_to_dict(txn)
+
+            for row in rows:
+                user_id = row["user_id"]
+                key_type = row["keytype"]
+                key = json.loads(row["keydata"])
+                user_info = result.setdefault(user_id, {})
+                user_info[key_type] = key
+
+        return result
+
+    def _get_e2e_cross_signing_signatures_txn(
+        self, txn: Connection, keys: Dict[str, Dict[str, dict]], from_user_id: str,
+    ) -> Dict[str, Dict[str, dict]]:
+        """Returns the cross-signing signatures made by a user on a set of keys.
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            keys (dict[str, dict[str, dict]]): a map of user ID to key type to
+                key data.  This dict will be modified to add signatures.
+            from_user_id (str): fetch the signatures made by this user
+
+        Returns:
+            dict[str, dict[str, dict]]: mapping from user ID to key type to key
+                data.  The return value will be the same as the keys argument,
+                with the modifications included.
+        """
+
+        # find out what cross-signing keys (a.k.a. devices) we need to get
+        # signatures for.  This is a map of (user_id, device_id) to key type
+        # (device_id is the key's public part).
+        devices = {}
+
+        for user_id, user_info in keys.items():
+            if user_info is None:
+                continue
+            for key_type, key in user_info.items():
+                device_id = None
+                for k in key["keys"].values():
+                    device_id = k
+                devices[(user_id, device_id)] = key_type
+
+        device_list = list(devices)
+
+        # split into batches
+        batch_size = 100
+        chunks = [
+            device_list[i : i + batch_size]
+            for i in range(0, len(device_list), batch_size)
+        ]
+        for user_chunk in chunks:
+            sql = """
+                SELECT target_user_id, target_device_id, key_id, signature
+                  FROM e2e_cross_signing_signatures
+                 WHERE user_id = ?
+                   AND (%s)
+            """ % (
+                " OR ".join(
+                    "(target_user_id = ? AND target_device_id = ?)" for d in devices
+                )
+            )
+            query_params = [from_user_id]
+            for item in devices:
+                # item is a (user_id, device_id) tuple
+                query_params.extend(item)
+
+            txn.execute(sql, query_params)
+            rows = self.db.cursor_to_dict(txn)
+
+            # and add the signatures to the appropriate keys
+            for row in rows:
+                key_id = row["key_id"]
+                target_user_id = row["target_user_id"]
+                target_device_id = row["target_device_id"]
+                key_type = devices[(target_user_id, target_device_id)]
+                # We need to copy everything, because the result may have come
+                # from the cache.  dict.copy only does a shallow copy, so we
+                # need to recursively copy the dicts that will be modified.
+                user_info = keys[target_user_id] = keys[target_user_id].copy()
+                target_user_key = user_info[key_type] = user_info[key_type].copy()
+                if "signatures" in target_user_key:
+                    signatures = target_user_key["signatures"] = target_user_key[
+                        "signatures"
+                    ].copy()
+                    if from_user_id in signatures:
+                        user_sigs = signatures[from_user_id] = signatures[from_user_id]
+                        user_sigs[key_id] = row["signature"]
+                    else:
+                        signatures[from_user_id] = {key_id: row["signature"]}
+                else:
+                    target_user_key["signatures"] = {
+                        from_user_id: {key_id: row["signature"]}
+                    }
+
+        return keys
+
+    @defer.inlineCallbacks
+    def get_e2e_cross_signing_keys_bulk(
+        self, user_ids: List[str], from_user_id: str = None
+    ) -> defer.Deferred:
+        """Returns the cross-signing keys for a set of users.
+
+        Args:
+            user_ids (list[str]): the users whose keys are being requested
+            from_user_id (str): if specified, signatures made by this user on
+                the self-signing keys will be included in the result
+
+        Returns:
+            Deferred[dict[str, dict[str, dict]]]: map of user ID to key type to
+                key data.  If a user's cross-signing keys were not found, either
+                their user ID will not be in the dict, or their user ID will map
+                to None.
+        """
+
+        result = yield self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
+
+        if from_user_id:
+            result = yield self.db.runInteraction(
+                "get_e2e_cross_signing_signatures",
+                self._get_e2e_cross_signing_signatures_txn,
+                result,
+                from_user_id,
+            )
+
+        return result
+
     def get_all_user_signature_changes_for_remotes(self, from_key, to_key):
         """Return a list of changes from the user signature stream to notify remotes.
         Note that the user signature stream represents when a user signs their
@@ -520,6 +725,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 },
             )
 
+        self._invalidate_cache_and_stream(
+            txn, self._get_bare_e2e_cross_signing_keys, (user_id,)
+        )
+
     def set_e2e_cross_signing_key(self, user_id, key_type, key):
         """Set a user's cross-signing key.
 
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index dfb46ee0f8..47ebb8a214 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -385,7 +385,7 @@ class SearchStore(SearchBackgroundUpdateStore):
         """
         clauses = []
 
-        search_query = search_query = _parse_query(self.database_engine, search_term)
+        search_query = _parse_query(self.database_engine, search_term)
 
         args = []
 
@@ -501,7 +501,7 @@ class SearchStore(SearchBackgroundUpdateStore):
         """
         clauses = []
 
-        search_query = search_query = _parse_query(self.database_engine, search_term)
+        search_query = _parse_query(self.database_engine, search_term)
 
         args = []
 
@@ -606,7 +606,12 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         results = list(filter(lambda row: row["room_id"] in room_ids, results))
 
-        events = yield self.get_events_as_list([r["event_id"] for r in results])
+        # We set redact_behaviour to BLOCK here to prevent redacted events being returned in
+        # search results (which is a data leak)
+        events = yield self.get_events_as_list(
+            [r["event_id"] for r in results],
+            redact_behaviour=EventRedactBehaviour.BLOCK,
+        )
 
         event_map = {ev.event_id: ev for ev in events}
 
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 84f5ae22c3..2e8f6543e5 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -271,7 +271,7 @@ class _CacheDescriptorBase(object):
         else:
             self.function_to_call = orig
 
-        arg_spec = inspect.getargspec(orig)
+        arg_spec = inspect.getfullargspec(orig)
         all_args = arg_spec.args
 
         if "cache_context" in all_args: