summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/main/cache.py22
-rw-r--r--synapse/storage/data_stores/main/devices.py71
-rw-r--r--synapse/storage/data_stores/main/events.py185
-rw-r--r--synapse/storage/data_stores/main/events_worker.py2
-rw-r--r--synapse/storage/data_stores/main/registration.py2
-rw-r--r--synapse/storage/data_stores/main/room.py94
-rw-r--r--synapse/storage/data_stores/main/roommember.py37
-rw-r--r--synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql19
-rw-r--r--synapse/storage/data_stores/main/schema/delta/57/device_list_remote_cache_stale.sql25
-rw-r--r--synapse/storage/data_stores/main/schema/delta/57/rooms_version_column.sql24
-rw-r--r--synapse/storage/data_stores/main/state.py165
-rw-r--r--synapse/storage/data_stores/main/stats.py4
-rw-r--r--synapse/storage/engines/postgres.py42
-rw-r--r--synapse/storage/engines/sqlite.py5
-rw-r--r--synapse/storage/persist_events.py136
-rw-r--r--synapse/storage/prepare_database.py5
16 files changed, 726 insertions, 112 deletions
diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py
index afa2b41c98..d4c44dcc75 100644
--- a/synapse/storage/data_stores/main/cache.py
+++ b/synapse/storage/data_stores/main/cache.py
@@ -16,7 +16,7 @@
 
 import itertools
 import logging
-from typing import Any, Iterable, Optional
+from typing import Any, Iterable, Optional, Tuple
 
 from twisted.internet import defer
 
@@ -33,6 +33,26 @@ CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
 
 
 class CacheInvalidationStore(SQLBaseStore):
+    async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
+        """Invalidates the cache and adds it to the cache stream so slaves
+        will know to invalidate their caches.
+
+        This should only be used to invalidate caches where slaves won't
+        otherwise know from other replication streams that the cache should
+        be invalidated.
+        """
+        cache_func = getattr(self, cache_name, None)
+        if not cache_func:
+            return
+
+        cache_func.invalidate(keys)
+        await self.runInteraction(
+            "invalidate_cache_and_stream",
+            self._send_invalidation_to_replication,
+            cache_func.__name__,
+            keys,
+        )
+
     def _invalidate_cache_and_stream(self, txn, cache_func, keys):
         """Invalidates the cache and adds it to the cache stream so slaves
         will know to invalidate their caches.
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index f0a7962dd0..b7617efb80 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -32,7 +32,7 @@ from synapse.logging.opentracing import (
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
-from synapse.types import get_verify_key_from_cross_signing_key
+from synapse.types import Collection, get_verify_key_from_cross_signing_key
 from synapse.util.caches.descriptors import (
     Cache,
     cached,
@@ -320,6 +320,11 @@ class DeviceWorkerStore(SQLBaseStore):
                     device_display_name = device.get("device_display_name", None)
                     if device_display_name:
                         result["device_display_name"] = device_display_name
+                    if "signatures" in device:
+                        for sig_user_id, sigs in device["signatures"].items():
+                            result["keys"].setdefault("signatures", {}).setdefault(
+                                sig_user_id, {}
+                            ).update(sigs)
                 else:
                     result["deleted"] = True
 
@@ -443,8 +448,15 @@ class DeviceWorkerStore(SQLBaseStore):
         """
         user_ids = set(user_id for user_id, _ in query_list)
         user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids))
