summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/appservice.py8
-rw-r--r--synapse/app/client_reader.py8
-rw-r--r--synapse/app/federation_reader.py8
-rw-r--r--synapse/app/federation_sender.py8
-rwxr-xr-xsynapse/app/homeserver.py9
-rw-r--r--synapse/app/media_repository.py8
-rw-r--r--synapse/app/pusher.py9
-rw-r--r--synapse/app/synchrotron.py9
-rw-r--r--synapse/events/snapshot.py26
-rw-r--r--synapse/handlers/federation.py15
-rw-r--r--synapse/state.py11
-rw-r--r--synapse/storage/background_updates.py15
-rw-r--r--synapse/storage/event_federation.py24
-rw-r--r--synapse/storage/events.py281
-rw-r--r--synapse/storage/state.py10
-rw-r--r--synapse/util/logcontext.py61
-rw-r--r--tests/util/test_log_context.py61
17 files changed, 425 insertions, 146 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 83ee3e3ce3..a6f1e7594e 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.storage.engines import create_engine
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -187,7 +187,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 7ed0de4117..a821a6ce62 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -35,7 +35,7 @@ from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -193,7 +193,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index ca742de6b2..e52b0f240d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -31,7 +31,7 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -184,7 +184,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 0cf5b196e6..76c4cc54d1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -35,7 +35,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -193,7 +193,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 0b9d78c13c..2cdd2d39ff 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -52,7 +52,7 @@ from synapse.api.urls import (
 )
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.metrics import register_memory_metrics, get_metrics_for
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
@@ -456,7 +456,12 @@ def run(hs):
     def in_thread():
         # Uncomment to enable tracing of log context changes.
         # sys.settrace(logcontext_tracer)
-        with LoggingContext("run"):
+
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             change_resource_limit(hs.config.soft_file_limit)
             if hs.config.gc_thresholds:
                 gc.set_threshold(*hs.config.gc_thresholds)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index c5579d9e38..3c7984a237 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -32,7 +32,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -190,7 +190,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index b025db54d4..ab682e52ec 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -31,7 +31,8 @@ from synapse.storage.engines import create_engine
 from synapse.storage import DataStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+    PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -275,7 +276,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 449fac771b..a68bb873fe 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -48,7 +48,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+    PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
@@ -496,7 +497,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 11605b34a3..6be18880b9 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -15,6 +15,32 @@
 
 
 class EventContext(object):
+    """
+    Attributes:
+        current_state_ids (dict[(str, str), str]):
+            The current state map including the current event.
+            (type, state_key) -> event_id
+
+        prev_state_ids (dict[(str, str), str]):
+            The current state map excluding the current event.
+            (type, state_key) -> event_id
+
+        state_group (int): state group id
+        rejected (bool|str): A rejection reason if the event was rejected, else
+            False
+
+        push_actions (list[(str, list[object])]): list of (user_id, actions)
+            tuples
+
+        prev_group (int): Previously persisted state group. ``None`` for an
+            outlier.
+        delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
+            (type, state_key) -> event_id. ``None`` for an outlier.
+
+        prev_state_events (?): XXX: is this ever set to anything other than
+            the empty list?
+    """
+
     __slots__ = [
         "current_state_ids",
         "prev_state_ids",
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0cd5501b05..888dd01240 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -933,8 +933,9 @@ class FederationHandler(BaseHandler):
             # lots of requests for missing prev_events which we do actually
             # have. Hence we fire off the deferred, but don't wait for it.
 
-            synapse.util.logcontext.reset_context_after_deferred(
-                self._handle_queued_pdus(room_queue))
+            synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
+                room_queue
+            )
 
         defer.returnValue(True)
 
@@ -1537,7 +1538,17 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
+        """
+
+        Args:
+            origin:
+            event:
+            state:
+            auth_events:
 
+        Returns:
+            Deferred, which resolves to synapse.events.snapshot.EventContext
+        """
         context = yield self.state_handler.compute_event_context(
             event, old_state=state,
         )
diff --git a/synapse/state.py b/synapse/state.py
index 383d32b163..9a523a1b89 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -177,17 +177,12 @@ class StateHandler(object):
 
     @defer.inlineCallbacks
     def compute_event_context(self, event, old_state=None):
-        """ Fills out the context with the `current state` of the graph. The
-        `current state` here is defined to be the state of the event graph
-        just before the event - i.e. it never includes `event`
-
-        If `event` has `auth_events` then this will also fill out the
-        `auth_events` field on `context` from the `current_state`.
+        """Build an EventContext structure for the event.
 
         Args:
-            event (EventBase)
+            event (synapse.events.EventBase):
         Returns:
-            an EventContext
+            synapse.events.snapshot.EventContext:
         """
         context = EventContext()
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 94b2bcc54a..813ad59e56 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import synapse.util.async
 
 from ._base import SQLBaseStore
 from . import engines
@@ -84,24 +85,14 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_performance = {}
         self._background_update_queue = []
         self._background_update_handlers = {}
-        self._background_update_timer = None
 
     @defer.inlineCallbacks
     def start_doing_background_updates(self):
-        assert self._background_update_timer is None, \
-            "background updates already running"
-
         logger.info("Starting background schema updates")
 
         while True:
-            sleep = defer.Deferred()
-            self._background_update_timer = self._clock.call_later(
-                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
-            )
-            try:
-                yield sleep
-            finally:
-                self._background_update_timer = None
+            yield synapse.util.async.sleep(
+                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
 
             try:
                 result = yield self.do_next_background_update(
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 256e50dc20..0d97de2fe7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -201,19 +201,19 @@ class EventFederationStore(SQLBaseStore):
     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
         min_depth = self._get_min_depth_interaction(txn, room_id)
 
-        do_insert = depth < min_depth if min_depth else True
+        if min_depth and depth >= min_depth:
+            return
 
-        if do_insert:
-            self._simple_upsert_txn(
-                txn,
-                table="room_depth",
-                keyvalues={
-                    "room_id": room_id,
-                },
-                values={
-                    "min_depth": depth,
-                },
-            )
+        self._simple_upsert_txn(
+            txn,
+            table="room_depth",
+            keyvalues={
+                "room_id": room_id,
+            },
+            values={
+                "min_depth": depth,
+            },
+        )
 
     def _handle_mult_prev_events(self, txn, events):
         """
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 72319c35ae..4d3cfc336f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple, OrderedDict
 from functools import wraps
 
-import synapse
 import synapse.metrics
 
-
 import logging
 import math
 import ujson as json
 
+# these are only included to make the type annotations work
+from synapse.events import EventBase    # noqa: F401
+from synapse.events.snapshot import EventContext   # noqa: F401
+
 logger = logging.getLogger(__name__)
 
 
@@ -82,6 +84,11 @@ class _EventPeristenceQueue(object):
 
     def add_to_queue(self, room_id, events_and_contexts, backfilled):
         """Add events to the queue, with the given persist_event options.
+
+        Args:
+            room_id (str):
+            events_and_contexts (list[(EventBase, EventContext)]):
+            backfilled (bool):
         """
         queue = self._event_persist_queues.setdefault(room_id, deque())
         if queue:
@@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False):
+        """
+
+        Args:
+            event (EventBase):
+            context (EventContext):
+            backfilled (bool):
+
+        Returns:
+            Deferred: resolves to (int, int): the stream ordering of ``event``,
+            and the stream ordering of the latest persisted event
+        """
         deferred = self._event_persist_queue.add_to_queue(
             event.room_id, [(event, context)],
             backfilled=backfilled,
@@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def _persist_events(self, events_and_contexts, backfilled=False,
                         delete_existing=False):
+        """Persist events to db
+
+        Args:
+            events_and_contexts (list[(EventBase, EventContext)]):
+            backfilled (bool):
+            delete_existing (bool):
+
+        Returns:
+            Deferred: resolves when the events have been persisted
+        """
         if not events_and_contexts:
             return
 
@@ -554,11 +582,91 @@ class EventsStore(SQLBaseStore):
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.
 
-        If delete_existing is True then existing events will be purged from the
-        database before insertion. This is useful when retrying due to IntegrityError.
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]):
+                events to persist
+            backfilled (bool): True if the events were backfilled
+            delete_existing (bool): True to purge existing table rows for the
+                events from the database. This is useful when retrying due to
+                IntegrityError.
+            current_state_for_room (dict[str, (list[str], list[str])]):
+                The current-state delta for each room. For each room, a tuple
+                (to_delete, to_insert), being a list of event ids to be removed
+                from the current state, and a list of event ids to be added to
+                the current state.
+            new_forward_extremeties (dict[str, list[str]]):
+                The new forward extremities for each room. For each room, a
+                list of the event ids which are the forward extremities.
+
         """
+        self._update_current_state_txn(txn, current_state_for_room)
+
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
-        for room_id, current_state_tuple in current_state_for_room.iteritems():
+        self._update_forward_extremities_txn(
+            txn,
+            new_forward_extremities=new_forward_extremeties,
+            max_stream_order=max_stream_order,
+        )
+
+        # Ensure that we don't have the same event twice.
+        events_and_contexts = self._filter_events_and_contexts_for_duplicates(
+            events_and_contexts,
+        )
+
+        self._update_room_depths_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+        )
+
+        # _update_outliers_txn filters out any events which have already been
+        # persisted, and returns the filtered list.
+        events_and_contexts = self._update_outliers_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # From this point onwards the events are only events that we haven't
+        # seen before.
+
+        if delete_existing:
+            # For paranoia reasons, we go and delete all the existing entries
+            # for these events so we can reinsert them.
+            # This gets around any problems with some tables already having
+            # entries.
+            self._delete_existing_rows_txn(
+                txn,
+                events_and_contexts=events_and_contexts,
+            )
+
+        self._store_event_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # Insert into the state_groups, state_groups_state, and
+        # event_to_state_groups tables.
+        self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+        # _store_rejected_events_txn filters out any events which were
+        # rejected, and returns the filtered list.
+        events_and_contexts = self._store_rejected_events_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # From this point onwards the events are only ones that weren't
+        # rejected.
+
+        self._update_metadata_tables_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+        )
+
+    def _update_current_state_txn(self, txn, state_delta_by_room):
+        for room_id, current_state_tuple in state_delta_by_room.iteritems():
                 to_delete, to_insert = current_state_tuple
                 txn.executemany(
                     "DELETE FROM current_state_events WHERE event_id = ?",
@@ -608,7 +716,9 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_current_state_ids, (room_id,)
                 )
 
-        for room_id, new_extrem in new_forward_extremeties.items():
+    def _update_forward_extremities_txn(self, txn, new_forward_extremities,
+                                        max_stream_order):
+        for room_id, new_extrem in new_forward_extremities.items():
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -626,7 +736,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremeties.items()
+                for room_id, new_extrem in new_forward_extremities.items()
                 for ev_id in new_extrem
             ],
         )
@@ -643,13 +753,22 @@ class EventsStore(SQLBaseStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremeties.items()
+                for room_id, new_extrem in new_forward_extremities.items()
                 for event_id in new_extrem
             ]
         )
 
-        # Ensure that we don't have the same event twice.
-        # Pick the earliest non-outlier if there is one, else the earliest one.
+    @classmethod
+    def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
+        """Ensure that we don't have the same event twice.
+
+        Pick the earliest non-outlier if there is one, else the earliest one.
+
+        Args:
+            events_and_contexts (list[(EventBase, EventContext)]):
+        Returns:
+            list[(EventBase, EventContext)]: filtered list
+        """
         new_events_and_contexts = OrderedDict()
         for event, context in events_and_contexts:
             prev_event_context = new_events_and_contexts.get(event.event_id)
@@ -662,9 +781,17 @@ class EventsStore(SQLBaseStore):
                         new_events_and_contexts[event.event_id] = (event, context)
             else:
                 new_events_and_contexts[event.event_id] = (event, context)
+        return new_events_and_contexts.values()
 
-        events_and_contexts = new_events_and_contexts.values()
+    def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
+        """Update min_depth for each room
 
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            backfilled (bool): True if the events were backfilled
+        """
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -683,6 +810,21 @@ class EventsStore(SQLBaseStore):
         for room_id, depth in depth_updates.items():
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
+    def _update_outliers_txn(self, txn, events_and_contexts):
+        """Update any outliers with new event info.
+
+        This turns outliers into ex-outliers (unless the new event was
+        rejected).
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+
+        Returns:
+            list[(EventBase, EventContext)] new list, without events which
+            are already in the events table.
+        """
         txn.execute(
             "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
                 ",".join(["?"] * len(events_and_contexts)),
@@ -697,19 +839,16 @@ class EventsStore(SQLBaseStore):
 
         to_remove = set()
         for event, context in events_and_contexts:
-            if context.rejected:
-                # If the event is rejected then we don't care if the event
-                # was an outlier or not.
-                if event.event_id in have_persisted:
-                    # If we have already seen the event then ignore it.
-                    to_remove.add(event)
-                continue
-
             if event.event_id not in have_persisted:
                 continue
 
             to_remove.add(event)
 
+            if context.rejected:
+                # If the event is rejected then we don't care if the event
+                # was an outlier or not.
+                continue
+
             outlier_persisted = have_persisted[event.event_id]
             if not event.internal_metadata.is_outlier() and outlier_persisted:
                 # We received a copy of an event that we had already stored as
@@ -764,37 +903,19 @@ class EventsStore(SQLBaseStore):
                 # event isn't an outlier any more.
                 self._update_backward_extremeties(txn, [event])
 
-        events_and_contexts = [
+        return [
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]
 
+    @classmethod
+    def _delete_existing_rows_txn(cls, txn, events_and_contexts):
         if not events_and_contexts:
-            # Make sure we don't pass an empty list to functions that expect to
-            # be storing at least one element.
+            # nothing to do here
             return
 
-        # From this point onwards the events are only events that we haven't
-        # seen before.
+        logger.info("Deleting existing")
 
-        def event_dict(event):
-            return {
-                k: v
-                for k, v in event.get_dict().items()
-                if k not in [
-                    "redacted",
-                    "redacted_because",
-                ]
-            }
-
-        if delete_existing:
-            # For paranoia reasons, we go and delete all the existing entries
-            # for these events so we can reinsert them.
-            # This gets around any problems with some tables already having
-            # entries.
-
-            logger.info("Deleting existing")
-
-            for table in (
+        for table in (
                 "events",
                 "event_auth",
                 "event_json",
@@ -817,11 +938,34 @@ class EventsStore(SQLBaseStore):
                 "redactions",
                 "room_memberships",
                 "topics"
-            ):
-                txn.executemany(
-                    "DELETE FROM %s WHERE event_id = ?" % (table,),
-                    [(ev.event_id,) for ev, _ in events_and_contexts]
-                )
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE event_id = ?" % (table,),
+                [(ev.event_id,) for ev, _ in events_and_contexts]
+            )
+
+    def _store_event_txn(self, txn, events_and_contexts):
+        """Insert new events into the event and event_json tables
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+        """
+
+        if not events_and_contexts:
+            # nothing to do here
+            return
+
+        def event_dict(event):
+            return {
+                k: v
+                for k, v in event.get_dict().items()
+                if k not in [
+                    "redacted",
+                    "redacted_because",
+                ]
+            }
 
         self._simple_insert_many_txn(
             txn,
@@ -865,6 +1009,19 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
+    def _store_rejected_events_txn(self, txn, events_and_contexts):
+        """Add rows to the 'rejections' table for received events which were
+        rejected
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+
+        Returns:
+            list[(EventBase, EventContext)] new list, without the rejected
+                events.
+        """
         # Remove the rejected events from the list now that we've added them
         # to the events table and the events_json table.
         to_remove = set()
@@ -876,17 +1033,24 @@ class EventsStore(SQLBaseStore):
                 )
                 to_remove.add(event)
 
-        events_and_contexts = [
+        return [
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]
 
+    def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+        """Update all the miscellaneous tables for new events
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            backfilled (bool): True if the events were backfilled
+        """
+
         if not events_and_contexts:
-            # Make sure we don't pass an empty list to functions that expect to
-            # be storing at least one element.
+            # nothing to do here
             return
 
-        # From this point onwards the events are only ones that weren't rejected.
-
         for event, context in events_and_contexts:
             # Insert all the push actions into the event_push_actions table.
             if context.push_actions:
@@ -915,10 +1079,6 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
-        # Insert into the state_groups, state_groups_state, and
-        # event_to_state_groups tables.
-        self._store_mult_state_groups_txn(txn, events_and_contexts)
-
         # Update the event_forward_extremities, event_backward_extremities and
         # event_edges tables.
         self._handle_mult_prev_events(
@@ -1005,13 +1165,6 @@ class EventsStore(SQLBaseStore):
         # Prefill the event cache
         self._add_to_cache(txn, events_and_contexts)
 
-        if backfilled:
-            # Backfilled events come before the current state so we don't need
-            # to update the current state table
-            return
-
-        return
-
     def _add_to_cache(self, txn, events_and_contexts):
         to_prefill = []
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 27f1ec89ec..1b42bea07a 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -136,6 +136,16 @@ class StateStore(SQLBaseStore):
                 continue
 
             if context.current_state_ids is None:
+                # AFAIK, this can never happen
+                logger.error(
+                    "Non-outlier event %s had current_state_ids==None",
+                    event.event_id)
+                continue
+
+            # if the event was rejected, just give it the same state as its
+            # predecessor.
+            if context.rejected:
+                state_groups[event.event_id] = context.prev_group
                 continue
 
             state_groups[event.event_id] = context.state_group
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 1135a683af..ff67b1d794 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -318,47 +318,44 @@ def preserve_context_over_deferred(deferred, context=None):
     return d
 
 
-def reset_context_after_deferred(deferred):
-    """If the deferred is incomplete, add a callback which will reset the
-    context.
-
-    This is useful when you want to fire off a deferred, but don't want to
-    wait for it to complete. (The deferred will restore the current log context
-    when it completes, so if you don't do anything, it will leak log context.)
-
-    (If this feels asymmetric, consider it this way: we are effectively forking
-    a new thread of execution. We are probably currently within a
-    ``with LoggingContext()`` block, which is supposed to have a single entry
-    and exit point. But by spawning off another deferred, we are effectively
-    adding a new exit point.)
+def preserve_fn(f):
+    """Wraps a function, to ensure that the current context is restored after
+    return from the function, and that the sentinel context is set once the
+    deferred returned by the funtion completes.
 
-    Args:
-        deferred (defer.Deferred): deferred
+    Useful for wrapping functions that return a deferred which you don't yield
+    on.
     """
     def reset_context(result):
         LoggingContext.set_current_context(LoggingContext.sentinel)
         return result
 
-    if not deferred.called:
-        deferred.addBoth(reset_context)
-
-
-def preserve_fn(f):
-    """Ensures that function is called with correct context and that context is
-    restored after return. Useful for wrapping functions that return a deferred
-    which you don't yield on.
-    """
+    # XXX: why is this here rather than inside g? surely we want to preserve
+    # the context from the time the function was called, not when it was
+    # wrapped?
     current = LoggingContext.current_context()
 
     def g(*args, **kwargs):
-        with PreserveLoggingContext(current):
-            res = f(*args, **kwargs)
-            if isinstance(res, defer.Deferred):
-                return preserve_context_over_deferred(
-                    res, context=LoggingContext.sentinel
-                )
-            else:
-                return res
+        res = f(*args, **kwargs)
+        if isinstance(res, defer.Deferred) and not res.called:
+            # The function will have reset the context before returning, so
+            # we need to restore it now.
+            LoggingContext.set_current_context(current)
+
+            # The original context will be restored when the deferred
+            # completes, but there is nothing waiting for it, so it will
+            # get leaked into the reactor or some other function which
+            # wasn't expecting it. We therefore need to reset the context
+            # here.
+            #
+            # (If this feels asymmetric, consider it this way: we are
+            # effectively forking a new thread of execution. We are
+            # probably currently within a ``with LoggingContext()`` block,
+            # which is supposed to have a single entry and exit point. But
+            # by spawning off another deferred, we are effectively
+            # adding a new exit point.)
+            res.addBoth(reset_context)
+        return res
     return g
 
 
diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py
index 65a330a0e9..9ffe209c4d 100644
--- a/tests/util/test_log_context.py
+++ b/tests/util/test_log_context.py
@@ -1,8 +1,10 @@
+import twisted.python.failure
 from twisted.internet import defer
 from twisted.internet import reactor
 from .. import unittest
 
 from synapse.util.async import sleep
+from synapse.util import logcontext
 from synapse.util.logcontext import LoggingContext
 
 
@@ -33,3 +35,62 @@ class LoggingContextTestCase(unittest.TestCase):
             context_one.test_key = "one"
             yield sleep(0)
             self._check_test_key("one")
+
+    def _test_preserve_fn(self, function):
+        sentinel_context = LoggingContext.current_context()
+
+        callback_completed = [False]
+
+        @defer.inlineCallbacks
+        def cb():
+            context_one.test_key = "one"
+            yield function()
+            self._check_test_key("one")
+
+            callback_completed[0] = True
+
+        with LoggingContext() as context_one:
+            context_one.test_key = "one"
+
+            # fire off function, but don't wait on it.
+            logcontext.preserve_fn(cb)()
+
+            self._check_test_key("one")
+
+        # now wait for the function under test to have run, and check that
+        # the logcontext is left in a sane state.
+        d2 = defer.Deferred()
+
+        def check_logcontext():
+            if not callback_completed[0]:
+                reactor.callLater(0.01, check_logcontext)
+                return
+
+            # make sure that the context was reset before it got thrown back
+            # into the reactor
+            try:
+                self.assertIs(LoggingContext.current_context(),
+                              sentinel_context)
+                d2.callback(None)
+            except BaseException:
+                d2.errback(twisted.python.failure.Failure())
+
+        reactor.callLater(0.01, check_logcontext)
+
+        # test is done once d2 finishes
+        return d2
+
+    def test_preserve_fn_with_blocking_fn(self):
+        @defer.inlineCallbacks
+        def blocking_function():
+            yield sleep(0)
+
+        return self._test_preserve_fn(blocking_function)
+
+    def test_preserve_fn_with_non_blocking_fn(self):
+        @defer.inlineCallbacks
+        def nonblocking_function():
+            with logcontext.PreserveLoggingContext():
+                yield defer.succeed(None)
+
+        return self._test_preserve_fn(nonblocking_function)