summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py61
-rw-r--r--synapse/storage/_base.py51
-rw-r--r--synapse/storage/account_data.py31
-rw-r--r--synapse/storage/appservice.py6
-rw-r--r--synapse/storage/background_updates.py22
-rw-r--r--synapse/storage/client_ips.py28
-rw-r--r--synapse/storage/deviceinbox.py16
-rw-r--r--synapse/storage/devices.py10
-rw-r--r--synapse/storage/directory.py9
-rw-r--r--synapse/storage/end_to_end_keys.py9
-rw-r--r--synapse/storage/engines/__init__.py7
-rw-r--r--synapse/storage/engines/sqlite3.py4
-rw-r--r--synapse/storage/event_federation.py16
-rw-r--r--synapse/storage/event_push_actions.py21
-rw-r--r--synapse/storage/events.py57
-rw-r--r--synapse/storage/events_worker.py96
-rw-r--r--synapse/storage/filtering.py8
-rw-r--r--synapse/storage/group_server.py5
-rw-r--r--synapse/storage/keys.py14
-rw-r--r--synapse/storage/prepare_database.py1
-rw-r--r--synapse/storage/presence.py10
-rw-r--r--synapse/storage/profile.py2
-rw-r--r--synapse/storage/push_rule.py17
-rw-r--r--synapse/storage/pusher.py12
-rw-r--r--synapse/storage/receipts.py40
-rw-r--r--synapse/storage/registration.py19
-rw-r--r--synapse/storage/rejections.py4
-rw-r--r--synapse/storage/room.py11
-rw-r--r--synapse/storage/roommember.py21
-rw-r--r--synapse/storage/schema/delta/25/fts.py6
-rw-r--r--synapse/storage/schema/delta/27/ts.py4
-rw-r--r--synapse/storage/schema/delta/30/as_users.py2
-rw-r--r--synapse/storage/schema/delta/31/search_update.py7
-rw-r--r--synapse/storage/schema/delta/33/event_fields.py5
-rw-r--r--synapse/storage/schema/delta/33/remote_media_ts.py1
-rw-r--r--synapse/storage/schema/delta/34/cache_stream.py6
-rw-r--r--synapse/storage/schema/delta/34/received_txn_purge.py4
-rw-r--r--synapse/storage/schema/delta/34/sent_txn_purge.py4
-rw-r--r--synapse/storage/schema/delta/37/remove_auth_idx.py6
-rw-r--r--synapse/storage/schema/delta/42/user_dir.py2
-rw-r--r--synapse/storage/schema/delta/50/erasure_store.sql21
-rw-r--r--synapse/storage/search.py8
-rw-r--r--synapse/storage/signatures.py8
-rw-r--r--synapse/storage/state.py67
-rw-r--r--synapse/storage/stream.py16
-rw-r--r--synapse/storage/tags.py13
-rw-r--r--synapse/storage/transactions.py14
-rw-r--r--synapse/storage/user_directory.py18
-rw-r--r--synapse/storage/user_erasure_store.py103
-rw-r--r--synapse/storage/util/id_generators.py2
50 files changed, 550 insertions, 375 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 979fa22438..ba88a54979 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -15,50 +15,48 @@
 # limitations under the License.
 
 import datetime
-from dateutil import tz
-import time
 import logging
+import time
 
+from dateutil import tz
+
+from synapse.api.constants import PresenceState
 from synapse.storage.devices import DeviceStore
-from .appservice import (
-    ApplicationServiceStore, ApplicationServiceTransactionStore
-)
+from synapse.storage.user_erasure_store import UserErasureStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from .account_data import AccountDataStore
+from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
+from .client_ips import ClientIpStore
+from .deviceinbox import DeviceInboxStore
 from .directory import DirectoryStore
+from .end_to_end_keys import EndToEndKeyStore
+from .engines import PostgresEngine
+from .event_federation import EventFederationStore
+from .event_push_actions import EventPushActionsStore
 from .events import EventsStore
+from .filtering import FilteringStore
+from .group_server import GroupServerStore
+from .keys import KeyStore
+from .media_repository import MediaRepositoryStore
+from .openid import OpenIdStore
 from .presence import PresenceStore, UserPresenceState
 from .profile import ProfileStore
+from .push_rule import PushRuleStore
+from .pusher import PusherStore
+from .receipts import ReceiptsStore
 from .registration import RegistrationStore
+from .rejections import RejectionsStore
 from .room import RoomStore
 from .roommember import RoomMemberStore
-from .stream import StreamStore
-from .transactions import TransactionStore
-from .keys import KeyStore
-from .event_federation import EventFederationStore
-from .pusher import PusherStore
-from .push_rule import PushRuleStore
-from .media_repository import MediaRepositoryStore
-from .rejections import RejectionsStore
-from .event_push_actions import EventPushActionsStore
-from .deviceinbox import DeviceInboxStore
-from .group_server import GroupServerStore
-from .state import StateStore
-from .signatures import SignatureStore
-from .filtering import FilteringStore
-from .end_to_end_keys import EndToEndKeyStore
-
-from .receipts import ReceiptsStore
 from .search import SearchStore
+from .signatures import SignatureStore
+from .state import StateStore
+from .stream import StreamStore
 from .tags import TagsStore
-from .account_data import AccountDataStore
-from .openid import OpenIdStore
-from .client_ips import ClientIpStore
+from .transactions import TransactionStore
 from .user_directory import UserDirectoryStore
-
-from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
-from .engines import PostgresEngine
-
-from synapse.api.constants import PresenceState
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerator
 
 logger = logging.getLogger(__name__)
 
@@ -88,6 +86,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 DeviceInboxStore,
                 UserDirectoryStore,
                 GroupServerStore,