-        user_ids_in_cache = set(
-            user_id for user_id, stream_id in user_map.items() if stream_id
+
+        # We go and check if any of the users need to have their device lists
+        # resynced. If they do then we remove them from the cached list.
+        users_needing_resync = yield self.get_user_ids_requiring_device_list_resync(
+            user_ids
+        )
+        user_ids_in_cache = (
+            set(user_id for user_id, stream_id in user_map.items() if stream_id)
+            - users_needing_resync
         )
         user_ids_not_in_cache = user_ids - user_ids_in_cache
 
@@ -457,7 +469,7 @@ class DeviceWorkerStore(SQLBaseStore):
                 device = yield self._get_cached_user_device(user_id, device_id)
                 results.setdefault(user_id, {})[device_id] = device
             else:
-                results[user_id] = yield self._get_cached_devices_for_user(user_id)
+                results[user_id] = yield self.get_cached_devices_for_user(user_id)
 
         set_tag("in_cache", results)
         set_tag("not_in_cache", user_ids_not_in_cache)
@@ -475,12 +487,12 @@ class DeviceWorkerStore(SQLBaseStore):
         return db_to_json(content)
 
     @cachedInlineCallbacks()
-    def _get_cached_devices_for_user(self, user_id):
+    def get_cached_devices_for_user(self, user_id):
         devices = yield self.db.simple_select_list(
             table="device_lists_remote_cache",
             keyvalues={"user_id": user_id},
             retcols=("device_id", "content"),
-            desc="_get_cached_devices_for_user",
+            desc="get_cached_devices_for_user",
         )
         return {
             device["device_id"]: db_to_json(device["content"]) for device in devices
@@ -517,6 +529,11 @@ class DeviceWorkerStore(SQLBaseStore):
                 device_display_name = device.get("device_display_name", None)
                 if device_display_name:
                     result["device_display_name"] = device_display_name
+                if "signatures" in device:
+                    for sig_user_id, sigs in device["signatures"].items():
+                        result["keys"].setdefault("signatures", {}).setdefault(
+                            sig_user_id, {}
+                        ).update(sigs)
 
                 results.append(result)
 
@@ -641,6 +658,37 @@ class DeviceWorkerStore(SQLBaseStore):
 
         return results
 
+    @defer.inlineCallbacks
+    def get_user_ids_requiring_device_list_resync(self, user_ids: Collection[str]):
+        """Given a list of remote users return the list of users that we
+        should resync the device lists for.
+
+        Returns:
+            Deferred[Set[str]]
+        """
+
+        rows = yield self.db.simple_select_many_batch(
+            table="device_lists_remote_resync",
+            column="user_id",
+            iterable=user_ids,
+            retcols=("user_id",),
+            desc="get_user_ids_requiring_device_list_resync",
+        )
+
+        return {row["user_id"] for row in rows}
+
+    def mark_remote_user_device_cache_as_stale(self, user_id: str):
+        """Records that the server has reason to believe the cache of the devices
+        for the remote users is out of date.
+        """
+        return self.db.simple_upsert(
+            table="device_lists_remote_resync",
+            keyvalues={"user_id": user_id},
+            values={},
+            insertion_values={"added_ts": self._clock.time_msec()},
+            desc="make_remote_user_device_cache_as_stale",
+        )
+
 
 class DeviceBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, database: Database, db_conn, hs):
@@ -887,7 +935,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             )
 
         txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
-        txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
+        txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
         txn.call_after(
             self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
         )
@@ -942,7 +990,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             ],
         )
 
-        txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
+        txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
         txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
         txn.call_after(
             self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
@@ -958,6 +1006,13 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             lock=False,
         )
 
+        # If we're replacing the remote user's device list cache presumably
+        # we've done a full resync, so we remove the entry that says we need
+        # to resync
+        self.db.simple_delete_txn(
+            txn, table="device_lists_remote_resync", keyvalues={"user_id": user_id},
+        )
+
     @defer.inlineCallbacks
     def add_device_change_to_streams(self, user_id, device_ids, hosts):
         """Persist that a user's devices have been updated, and which hosts
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 596daf8909..c9d0d68c3a 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -32,6 +32,7 @@ from twisted.internet import defer
 import synapse.metrics
 from synapse.api.constants import EventContentFields, EventTypes
 from synapse.api.errors import SynapseError
+from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event_dict
@@ -468,84 +469,93 @@ class EventsStore(
             to_delete = delta_state.to_delete
             to_insert = delta_state.to_insert
 
-            # First we add entries to the current_state_delta_stream. We
-            # do this before updating the current_state_events table so
-            # that we can use it to calculate the `prev_event_id`. (This
-            # allows us to not have to pull out the existing state
-            # unnecessarily).
-            #
-            # The stream_id for the update is chosen to be the minimum of the stream_ids
-            # for the batch of the events that we are persisting; that means we do not
-            # end up in a situation where workers see events before the
-            # current_state_delta updates.
-            #
-            sql = """
-                INSERT INTO current_state_delta_stream
-                (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                SELECT ?, ?, ?, ?, ?, (
-                    SELECT event_id FROM current_state_events
-                    WHERE room_id = ? AND type = ? AND state_key = ?
+            if delta_state.no_longer_in_room:
+                # Server is no longer in the room so we delete the room from
+                # current_state_events, being careful we've already updated the
+                # rooms.room_version column (which gets populated in a
+                # background task).
+                self._upsert_room_version_txn(txn, room_id)
+
+                # Before deleting we populate the current_state_delta_stream
+                # so that async background tasks get told what happened.
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                        (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, room_id, type, state_key, null, event_id
+                        FROM current_state_events
+                        WHERE room_id = ?
+                """
+                txn.execute(sql, (stream_id, room_id))
+
+                self.db.simple_delete_txn(
+                    txn, table="current_state_events", keyvalues={"room_id": room_id},
                 )
-            """
-            txn.executemany(
-                sql,
-                (
-                    (
-                        stream_id,
-                        room_id,
-                        etype,
-                        state_key,
-                        None,
-                        room_id,
-                        etype,
-                        state_key,
+            else:
+                # We're still in the room, so we update the current state as normal.
+
+                # First we add entries to the current_state_delta_stream. We
+                # do this before updating the current_state_events table so
+                # that we can use it to calculate the `prev_event_id`. (This
+                # allows us to not have to pull out the existing state
+                # unnecessarily).
+                #
+                # The stream_id for the update is chosen to be the minimum of the stream_ids
+                # for the batch of the events that we are persisting; that means we do not
+                # end up in a situation where workers see events before the
+                # current_state_delta updates.
+                #
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, ?, ?, ?, (
+                        SELECT event_id FROM current_state_events
+                        WHERE room_id = ? AND type = ? AND state_key = ?
                     )
-                    for etype, state_key in to_delete
-                    # We sanity check that we're deleting rather than updating
-                    if (etype, state_key) not in to_insert
-                ),
-            )
-            txn.executemany(
-                sql,
-                (
+                """
+                txn.executemany(
+                    sql,
                     (
-                        stream_id,
-                        room_id,
-                        etype,
-                        state_key,
-                        ev_id,
-                        room_id,
-                        etype,
-                        state_key,
-                    )
-                    for (etype, state_key), ev_id in iteritems(to_insert)
-                ),
-            )
+                        (
+                            stream_id,
+                            room_id,
+                            etype,
+                            state_key,
+                            to_insert.get((etype, state_key)),
+                            room_id,
+                            etype,
+                            state_key,
+                        )
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
+                )
+                # Now we actually update the current_state_events table
 
-            # Now we actually update the current_state_events table
+                txn.executemany(
+                    "DELETE FROM current_state_events"
+                    " WHERE room_id = ? AND type = ? AND state_key = ?",
+                    (
+                        (room_id, etype, state_key)
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
+                )
 
-            txn.executemany(
-                "DELETE FROM current_state_events"
-                " WHERE room_id = ? AND type = ? AND state_key = ?",
-                (
-                    (room_id, etype, state_key)
-                    for etype, state_key in itertools.chain(to_delete, to_insert)
-                ),
-            )
+                # We include the membership in the current state table, hence we do
+                # a lookup when we insert. This assumes that all events have already
+                # been inserted into room_memberships.
+                txn.executemany(
+                    """INSERT INTO current_state_events
+                        (room_id, type, state_key, event_id, membership)
+                    VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+                    """,
+                    [
+                        (room_id, key[0], key[1], ev_id, ev_id)
+                        for key, ev_id in iteritems(to_insert)
+                    ],
+                )
 
-            # We include the membership in the current state table, hence we do
-            # a lookup when we insert. This assumes that all events have already
-            # been inserted into room_memberships.
-            txn.executemany(
-                """INSERT INTO current_state_events
-                    (room_id, type, state_key, event_id, membership)
-                VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
-                """,
-                [
-                    (room_id, key[0], key[1], ev_id, ev_id)
-                    for key, ev_id in iteritems(to_insert)
-                ],
-            )
+            # We now update `local_current_membership`. We do this regardless
+            # of whether we're still in the room or not to handle the case where
+            # e.g. we just got banned (where we need to record that fact here).
 
             # Note: Do we really want to delete rows here (that we do not
             # subsequently reinsert below)? While technically correct it means
@@ -601,6 +611,35 @@ class EventsStore(
 
             self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
 
+    def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
+        """Update the room version in the database based off current state
+        events.
+
+        This is used when we're about to delete current state and we want to
+        ensure that the `rooms.room_version` column is up to date.
+        """
+
+        sql = """
+            SELECT json FROM event_json
+            INNER JOIN current_state_events USING (room_id, event_id)
+            WHERE room_id = ? AND type = ? AND state_key = ?
+        """
+        txn.execute(sql, (room_id, EventTypes.Create, ""))
+        row = txn.fetchone()
+        if row:
+            event_json = json.loads(row[0])
+            content = event_json.get("content", {})
+            creator = content.get("creator")
+            room_version_id = content.get("room_version", RoomVersions.V1.identifier)
+
+            self.db.simple_upsert_txn(
+                txn,
+                table="rooms",
+                keyvalues={"room_id": room_id},
+                values={"room_version": room_version_id},
+                insertion_values={"is_public": False, "creator": creator},
+            )
+
     def _update_forward_extremities_txn(
         self, txn, new_forward_extremities, max_stream_order
     ):
@@ -951,7 +990,7 @@ class EventsStore(
             elif event.type == EventTypes.Message:
                 # Insert into the event_search table.
                 self._store_room_message_txn(txn, event)
-            elif event.type == EventTypes.Redaction:
+            elif event.type == EventTypes.Redaction and event.redacts is not None:
                 # Insert into the redactions table.
                 self._store_redaction(txn, event)
             elif event.type == EventTypes.Retention:
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 3b93e0597a..7251e819f5 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -287,7 +287,7 @@ class EventsWorkerStore(SQLBaseStore):
             # we have to recheck auth now.
 
             if not allow_rejected and entry.event.type == EventTypes.Redaction:
-                if not hasattr(entry.event, "redacts"):
+                if entry.event.redacts is None:
                     # A redacted redaction doesn't have a `redacts` key, in
                     # which case lets just withhold the event.
                     #
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index cb4b2b39a0..49306642ed 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -291,7 +291,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             desc="is_server_admin",
         )
 
-        return res if res else False
+        return bool(res) if res else False
 
     def set_server_admin(self, user, admin):
         """Sets whether a user is an admin of this homeserver.
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index d968803ad2..9a17e336ba 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -29,9 +29,10 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
+from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.data_stores.main.search import SearchStore
-from synapse.storage.database import Database
+from synapse.storage.database import Database, LoggingTransaction
 from synapse.types import ThirdPartyInstanceID
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
@@ -734,6 +735,7 @@ class RoomWorkerStore(SQLBaseStore):
 
 class RoomBackgroundUpdateStore(SQLBaseStore):
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
+    ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
 
     def __init__(self, database: Database, db_conn, hs):
         super(RoomBackgroundUpdateStore, self).__init__(database, db_conn, hs)
@@ -749,6 +751,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
             self._remove_tombstoned_rooms_from_directory,
         )
 
+        self.db.updates.register_background_update_handler(
+            self.ADD_ROOMS_ROOM_VERSION_COLUMN,
+            self._background_add_rooms_room_version_column,
+        )
+
     @defer.inlineCallbacks
     def _background_insert_retention(self, progress, batch_size):
         """Retrieves a list of all rooms within a range and inserts an entry for each of
@@ -817,6 +824,73 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         defer.returnValue(batch_size)
 
+    async def _background_add_rooms_room_version_column(
+        self, progress: dict, batch_size: int
+    ):
+        """Background update to go and add room version inforamtion to `rooms`
+        table from `current_state_events` table.
+        """
+
+        last_room_id = progress.get("room_id", "")
+
+        def _background_add_rooms_room_version_column_txn(txn: LoggingTransaction):
+            sql = """
+                SELECT room_id, json FROM current_state_events
+                INNER JOIN event_json USING (room_id, event_id)
+                WHERE room_id > ? AND type = 'm.room.create' AND state_key = ''
+                ORDER BY room_id
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_room_id, batch_size))
+
+            updates = []
+            for room_id, event_json in txn:
+                event_dict = json.loads(event_json)
+                room_version_id = event_dict.get("content", {}).get(
+                    "room_version", RoomVersions.V1.identifier
+                )
+
+                creator = event_dict.get("content").get("creator")
+
+                updates.append((room_id, creator, room_version_id))
+
+            if not updates:
+                return True
+
+            new_last_room_id = ""
+            for room_id, creator, room_version_id in updates:
+                # We upsert here just in case we don't already have a row,
+                # mainly for paranoia as much badness would happen if we don't
+                # insert the row and then try and get the room version for the
+                # room.
+                self.db.simple_upsert_txn(
+                    txn,
+                    table="rooms",
+                    keyvalues={"room_id": room_id},
+                    values={"room_version": room_version_id},
+                    insertion_values={"is_public": False, "creator": creator},
+                )
+                new_last_room_id = room_id
+
+            self.db.updates._background_update_progress_txn(
+                txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id}
+            )
+
+            return False
+
+        end = await self.db.runInteraction(
+            "_background_add_rooms_room_version_column",
+            _background_add_rooms_room_version_column_txn,
+        )
+
+        if end:
+            await self.db.updates._end_background_update(
+                self.ADD_ROOMS_ROOM_VERSION_COLUMN
+            )
+
+        return batch_size
+
     async def _remove_tombstoned_rooms_from_directory(
         self, progress, batch_size
     ) -> int:
@@ -881,14 +955,21 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
         self.config = hs.config
 
     @defer.inlineCallbacks
-    def store_room(self, room_id, room_creator_user_id, is_public):
+    def store_room(
+        self,
+        room_id: str,
+        room_creator_user_id: str,
+        is_public: bool,
+        room_version: RoomVersion,
+    ):
         """Stores a room.
 
         Args:
-            room_id (str): The desired room ID, can be None.
-            room_creator_user_id (str): The user ID of the room creator.
-            is_public (bool): True to indicate that this room should appear in
-            public room lists.
+            room_id: The desired room ID, can be None.
+            room_creator_user_id: The user ID of the room creator.
+            is_public: True to indicate that this room should appear in
+                public room lists.
+            room_version: The version of the room
         Raises:
             StoreError if the room could not be stored.
         """
@@ -902,6 +983,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
                         "room_id": room_id,
                         "creator": room_creator_user_id,
                         "is_public": is_public,
+                        "room_version": room_version.identifier,
                     },
                 )
                 if is_public:
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 9acef7c950..042289f0e0 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 import logging
-from typing import Iterable, List
+from typing import Iterable, List, Set
 
 from six import iteritems, itervalues
 