+                UserErasureStore,
                 ):
 
     def __init__(self, db_conn, hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 22d6257a9f..1d41d8d445 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -13,22 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import sys
+import threading
+import time
 
-from synapse.api.errors import StoreError
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
-from synapse.util.caches.descriptors import Cache
-from synapse.storage.engines import PostgresEngine
+from six import iteritems, iterkeys, itervalues
+from six.moves import intern, range
 
 from prometheus_client import Histogram
 
 from twisted.internet import defer
 
-import sys
-import time
-import threading
-
-from six import itervalues, iterkeys, iteritems
-from six.moves import intern, range
+from synapse.api.errors import StoreError
+from synapse.storage.engines import PostgresEngine
+from synapse.util.caches.descriptors import Cache
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 
 logger = logging.getLogger(__name__)
 
@@ -221,7 +220,7 @@ class SQLBaseStore(object):
         self._clock.looping_call(loop, 10000)
 
     def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
-                         logging_context, func, *args, **kwargs):
+                         func, *args, **kwargs):
         start = time.time()
         txn_id = self._TXN_ID
 
@@ -285,8 +284,7 @@ class SQLBaseStore(object):
             end = time.time()
             duration = end - start
 
-            if logging_context is not None:
-                logging_context.add_database_transaction(duration)
+            LoggingContext.current_context().add_database_transaction(duration)
 
             transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
 
@@ -310,19 +308,15 @@ class SQLBaseStore(object):
         Returns:
             Deferred: The result of func
         """
-        current_context = LoggingContext.current_context()
-
         after_callbacks = []
         exception_callbacks = []
 
-        def inner_func(conn, *args, **kwargs):
-            return self._new_transaction(
-                conn, desc, after_callbacks, exception_callbacks, current_context,
-                func, *args, **kwargs
-            )
-
         try:
-            result = yield self.runWithConnection(inner_func, *args, **kwargs)
+            result = yield self.runWithConnection(
+                self._new_transaction,
+                desc, after_callbacks, exception_callbacks, func,
+                *args, **kwargs
+            )
 
             for after_callback, after_args, after_kwargs in after_callbacks:
                 after_callback(*after_args, **after_kwargs)
@@ -347,22 +341,25 @@ class SQLBaseStore(object):
         Returns:
             Deferred: The result of func
         """
-        current_context = LoggingContext.current_context()
+        parent_context = LoggingContext.current_context()
+        if parent_context == LoggingContext.sentinel:
+            logger.warn(
+                "Starting db connection from sentinel context: metrics will be lost",
+            )
+            parent_context = None
 
         start_time = time.time()
 
         def inner_func(conn, *args, **kwargs):
-            with LoggingContext("runWithConnection") as context:
+            with LoggingContext("runWithConnection", parent_context) as context:
                 sched_duration_sec = time.time() - start_time
                 sql_scheduling_timer.observe(sched_duration_sec)
-                current_context.add_database_scheduled(sched_duration_sec)
+                context.add_database_scheduled(sched_duration_sec)
 
                 if self.database_engine.is_connection_closed(conn):
                     logger.debug("Reconnecting closed database connection")
                     conn.reconnect()
 
-                current_context.copy_to(context)
-
                 return func(conn, *args, **kwargs)
 
         with PreserveLoggingContext():
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index f83ff0454a..bbc3355c73 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -14,17 +14,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import abc
+import logging
+
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.util.id_generators import StreamIdGenerator
-
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
-
-import abc
-import simplejson as json
-import logging
 
 logger = logging.getLogger(__name__)
 
@@ -114,25 +114,6 @@ class AccountDataWorkerStore(SQLBaseStore):
         else:
             defer.returnValue(None)
 