@@ -40,7 +40,7 @@ from synapse.storage.roommember import (
     ProfileInfo,
     RoomsForUser,
 )
-from synapse.types import get_domain_from_id
+from synapse.types import Collection, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
@@ -439,6 +439,39 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return results
 
+    async def get_users_server_still_shares_room_with(
+        self, user_ids: Collection[str]
+    ) -> Set[str]:
+        """Given a list of users return the set that the server still share a
+        room with.
+        """
+
+        if not user_ids:
+            return set()
+
+        def _get_users_server_still_shares_room_with_txn(txn):
+            sql = """
+                SELECT state_key FROM current_state_events
+                WHERE
+                    type = 'm.room.member'
+                    AND membership = 'join'
+                    AND %s
+                GROUP BY state_key
+            """
+
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "state_key", user_ids
+            )
+
+            txn.execute(sql % (clause,), args)
+
+            return set(row[0] for row in txn)
+
+        return await self.db.runInteraction(
+            "get_users_server_still_shares_room_with",
+            _get_users_server_still_shares_room_with_txn,
+        )
+
     @defer.inlineCallbacks
     def get_rooms_for_user(self, user_id, on_invalidate=None):
         """Returns a set of room_ids the user is currently joined to.
diff --git a/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql
new file mode 100644
index 0000000000..a133d87a19
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql
@@ -0,0 +1,19 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add background update to go and delete current state events for rooms the
+-- server is no longer in.
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('delete_old_current_state_events', '{}');
diff --git a/synapse/storage/data_stores/main/schema/delta/57/device_list_remote_cache_stale.sql b/synapse/storage/data_stores/main/schema/delta/57/device_list_remote_cache_stale.sql
new file mode 100644
index 0000000000..c3b6de2099
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/device_list_remote_cache_stale.sql
@@ -0,0 +1,25 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Records whether the server thinks that the remote users cached device lists
+-- may be out of date (e.g. if we have received a to device message from a
+-- device we don't know about).
+CREATE TABLE IF NOT EXISTS device_lists_remote_resync (
+    user_id TEXT NOT NULL,
+    added_ts BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX device_lists_remote_resync_idx ON device_lists_remote_resync (user_id);
+CREATE INDEX device_lists_remote_resync_ts_idx ON device_lists_remote_resync (added_ts);
diff --git a/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column.sql b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column.sql
new file mode 100644
index 0000000000..352a66f5b0
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column.sql
@@ -0,0 +1,24 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- We want to start storing the room version independently of
+-- `current_state_events` so that we can delete stale entries from it without
+-- losing the information.
+ALTER TABLE rooms ADD COLUMN room_version TEXT;
+
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('add_rooms_room_version_column', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 33bebd1c48..3d34103e67 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -21,12 +22,14 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
-from synapse.api.errors import NotFoundError
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
+from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
 from synapse.storage.database import Database
 from synapse.storage.state import StateFilter
 from synapse.util.caches import intern_string
@@ -60,24 +63,55 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     def __init__(self, database: Database, db_conn, hs):
         super(StateGroupWorkerStore, self).__init__(database, db_conn, hs)
 
-    @defer.inlineCallbacks
-    def get_room_version(self, room_id):
+    async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
 
-        Args:
-            room_id (str)
+        Raises:
+            NotFoundError: if the room is unknown
 
-        Returns:
-            Deferred[str]
+            UnsupportedRoomVersionError: if the room uses an unknown room version.
+                Typically this happens if support for the room's version has been
+                removed from Synapse.
+        """
+        room_version_id = await self.get_room_version_id(room_id)
+        v = KNOWN_ROOM_VERSIONS.get(room_version_id)
+
+        if not v:
+            raise UnsupportedRoomVersionError(
+                "Room %s uses a room version %s which is no longer supported"
+                % (room_id, room_version_id)
+            )
+
+        return v
+
+    @cached(max_entries=10000)
+    async def get_room_version_id(self, room_id: str) -> str:
+        """Get the room_version of a given room
 
         Raises:
-            NotFoundError if the room is unknown
+            NotFoundError: if the room is unknown
         """