-    @cachedList(cached_method_name="get_global_account_data_by_type_for_user",
-                num_args=2, list_name="user_ids", inlineCallbacks=True)
-    def get_global_account_data_by_type_for_users(self, data_type, user_ids):
-        rows = yield self._simple_select_many_batch(
-            table="account_data",
-            column="user_id",
-            iterable=user_ids,
-            keyvalues={
-                "account_data_type": data_type,
-            },
-            retcols=("user_id", "content",),
-            desc="get_global_account_data_by_type_for_users",
-        )
-
-        defer.returnValue({
-            row["user_id"]: json.loads(row["content"]) if row["content"] else None
-            for row in rows
-        })
-
     @cached(num_args=2)
     def get_account_data_for_room(self, user_id, room_id):
         """Get all the client account_data for a user for a room.
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 12ea8a158c..9f12b360bc 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -15,14 +15,16 @@
 # limitations under the License.
 import logging
 import re
-import simplejson as json
+
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
 from synapse.storage.events import EventsWorkerStore
-from ._base import SQLBaseStore
 
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 8af325a9f5..5fe1ca2de7 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,15 +12,17 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import synapse.util.async
 
-from ._base import SQLBaseStore
-from . import engines
+import logging
+
+from canonicaljson import json
 
 from twisted.internet import defer
 
-import simplejson as json
-import logging
+from synapse.metrics.background_process_metrics import run_as_background_process
+
+from . import engines
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
@@ -87,12 +89,16 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_handlers = {}
         self._all_done = False
 
-    @defer.inlineCallbacks
     def start_doing_background_updates(self):
-        logger.info("Starting background schema updates")
+        run_as_background_process(
+            "background_updates", self._run_background_updates,
+        )
 
+    @defer.inlineCallbacks
+    def _run_background_updates(self):
+        logger.info("Starting background schema updates")
         while True:
-            yield synapse.util.async.sleep(
+            yield self.hs.get_clock().sleep(
                 self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
 
             try:
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index ce338514e8..77ae10da3d 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,15 +15,15 @@
 
 import logging
 
-from twisted.internet import defer, reactor
+from six import iteritems
 
-from ._base import Cache
-from . import background_updates
+from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import CACHE_SIZE_FACTOR
 
-from six import iteritems
-
+from . import background_updates
+from ._base import Cache
 
 logger = logging.getLogger(__name__)
 
@@ -70,7 +70,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         self._client_ip_looper = self._clock.looping_call(
             self._update_client_ips_batch, 5 * 1000
         )
-        reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+        self.hs.get_reactor().addSystemEventTrigger(
+            "before", "shutdown", self._update_client_ips_batch
+        )
 
     def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
                          now=None):
@@ -92,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         self._batch_row_update[key] = (user_agent, device_id, now)
 
     def _update_client_ips_batch(self):
-        to_update = self._batch_row_update
-        self._batch_row_update = {}
-        return self.runInteraction(
-            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+        def update():
+            to_update = self._batch_row_update
+            self._batch_row_update = {}
+            return self.runInteraction(
+                "_update_client_ips_batch", self._update_client_ips_batch_txn,
+                to_update,
+            )
+
+        run_as_background_process(
+            "update_client_ips", update,
         )
 
     def _update_client_ips_batch_txn(self, txn, to_update):
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index a879e5bfc1..73646da025 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -14,14 +14,14 @@
 # limitations under the License.
 
 import logging
-import simplejson
 
-from twisted.internet import defer
+from canonicaljson import json
 
-from .background_updates import BackgroundUpdateStore
+from twisted.internet import defer
 
 from synapse.util.caches.expiringcache import ExpiringCache
 
+from .background_updates import BackgroundUpdateStore
 
 logger = logging.getLogger(__name__)
 
@@ -85,7 +85,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             )
             rows = []
             for destination, edu in remote_messages_by_destination.items():
-                edu_json = simplejson.dumps(edu)
+                edu_json = json.dumps(edu)
                 rows.append((destination, stream_id, now_ms, edu_json))
             txn.executemany(sql, rows)
 
@@ -177,7 +177,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                     " WHERE user_id = ?"
                 )
                 txn.execute(sql, (user_id,))
-                message_json = simplejson.dumps(messages_by_device["*"])
+                message_json = json.dumps(messages_by_device["*"])
                 for row in txn:
                     # Add the message for all devices for this user on this
                     # server.
@@ -199,7 +199,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device = row[0]
-                    message_json = simplejson.dumps(messages_by_device[device])
+                    message_json = json.dumps(messages_by_device[device])
                     messages_json_for_user[device] = message_json
 
             if messages_json_for_user:
@@ -253,7 +253,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(simplejson.loads(row[1]))
+                messages.append(json.loads(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return (messages, stream_pos)
@@ -389,7 +389,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(simplejson.loads(row[1]))
+                messages.append(json.loads(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return (messages, stream_pos)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d149d8392e..ec68e39f1e 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -13,15 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson as json
+
+from six import iteritems, itervalues
+
+from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
-from ._base import SQLBaseStore, Cache
-from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
-from six import itervalues, iteritems
+from ._base import Cache, SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index d0c0059757..808194236a 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -13,15 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached
-
-from synapse.api.errors import SynapseError
+from collections import namedtuple
 
 from twisted.internet import defer
 
-from collections import namedtuple
+from synapse.api.errors import SynapseError
+from synapse.util.caches.descriptors import cached
 
+from ._base import SQLBaseStore
 
 RoomAliasMapping = namedtuple(
     "RoomAliasMapping",
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index b146487943..7ae5c65482 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -12,17 +12,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from six import iteritems
+
+from canonicaljson import encode_canonical_json, json
+
 from twisted.internet import defer
 
 from synapse.util.caches.descriptors import cached
 
-from canonicaljson import encode_canonical_json
-import simplejson as json
-
 from ._base import SQLBaseStore
 
-from six import iteritems
-
 
 class EndToEndKeyStore(SQLBaseStore):
     def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 8c868ece75..e2f9de8451 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -13,13 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import IncorrectDatabaseSetup
-from .postgres import PostgresEngine
-from .sqlite3 import Sqlite3Engine
-
 import importlib
 import platform
 
+from ._base import IncorrectDatabaseSetup
+from .postgres import PostgresEngine
+from .sqlite3 import Sqlite3Engine
 
 SUPPORTED_MODULE = {
     "sqlite3": Sqlite3Engine,
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 60f0fa7fb3..19949fc474 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -13,11 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.prepare_database import prepare_database
-
 import struct
 import threading
 
+from synapse.storage.prepare_database import prepare_database
+
 
 class Sqlite3Engine(object):
     single_threaded = True
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 8fbf7ffba7..8d366d1b91 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,23 +12,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 import random
 
+from six.moves import range
+from six.moves.queue import Empty, PriorityQueue
+
+from unpaddedbase64 import encode_base64
+
 from twisted.internet import defer
 
+from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
-
-from synapse.api.errors import StoreError
 from synapse.util.caches.descriptors import cached
-from unpaddedbase64 import encode_base64
-
-import logging
-from six.moves.queue import PriorityQueue, Empty
-
-from six.moves import range
-
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0350ee5fe..29b511ae5e 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,16 +14,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage._base import SQLBaseStore, LoggingTransaction
-from twisted.internet import defer
-from synapse.util.async import sleep
-from synapse.util.caches.descriptors import cachedInlineCallbacks
-
 import logging
-import simplejson as json
 
 from six import iteritems
 
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
 logger = logging.getLogger(__name__)
 
 
@@ -84,6 +85,8 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         self.find_stream_orderings_looping_call = self._clock.looping_call(
             self._find_stream_orderings_for_times, 10 * 60 * 1000
         )
+        self._rotate_delay = 3
+        self._rotate_count = 10000
 
     @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
     def get_unread_event_push_actions_by_room_for_user(
@@ -800,7 +803,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
                 )
                 if caught_up:
                     break
-                yield sleep(5)
+                yield self.hs.get_clock().sleep(self._rotate_delay)
         finally:
             self._doing_notif_rotation = False
 
@@ -821,8 +824,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         txn.execute("""
             SELECT stream_ordering FROM event_push_actions
             WHERE stream_ordering > ?