-        # for now we do this by looking at the create event. We may want to cache this
-        # more intelligently in future.
+
+        # First we try looking up room version from the database, but for old
+        # rooms we might not have added the room version to it yet so we fall
+        # back to previous behaviour and look in current state events.
+
+        # We really should have an entry in the rooms table for every room we
+        # care about, but let's be a bit paranoid (at least while the background
+        # update is happening) to avoid breaking existing rooms.
+        version = await self.db.simple_select_one_onecol(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            retcol="room_version",
+            desc="get_room_version",
+            allow_none=True,
+        )
+
+        if version is not None:
+            return version
 
         # Retrieve the room's create event
-        create_event = yield self.get_create_event_for_room(room_id)
+        create_event = await self.get_create_event_for_room(room_id)
         return create_event.content.get("room_version", "1")
 
     @defer.inlineCallbacks
@@ -290,14 +324,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         return set(row["state_group"] for row in rows)
 
 
-class MainStateBackgroundUpdateStore(SQLBaseStore):
+class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
 
     CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
     EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
+    DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"
 
     def __init__(self, database: Database, db_conn, hs):
         super(MainStateBackgroundUpdateStore, self).__init__(database, db_conn, hs)
 
+        self.server_name = hs.hostname
+
         self.db.updates.register_background_index_update(
             self.CURRENT_STATE_INDEX_UPDATE_NAME,
             index_name="current_state_events_member_index",
@@ -311,6 +348,108 @@ class MainStateBackgroundUpdateStore(SQLBaseStore):
             table="event_to_state_groups",
             columns=["state_group"],
         )