-            ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
-        """, (old_rotate_stream_ordering,))
+            ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
+        """, (old_rotate_stream_ordering, self._rotate_count))
         stream_row = txn.fetchone()
         if stream_row:
             offset_stream_ordering, = stream_row
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index cb1082e864..4ff0fdc4ab 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -14,36 +14,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from collections import OrderedDict, deque, namedtuple
-from functools import wraps
 import itertools
 import logging
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
+
+from six import iteritems, itervalues
+from six.moves import range
+
+from canonicaljson import json
+from prometheus_client import Counter
 
-import simplejson as json
 from twisted.internet import defer
 
+import synapse.metrics
+from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase  # noqa: F401
+from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable,
-)
+from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
-from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id, RoomStreamToken
-import synapse.metrics
-
-# these are only included to make the type annotations work
-from synapse.events import EventBase    # noqa: F401
-from synapse.events.snapshot import EventContext   # noqa: F401
-
-from six.moves import range
-from six import itervalues, iteritems
-
-from prometheus_client import Counter
 
 logger = logging.getLogger(__name__)
 
@@ -158,11 +156,8 @@ class _EventPeristenceQueue(object):
                     self._event_persist_queues[room_id] = queue
                 self._currently_persisting_rooms.discard(room_id)
 
-        # set handle_queue_loop off on the background. We don't want to
-        # attribute work done in it to the current request, so we drop the
-        # logcontext altogether.
-        with PreserveLoggingContext():
-            handle_queue_loop()
+        # set handle_queue_loop off in the background
+        run_as_background_process("persist_events", handle_queue_loop)
 
     def _get_drainining_queue(self, room_id):
         queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -800,7 +795,8 @@ class EventsStore(EventsWorkerStore):
                     ]
                 )
 
-                self._curr_state_delta_stream_cache.entity_has_changed(
+                txn.call_after(
+                    self._curr_state_delta_stream_cache.entity_has_changed,
                     room_id, max_stream_order,
                 )
 
@@ -1044,7 +1040,6 @@ class EventsStore(EventsWorkerStore):
                 "event_edge_hashes",
                 "event_edges",
                 "event_forward_extremities",
-                "event_push_actions",
                 "event_reference_hashes",
                 "event_search",
                 "event_signatures",
@@ -1064,6 +1059,14 @@ class EventsStore(EventsWorkerStore):
                 [(ev.event_id,) for ev, _ in events_and_contexts]
             )
 
+        for table in (
+            "event_push_actions",
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
+                [(ev.event_id,) for ev, _ in events_and_contexts]
+            )
+
     def _store_event_txn(self, txn, events_and_contexts):
         """Insert new events into the event and event_json tables
 
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 32d9d00ffb..f28239a808 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,27 +12,29 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from ._base import SQLBaseStore
+import logging
+from collections import namedtuple
 
-from twisted.internet import defer, reactor
+from canonicaljson import json
 
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase  # noqa: F401
 from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
-
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable, run_in_background,
+    LoggingContext,
+    PreserveLoggingContext,
+    make_deferred_yieldable,
+    run_in_background,
 )
 from synapse.util.metrics import Measure
-from synapse.api.errors import SynapseError
-
-from collections import namedtuple
-
-import logging
-import simplejson as json
 
-# these are only included to make the type annotations work
-from synapse.events import EventBase    # noqa: F401
-from synapse.events.snapshot import EventContext   # noqa: F401
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
@@ -145,6 +147,9 @@ class EventsWorkerStore(SQLBaseStore):
         missing_events_ids = [e for e in event_ids if e not in event_entry_map]
 
         if missing_events_ids:
+            log_ctx = LoggingContext.current_context()
+            log_ctx.record_event_fetch(len(missing_events_ids))
+
             missing_events = yield self._enqueue_events(
                 missing_events_ids,
                 check_redacted=check_redacted,
@@ -218,32 +223,47 @@ class EventsWorkerStore(SQLBaseStore):
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """
-        event_list = []
         i = 0
         while True:
-            try:
-                with self._event_fetch_lock:
-                    event_list = self._event_fetch_list
-                    self._event_fetch_list = []
-
-                    if not event_list:
-                        single_threaded = self.database_engine.single_threaded
-                        if single_threaded or i > EVENT_QUEUE_ITERATIONS:
-                            self._event_fetch_ongoing -= 1
-                            return
-                        else:
-                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                            i += 1
-                            continue
-                    i = 0
+            with self._event_fetch_lock:
+                event_list = self._event_fetch_list
+                self._event_fetch_list = []
+
+                if not event_list:
+                    single_threaded = self.database_engine.single_threaded
+                    if single_threaded or i > EVENT_QUEUE_ITERATIONS:
+                        self._event_fetch_ongoing -= 1
+                        return
+                    else:
+                        self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+                        i += 1
+                        continue
+                i = 0
+
+            self._fetch_event_list(conn, event_list)
+
+    def _fetch_event_list(self, conn, event_list):
+        """Handle a load of requests from the _event_fetch_list queue
+
+        Args:
+            conn (twisted.enterprise.adbapi.Connection): database connection
 
+            event_list (list[Tuple[list[str], Deferred]]):
+                The fetch requests. Each entry consists of a list of event
+                ids to be fetched, and a deferred to be completed once the
+                events have been fetched.
+
+        """
+        with Measure(self._clock, "_fetch_event_list"):
+            try:
                 event_id_lists = zip(*event_list)[0]
                 event_ids = [
                     item for sublist in event_id_lists for item in sublist
                 ]
 
                 rows = self._new_transaction(
-                    conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
+                    conn, "do_fetch", [], [],
+                    self._fetch_event_rows, event_ids,
                 )
 
                 row_dict = {
@@ -265,7 +285,7 @@ class EventsWorkerStore(SQLBaseStore):
                             except Exception:
                                 logger.exception("Failed to callback")
                 with PreserveLoggingContext():
-                    reactor.callFromThread(fire, event_list, row_dict)
+                    self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
             except Exception as e:
                 logger.exception("do_fetch")
 
@@ -276,9 +296,8 @@ class EventsWorkerStore(SQLBaseStore):
                             with PreserveLoggingContext():
                                 d.errback(e)
 
-                if event_list:
-                    with PreserveLoggingContext():
-                        reactor.callFromThread(fire, event_list)
+                with PreserveLoggingContext():
+                    self.hs.get_reactor().callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
@@ -304,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore):
                 should_start = False
 
         if should_start:
-            with PreserveLoggingContext():
-                self.runWithConnection(
-                    self._do_fetch
-                )
+            run_as_background_process(
+                "fetch_events",
+                self.runWithConnection,
+                self._do_fetch,
+            )
 
         logger.debug("Loading %d events", len(events))
         with PreserveLoggingContext():
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 2e2763126d..2d5896c5b4 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -13,14 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from canonicaljson import encode_canonical_json, json
+
 from twisted.internet import defer
 
-from ._base import SQLBaseStore
-from synapse.api.errors import SynapseError, Codes
+from synapse.api.errors import Codes, SynapseError
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
-from canonicaljson import encode_canonical_json
-import simplejson as json
+from ._base import SQLBaseStore
 
 
 class FilteringStore(SQLBaseStore):
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index da05ccb027..592d1b4c2a 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -14,15 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
 
 from ._base import SQLBaseStore
 
-import simplejson as json
-
-
 # The category ID for the "default" category. We don't store as null in the
 # database to avoid the fun of null != null
 _DEFAULT_CATEGORY_ID = ""
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 0f13b61da8..f547977600 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -13,17 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+import hashlib
+import logging
 
-from twisted.internet import defer
 import six
 
-import OpenSSL
 from signedjson.key import decode_verify_key_bytes
-import hashlib
 