+        self.db.updates.register_background_update_handler(
+            self.DELETE_CURRENT_STATE_UPDATE_NAME, self._background_remove_left_rooms,
+        )
+
+    async def _background_remove_left_rooms(self, progress, batch_size):
+        """Background update to delete rows from `current_state_events` and
+        `event_forward_extremities` tables of rooms that the server is no
+        longer joined to.
+        """
+
+        last_room_id = progress.get("last_room_id", "")
+
+        def _background_remove_left_rooms_txn(txn):
+            sql = """
+                SELECT DISTINCT room_id FROM current_state_events
+                WHERE room_id > ? ORDER BY room_id LIMIT ?
+            """
+
+            txn.execute(sql, (last_room_id, batch_size))
+            room_ids = list(row[0] for row in txn)
+            if not room_ids:
+                return True, set()
+
+            sql = """
+                SELECT room_id
+                FROM current_state_events
+                WHERE
+                    room_id > ? AND room_id <= ?
+                    AND type = 'm.room.member'
+                    AND membership = 'join'
+                    AND state_key LIKE ?
+                GROUP BY room_id
+            """
+
+            txn.execute(sql, (last_room_id, room_ids[-1], "%:" + self.server_name))
+
+            joined_room_ids = set(row[0] for row in txn)
+
+            left_rooms = set(room_ids) - joined_room_ids
+
+            logger.info("Deleting current state left rooms: %r", left_rooms)
+
+            # First we get all users that we still think were joined to the
+            # room. This is so that we can mark those device lists as
+            # potentially stale, since there may have been a period where the
+            # server didn't share a room with the remote user and therefore may
+            # have missed any device updates.
+            rows = self.db.simple_select_many_txn(
+                txn,
+                table="current_state_events",
+                column="room_id",
+                iterable=left_rooms,
+                keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN},
+                retcols=("state_key",),
+            )
+
+            potentially_left_users = set(row["state_key"] for row in rows)
+
+            # Now lets actually delete the rooms from the DB.
+            self.db.simple_delete_many_txn(
+                txn,
+                table="current_state_events",
+                column="room_id",
+                iterable=left_rooms,
+                keyvalues={},
+            )
+
+            self.db.simple_delete_many_txn(
+                txn,
+                table="event_forward_extremities",
+                column="room_id",
+                iterable=left_rooms,
+                keyvalues={},
+            )
+
+            self.db.updates._background_update_progress_txn(
+                txn,
+                self.DELETE_CURRENT_STATE_UPDATE_NAME,
+                {"last_room_id": room_ids[-1]},
+            )
+
+            return False, potentially_left_users
+
+        finished, potentially_left_users = await self.db.runInteraction(
+            "_background_remove_left_rooms", _background_remove_left_rooms_txn
+        )
+
+        if finished:
+            await self.db.updates._end_background_update(
+                self.DELETE_CURRENT_STATE_UPDATE_NAME
+            )
+
+        # Now go and check if we still share a room with the remote users in
+        # the deleted rooms. If not mark their device lists as stale.
+        joined_users = await self.get_users_server_still_shares_room_with(
+            potentially_left_users
+        )
+
+        for user_id in potentially_left_users - joined_users:
+            await self.mark_remote_user_device_list_as_unsubscribed(user_id)
+
+        return batch_size
 
 
 class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore):
diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py
index 7bc186e9a1..7af1495e47 100644
--- a/synapse/storage/data_stores/main/stats.py
+++ b/synapse/storage/data_stores/main/stats.py
@@ -744,7 +744,7 @@ class StatsStore(StateDeltasStore):
                     EventTypes.Create,
                     EventTypes.JoinRules,
                     EventTypes.RoomHistoryVisibility,
-                    EventTypes.Encryption,
+                    EventTypes.RoomEncryption,
                     EventTypes.Name,
                     EventTypes.Topic,
                     EventTypes.RoomAvatar,