-import logging
+import OpenSSL
+from twisted.internet import defer
+
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index cf2aae0468..b290f834b3 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -20,7 +20,6 @@ import logging
 import os
 import re
 
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index f05d91cc58..a0c7a0dc87 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -13,13 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
+from collections import namedtuple
+
+from twisted.internet import defer
+
 from synapse.api.constants import PresenceState
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 from synapse.util import batch_iter
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
-from collections import namedtuple
-from twisted.internet import defer
+from ._base import SQLBaseStore
 
 
 class UserPresenceState(namedtuple("UserPresenceState",
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 8612bd5ecc..60295da254 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -15,8 +15,8 @@
 
 from twisted.internet import defer
 
-from synapse.storage.roommember import ProfileInfo
 from synapse.api.errors import StoreError
+from synapse.storage.roommember import ProfileInfo
 
 from ._base import SQLBaseStore
 
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 04a0b59a39..be655d287b 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -14,20 +14,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
+import abc
+import logging
+
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes
+from synapse.push.baserules import list_with_base_rules
 from synapse.storage.appservice import ApplicationServiceWorkerStore
 from synapse.storage.pusher import PusherWorkerStore
 from synapse.storage.receipts import ReceiptsWorkerStore
 from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.push.baserules import list_with_base_rules
-from synapse.api.constants import EventTypes
-from twisted.internet import defer
 
-import abc
-import logging
-import simplejson as json
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 307660b99a..cc273a57b2 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -14,16 +14,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
-from twisted.internet import defer
+import logging
+import types
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 
-import logging
-import simplejson as json
-import types
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c93c228f6e..0ac665e967 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -14,17 +14,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
-from .util.id_generators import StreamIdGenerator
-from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+import abc
+import logging
+
+from canonicaljson import json
 
 from twisted.internet import defer
 
-import abc
-import logging
-import simplejson as json
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
+from ._base import SQLBaseStore
+from .util.id_generators import StreamIdGenerator
 
 logger = logging.getLogger(__name__)
 
@@ -139,7 +140,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
         """
         room_ids = set(room_ids)
 
-        if from_key:
+        if from_key is not None:
+            # Only ask the database about rooms where there have been new
+            # receipts added since `from_key`
             room_ids = yield self._receipts_stream_cache.get_entities_changed(
                 room_ids, from_key
             )
@@ -150,7 +153,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
         defer.returnValue([ev for res in results.values() for ev in res])
 
-    @cachedInlineCallbacks(num_args=3, tree=True)
     def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
         """Get receipts for a single room for sending to clients.
 
@@ -161,7 +163,19 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 from the start.
 
         Returns:
-            list: A list of receipts.
+            Deferred[list]: A list of receipts.
+        """
+        if from_key is not None:
+            # Check the cache first to see if any new receipts have been added
+            # since`from_key`. If not we can no-op.
+            if not self._receipts_stream_cache.has_entity_changed(room_id, from_key):
+                defer.succeed([])
+
+        return self._get_linearized_receipts_for_room(room_id, to_key, from_key)
+
+    @cachedInlineCallbacks(num_args=3, tree=True)
+    def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
+        """See get_linearized_receipts_for_room
         """
         def f(txn):
             if from_key:
@@ -210,7 +224,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             "content": content,
         }])
 
-    @cachedList(cached_method_name="get_linearized_receipts_for_room",
+    @cachedList(cached_method_name="_get_linearized_receipts_for_room",
                 list_name="room_ids", num_args=3, inlineCallbacks=True)
     def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
         if not room_ids:
@@ -372,7 +386,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
+        txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         txn.call_after(
             self._receipts_stream_cache.entity_has_changed,
@@ -492,7 +506,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
+        txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         self._simple_delete_txn(
             txn,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c241167fbe..07333f777d 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,15 +15,15 @@
 
 import re
 
+from six.moves import range
+
 from twisted.internet import defer
 
-from synapse.api.errors import StoreError, Codes
+from synapse.api.errors import Codes, StoreError
 from synapse.storage import background_updates
 from synapse.storage._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
-from six.moves import range
-
 
 class RegistrationWorkerStore(SQLBaseStore):
     @cached()
@@ -460,15 +460,6 @@ class RegistrationStore(RegistrationWorkerStore,
             defer.returnValue(ret['user_id'])
         defer.returnValue(None)
 
-    def user_delete_threepids(self, user_id):
-        return self._simple_delete(
-            "user_threepids",
-            keyvalues={
-                "user_id": user_id,
-            },
-            desc="user_delete_threepids",
-        )
-
     def user_delete_threepid(self, user_id, medium, address):
         return self._simple_delete(
             "user_threepids",
@@ -632,7 +623,9 @@ class RegistrationStore(RegistrationWorkerStore,
         Removes the given user to the table of users who need to be parted from all the
         rooms they're in, effectively marking that user as fully deactivated.
         """
-        return self._simple_delete_one(
+        # XXX: This should be simple_delete_one but we failed to put a unique index on
+        # the table, so somehow duplicate entries have ended up in it.
+        return self._simple_delete(
             "users_pending_deactivation",
             keyvalues={
                 "user_id": user_id,
diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py
index 40acb5c4ed..880f047adb 100644
--- a/synapse/storage/rejections.py
+++ b/synapse/storage/rejections.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
-
 import logging
 
+from ._base import SQLBaseStore
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index ea6a189185..3147fb6827 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -13,6 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import collections
+import logging
+import re
+
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
@@ -20,11 +26,6 @@ from synapse.storage._base import SQLBaseStore
 from synapse.storage.search import SearchStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
-import collections
-import logging
-import simplejson as json
-import re
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 48a88f755e..02a802bed9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -14,24 +14,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
+import logging
 from collections import namedtuple
 
+from six import iteritems, itervalues
+
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
 from synapse.storage.events import EventsWorkerStore
+from synapse.types import get_domain_from_id
 from synapse.util.async import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.stringutils import to_ascii
 
-from synapse.api.constants import Membership, EventTypes
-from synapse.types import get_domain_from_id
-
-import logging
-import simplejson as json
-
-from six import itervalues, iteritems
-
 logger = logging.getLogger(__name__)
 
 
@@ -455,7 +454,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         defer.returnValue(joined_hosts)
 
-    @cached(max_entries=10000, iterable=True)
+    @cached(max_entries=10000)
     def _get_joined_hosts_cache(self, room_id):
         return _JoinedHostsCache(self, room_id)
 
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index e7351c3ae6..4b2ffd35fd 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -14,11 +14,11 @@
 
 import logging
 
-from synapse.storage.prepare_database import get_statements
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-
 import simplejson
 
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.prepare_database import get_statements
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py
index 6df57b5206..414f9f5aa0 100644
--- a/synapse/storage/schema/delta/27/ts.py
+++ b/synapse/storage/schema/delta/27/ts.py
@@ -14,10 +14,10 @@
 
 import logging
 
-from synapse.storage.prepare_database import get_statements
-
 import simplejson
 
+from synapse.storage.prepare_database import get_statements
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index 85bd1a2006..ef7ec34346 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from synapse.config.appservice import load_appservices
 
 from six.moves import range
 
+from synapse.config.appservice import load_appservices
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
index fe6b7d196d..7d8ca5f93f 100644
--- a/synapse/storage/schema/delta/31/search_update.py
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -12,12 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.engines import PostgresEngine
-from synapse.storage.prepare_database import get_statements
-
 import logging
+
 import simplejson
 
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/schema/delta/33/event_fields.py b/synapse/storage/schema/delta/33/event_fields.py
index 1e002f9db2..bff1256a7b 100644
--- a/synapse/storage/schema/delta/33/event_fields.py
+++ b/synapse/storage/schema/delta/33/event_fields.py
@@ -12,11 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.prepare_database import get_statements
-
 import logging
+
 import simplejson
 
+from synapse.storage.prepare_database import get_statements
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py
index 55ae43f395..9754d3ccfb 100644
--- a/synapse/storage/schema/delta/33/remote_media_ts.py
+++ b/synapse/storage/schema/delta/33/remote_media_ts.py
@@ -14,7 +14,6 @@
 
 import time
 
-
 ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT"
 
 
diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py
index 3b63a1562d..cf09e43e2b 100644
--- a/synapse/storage/schema/delta/34/cache_stream.py
+++ b/synapse/storage/schema/delta/34/cache_stream.py
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.prepare_database import get_statements
-from synapse.storage.engines import PostgresEngine
-
 import logging
 
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/schema/delta/34/received_txn_purge.py b/synapse/storage/schema/delta/34/received_txn_purge.py
index 033144341c..67d505e68b 100644
--- a/synapse/storage/schema/delta/34/received_txn_purge.py
+++ b/synapse/storage/schema/delta/34/received_txn_purge.py
@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.engines import PostgresEngine
-
 import logging
 
+from synapse.storage.engines import PostgresEngine
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/schema/delta/34/sent_txn_purge.py b/synapse/storage/schema/delta/34/sent_txn_purge.py
index 81948e3431..0ffab10b6f 100644
--- a/synapse/storage/schema/delta/34/sent_txn_purge.py
+++ b/synapse/storage/schema/delta/34/sent_txn_purge.py
@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.engines import PostgresEngine
-
 import logging
 
+from synapse.storage.engines import PostgresEngine
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py
index 20ad8bd5a6..a377884169 100644
--- a/synapse/storage/schema/delta/37/remove_auth_idx.py
+++ b/synapse/storage/schema/delta/37/remove_auth_idx.py
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.prepare_database import get_statements
-from synapse.storage.engines import PostgresEngine
-
 import logging
 
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
 logger = logging.getLogger(__name__)
 
 DROP_INDICES = """
diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py
index ea6a18196d..506f326f4d 100644
--- a/synapse/storage/schema/delta/42/user_dir.py
+++ b/synapse/storage/schema/delta/42/user_dir.py
@@ -14,8 +14,8 @@
 
 import logging
 
-from synapse.storage.prepare_database import get_statements
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/schema/delta/50/erasure_store.sql b/synapse/storage/schema/delta/50/erasure_store.sql
new file mode 100644
index 0000000000..5d8641a9ab
--- /dev/null
+++ b/synapse/storage/schema/delta/50/erasure_store.sql
@@ -0,0 +1,21 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- a table of users who have requested that their details be erased
+CREATE TABLE erased_users (
+    user_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id);
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index f0fa5d7631..d5b5df93e6 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -13,19 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from collections import namedtuple
 import logging
 import re
-import simplejson as json
+from collections import namedtuple
 
 from six import string_types
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
-from .background_updates import BackgroundUpdateStore
 from synapse.api.errors import SynapseError
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
+from .background_updates import BackgroundUpdateStore
+
 logger = logging.getLogger(__name__)
 
 SearchEntry = namedtuple('SearchEntry', [
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 25922e5a9c..470212aa2a 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -13,15 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
 import six
 
-from ._base import SQLBaseStore
-
 from unpaddedbase64 import encode_base64
+
+from twisted.internet import defer
+
 from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.util.caches.descriptors import cached, cachedList
 
+from ._base import SQLBaseStore
+
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index b452813fbb..c5ff44fef7 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -13,8 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from collections import namedtuple
 import logging
+from collections import namedtuple
 
 from six import iteritems, itervalues
 from six.moves import range
@@ -23,10 +23,11 @@ from twisted.internet import defer
 
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.engines import PostgresEngine
-from synapse.util.caches import intern_string, get_cache_factor_for
+from synapse.util.caches import get_cache_factor_for, intern_string
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.dictionary_cache import DictionaryCache
 from synapse.util.stringutils import to_ascii
+
 from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
@@ -585,19 +586,24 @@ class StateGroupWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def _get_state_for_groups(self, groups, types=None):
-        """Given list of groups returns dict of group -> list of state events
-        with matching types.
+        """Gets the state at each of a list of state groups, optionally
+        filtering by type/state_key
 
         Args:
-            groups(list[int]): list of groups whose state to query
-            types(list[str|None, str|None]|None): List of 2-tuples of the form
-                (`type`, `state_key`), where a `state_key` of `None` matches all
-                state_keys for the `type`. Presence of type of `None` indicates
-                that types not in the list should not be filtered out. If None,
-                all events are returned.
+            groups (iterable[int]): list of state groups for which we want
+                to get the state.
+            types (None|iterable[(None|str, None|str)]):
+                indicates the state type/keys required. If None, the whole
+                state is fetched and returned.
+
+                Otherwise, each entry should be a `(type, state_key)` tuple to
+                include in the response. A `state_key` of None is a wildcard
+                meaning that we require all state with that type. A `type` of None
+                indicates that types not in the list should not be filtered out.
 
         Returns:
-            dict of group -> list of state events
+            Deferred[dict[int, dict[(type, state_key), EventBase]]]
+                a dictionary mapping from state group to state dictionary.
         """
         if types:
             types = frozenset(types)
@@ -606,7 +612,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         if types is not None:
             for group in set(groups):
                 state_dict_ids, _, got_all = self._get_some_state_from_cache(
-                    group, types
+                    group, types,
                 )
                 results[group] = state_dict_ids
 
@@ -627,26 +633,40 @@ class StateGroupWorkerStore(SQLBaseStore):
             # Okay, so we have some missing_types, lets fetch them.
             cache_seq_num = self._state_group_cache.sequence
 
+            # the DictionaryCache knows if it has *all* the state, but
+            # does not know if it has all of the keys of a particular type,
+            # which makes wildcard lookups expensive unless we have a complete
+            # cache. Hence, if we are doing a wildcard lookup, populate the
+            # cache fully so that we can do an efficient lookup next time.
+
+            if types and any(k is None for (t, k) in types):
+                types_to_fetch = None
+            else:
+                types_to_fetch = types
+
             group_to_state_dict = yield self._get_state_groups_from_groups(
-                missing_groups, types
+                missing_groups, types_to_fetch,
             )
 
-            # Now we want to update the cache with all the things we fetched
-            # from the database.
             for group, group_state_dict in iteritems(group_to_state_dict):
                 state_dict = results[group]
 
-                state_dict.update(
-                    ((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
-                    for k, v in iteritems(group_state_dict)
-                )
-
+                # update the result, filtering by `types`.
+                if types:
+                    for k, v in iteritems(group_state_dict):
+                        (typ, _) = k
+                        if k in types or (typ, None) in types:
+                            state_dict[k] = v
+                else:
+                    state_dict.update(group_state_dict)
+
+                # update the cache with all the things we fetched from the
+                # database.
                 self._state_group_cache.update(
                     cache_seq_num,
                     key=group,
-                    value=state_dict,
-                    full=(types is None),
-                    known_absent=types,
+                    value=group_state_dict,
+                    fetched_keys=types_to_fetch,
                 )
 
         defer.returnValue(results)
@@ -753,7 +773,6 @@ class StateGroupWorkerStore(SQLBaseStore):
                 self._state_group_cache.sequence,
                 key=state_group,
                 value=dict(current_state_ids),
-                full=True,
             )
 
             return state_group
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index fb463c525a..66856342f0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -33,22 +33,20 @@ what sort order was used:
       and stream ordering columns respectively.
 """
 
+import abc
+import logging
+from collections import namedtuple
+
+from six.moves import range
+
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.events import EventsWorkerStore
-
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
-from synapse.storage.engines import PostgresEngine
-
-import abc
-import logging
-
-from six.moves import range
-from collections import namedtuple
-
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 6671d3cfca..0f657b2bd3 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -14,16 +14,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.account_data import AccountDataWorkerStore
-
-from synapse.util.caches.descriptors import cached
-from twisted.internet import defer
-
-import simplejson as json
 import logging
 
 from six.moves import range
 
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.storage.account_data import AccountDataWorkerStore
+from synapse.util.caches.descriptors import cached
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index e485d19b84..c3bc94f56d 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -13,18 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached
+import logging
+from collections import namedtuple
 
-from twisted.internet import defer
 import six
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 
-from collections import namedtuple
+from twisted.internet import defer
 
-import logging
-import simplejson as json
+from synapse.util.caches.descriptors import cached
+
+from ._base import SQLBaseStore
 
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 275c299998..a8781b0e5d 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -13,19 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
+import re
 
-from ._base import SQLBaseStore
+from six import iteritems
+
+from twisted.internet import defer
 
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id, get_localpart_from_id
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
-from six import iteritems
-
-import re
-import logging
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
@@ -265,7 +265,7 @@ class UserDirectoryStore(SQLBaseStore):
         self.get_user_in_public_room.invalidate((user_id,))
 
     def get_users_in_public_due_to_room(self, room_id):
-        """Get all user_ids that are in the room directory becuase they're
+        """Get all user_ids that are in the room directory because they're
         in the given room_id
         """
         return self._simple_select_onecol(
@@ -277,7 +277,7 @@ class UserDirectoryStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_users_in_dir_due_to_room(self, room_id):
-        """Get all user_ids that are in the room directory becuase they're
+        """Get all user_ids that are in the room directory because they're
         in the given room_id
         """
         user_ids_dir = yield self._simple_select_onecol(
diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py
new file mode 100644
index 0000000000..be013f4427
--- /dev/null
+++ b/synapse/storage/user_erasure_store.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import operator
+
+from twisted.internet import defer
+
+from synapse.storage._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached, cachedList
+
+
+class UserErasureWorkerStore(SQLBaseStore):
+    @cached()
+    def is_user_erased(self, user_id):
+        """
+        Check if the given user id has requested erasure
+
+        Args:
+            user_id (str): full user id to check
+
+        Returns:
+            Deferred[bool]: True if the user has requested erasure
+        """
+        return self._simple_select_onecol(
+            table="erased_users",
+            keyvalues={"user_id": user_id},
+            retcol="1",
+            desc="is_user_erased",
+        ).addCallback(operator.truth)
+
+    @cachedList(
+        cached_method_name="is_user_erased",
+        list_name="user_ids",
+        inlineCallbacks=True,
+    )
+    def are_users_erased(self, user_ids):
+        """
+        Checks which users in a list have requested erasure
+
+        Args:
+            user_ids (iterable[str]): full user id to check
+
+        Returns:
+            Deferred[dict[str, bool]]:
+                for each user, whether the user has requested erasure.
+        """
+        # this serves the dual purpose of (a) making sure we can do len and
+        # iterate it multiple times, and (b) avoiding duplicates.
+        user_ids = tuple(set(user_ids))
+
+        def _get_erased_users(txn):
+            txn.execute(
+                "SELECT user_id FROM erased_users WHERE user_id IN (%s)" % (
+                    ",".join("?" * len(user_ids))
+                ),
+                user_ids,
+            )
+            return set(r[0] for r in txn)
+
+        erased_users = yield self.runInteraction(
+            "are_users_erased", _get_erased_users,
+        )
+        res = dict((u, u in erased_users) for u in user_ids)
+        defer.returnValue(res)
+
+
+class UserErasureStore(UserErasureWorkerStore):
+    def mark_user_erased(self, user_id):
+        """Indicate that user_id wishes their message history to be erased.
+
+        Args:
+            user_id (str): full user_id to be erased
+        """
+        def f(txn):
+            # first check if they are already in the list
+            txn.execute(
+                "SELECT 1 FROM erased_users WHERE user_id = ?",
+                (user_id, )
+            )
+            if txn.fetchone():
+                return
+
+            # they are not already there: do the insert.
+            txn.execute(
+                "INSERT INTO erased_users (user_id) VALUES (?)",
+                (user_id, )
+            )
+
+            self._invalidate_cache_and_stream(
+                txn, self.is_user_erased, (user_id,)
+            )
+        return self.runInteraction("mark_user_erased", f)
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 95031dc9ec..d6160d5e4d 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -13,9 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from collections import deque
 import contextlib
 import threading
+from collections import deque
 
 
 class IdGenerator(object):