@@ -816,7 +816,7 @@ class StatsStore(StateDeltasStore):
                 room_state["history_visibility"] = event.content.get(
                     "history_visibility"
                 )
-            elif event.type == EventTypes.Encryption:
+            elif event.type == EventTypes.RoomEncryption:
                 room_state["encryption"] = event.content.get("algorithm")
             elif event.type == EventTypes.Name:
                 room_state["name"] = event.content.get("name")
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index c84cb452b0..a077345960 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -13,8 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from ._base import IncorrectDatabaseSetup
 
+logger = logging.getLogger(__name__)
+
 
 class PostgresEngine(object):
     single_threaded = False
@@ -52,6 +56,44 @@ class PostgresEngine(object):
                     "See docs/postgres.rst for more information." % (rows[0][0],)
                 )
 
+            txn.execute(
+                "SELECT datcollate, datctype FROM pg_database WHERE datname = current_database()"
+            )
+            collation, ctype = txn.fetchone()
+            if collation != "C":
+                logger.warning(
+                    "Database has incorrect collation of %r. Should be 'C'", collation
+                )
+
+            if ctype != "C":
+                logger.warning(
+                    "Database has incorrect ctype of %r. Should be 'C'", ctype
+                )
+
+    def check_new_database(self, txn):
+        """Gets called when setting up a brand new database. This allows us to
+        apply stricter checks on new databases versus existing database.
+        """
+
+        txn.execute(
+            "SELECT datcollate, datctype FROM pg_database WHERE datname = current_database()"
+        )
+        collation, ctype = txn.fetchone()
+
+        errors = []
+
+        if collation != "C":
+            errors.append("    - 'COLLATE' is set to %r. Should be 'C'" % (collation,))
+
+        if ctype != "C":
+            errors.append("    - 'CTYPE' is set to %r. Should be 'C'" % (collation,))
+
+        if errors:
+            raise IncorrectDatabaseSetup(
+                "Database is incorrectly configured:\n\n%s\n\n"
+                "See docs/postgres.md for more information." % ("\n".join(errors))
+            )
+
     def convert_param_style(self, sql):
         return sql.replace("?", "%s")
 
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index cbf52f5191..641e490697 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -59,6 +59,11 @@ class Sqlite3Engine(object):
             if version < (3, 11, 0):
                 raise RuntimeError("Synapse requires sqlite 3.11 or above.")
 
+    def check_new_database(self, txn):
+        """Gets called when setting up a brand new database. This allows us to
+        apply stricter checks on new databases versus existing database.
+        """
+
     def convert_param_style(self, sql):
         return sql
 
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 368c457321..af3fd67ab9 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -15,9 +15,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import itertools
 import logging
 from collections import deque, namedtuple
-from typing import Iterable, List, Optional, Tuple
+from typing import Iterable, List, Optional, Set, Tuple
 
 from six import iteritems
 from six.moves import range
@@ -27,7 +28,7 @@ from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -72,17 +73,20 @@ stale_forward_extremities_counter = Histogram(
 )
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True)
 class DeltaState:
     """Deltas to use to update the `current_state_events` table.
 
     Attributes:
         to_delete: List of type/state_keys to delete from current state
         to_insert: Map of state to upsert into current state
+        no_longer_in_room: The server is not longer in the room, so the room
+            should e.g. be removed from `current_state_events` table.
     """
 
     to_delete = attr.ib(type=List[Tuple[str, str]])
     to_insert = attr.ib(type=StateMap[str])
+    no_longer_in_room = attr.ib(type=bool, default=False)
 
 
 class _EventPeristenceQueue(object):
@@ -314,6 +318,11 @@ class EventsPersistenceStorage(object):
             # room
             state_delta_for_room = {}
 
+            # Set of remote users which were in rooms the server has left. We
+            # should check if we still share any rooms and if not we mark their
+            # device lists as stale.
+            potentially_left_users = set()  # type: Set[str]
+
             if not backfilled:
                 with Measure(self._clock, "_calculate_state_and_extrem"):
                     # Work out the new "current state" for each room.
@@ -396,11 +405,12 @@ class EventsPersistenceStorage(object):
                         # If either are not None then there has been a change,
                         # and we need to work out the delta (or use that
                         # given)
+                        delta = None
                         if delta_ids is not None:
                             # If there is a delta we know that we've
                             # only added or replaced state, never
                             # removed keys entirely.
-                            state_delta_for_room[room_id] = DeltaState([], delta_ids)
+                            delta = DeltaState([], delta_ids)
                         elif current_state is not None:
                             with Measure(
                                 self._clock, "persist_events.calculate_state_delta"
@@ -408,6 +418,26 @@ class EventsPersistenceStorage(object):
                                 delta = await self._calculate_state_delta(
                                     room_id, current_state
                                 )
+
+                        if delta:
+                            # If we have a change of state then lets check
+                            # whether we're actually still a member of the room,
+                            # or if our last user left. If we're no longer in
+                            # the room then we delete the current state and
+                            # extremities.
+                            is_still_joined = await self._is_server_still_joined(
+                                room_id,
+                                ev_ctx_rm,
+                                delta,
+                                current_state,
+                                potentially_left_users,
+                            )
+                            if not is_still_joined:
+                                logger.info("Server no longer in room %s", room_id)
+                                latest_event_ids = []
+                                current_state = {}
+                                delta.no_longer_in_room = True
+
                             state_delta_for_room[room_id] = delta
 
                         # If we have the current_state then lets prefill
@@ -423,6 +453,8 @@ class EventsPersistenceStorage(object):
                 backfilled=backfilled,
             )
 
+            await self._handle_potentially_left_users(potentially_left_users)
+
     async def _calculate_new_extremities(
         self,
         room_id: str,
@@ -629,7 +661,7 @@ class EventsPersistenceStorage(object):
                 break
 
         if not room_version:
-            room_version = await self.main_store.get_room_version(room_id)
+            room_version = await self.main_store.get_room_version_id(room_id)
 
         logger.debug("calling resolve_state_groups from preserve_events")
         res = await self._state_resolution_handler.resolve_state_groups(
@@ -660,3 +692,97 @@ class EventsPersistenceStorage(object):
         }
 
         return DeltaState(to_delete=to_delete, to_insert=to_insert)
+
+    async def _is_server_still_joined(
+        self,
+        room_id: str,
+        ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
+        delta: DeltaState,
+        current_state: Optional[StateMap[str]],
+        potentially_left_users: Set[str],
+    ) -> bool:
+        """Check if the server will still be joined after the given events have
+        been persised.
+
+        Args:
+            room_id
+            ev_ctx_rm
+            delta: The delta of current state between what is in the database
+                and what the new current state will be.
+            current_state: The new current state if it already been calculated,
+                otherwise None.
+            potentially_left_users: If the server has left the room, then joined
+                remote users will be added to this set to indicate that the
+                server may no longer be sharing a room with them.
+        """
+
+        if not any(
+            self.is_mine_id(state_key)
+            for typ, state_key in itertools.chain(delta.to_delete, delta.to_insert)
+            if typ == EventTypes.Member
+        ):
+            # There have been no changes to membership of our users, so nothing
+            # has changed and we assume we're still in the room.
+            return True
+
+        # Check if any of the given events are a local join that appear in the
+        # current state
+        for (typ, state_key), event_id in delta.to_insert.items():
+            if typ != EventTypes.Member or not self.is_mine_id(state_key):
+                continue
+
+            for event, _ in ev_ctx_rm:
+                if event_id == event.event_id:
+                    if event.membership == Membership.JOIN:
+                        return True
+
+        # There's been a change of membership but we don't have a local join
+        # event in the new events, so we need to check the full state.
+        if current_state is None:
+            current_state = await self.main_store.get_current_state_ids(room_id)
+            current_state = dict(current_state)
+            for key in delta.to_delete:
+                current_state.pop(key, None)
+
+            current_state.update(delta.to_insert)
+
+        event_ids = [
+            event_id
+            for (typ, state_key,), event_id in current_state.items()
+            if typ == EventTypes.Member and self.is_mine_id(state_key)
+        ]
+
+        rows = await self.main_store.get_membership_from_event_ids(event_ids)
+        is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
+        if is_still_joined:
+            return True
+
+        # The server will leave the room, so we go and find out which remote
+        # users will still be joined when we leave.
+        remote_event_ids = [
+            event_id
+            for (typ, state_key,), event_id in current_state.items()
+            if typ == EventTypes.Member and not self.is_mine_id(state_key)
+        ]
+        rows = await self.main_store.get_membership_from_event_ids(remote_event_ids)
+        potentially_left_users.update(
+            row["user_id"] for row in rows if row["membership"] == Membership.JOIN
+        )
+
+        return False
+
+    async def _handle_potentially_left_users(self, user_ids: Set[str]):
+        """Given a set of remote users check if the server still shares a room with
+        them. If not then mark those users' device cache as stale.
+        """
+
+        if not user_ids:
+            return
+
+        joined_users = await self.main_store.get_users_server_still_shares_room_with(
+            user_ids
+        )
+        left_users = user_ids - joined_users
+
+        for user_id in left_users:
+            await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index e86984cd50..c285ef52a0 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -136,6 +136,11 @@ def _setup_new_database(cur, database_engine, data_stores):
         data_stores (list[str]): The names of the data stores to instantiate
             on the given database.
     """
+
+    # We're about to set up a brand new database so we check that its
+    # configured to our liking.
+    database_engine.check_new_database(cur)
+
     current_dir = os.path.join(dir_path, "schema", "full_schemas")
     directory_entries = os.listdir(current_dir)