author    David Baker <dave@matrix.org>  2016-08-11 14:09:13 +0100
committer David Baker <dave@matrix.org>  2016-08-11 14:09:13 +0100
commit    b4ecf0b886c67437901e0af457c5f801ebde9a72 (patch)
tree      ef66b0684edcfeb4ad68d20375641f4654393f44 /synapse/storage/events.py
parent    Include the ts the notif was received at (diff)
parent    Merge pull request #1003 from matrix-org/erikj/redaction_prev_content (diff)
download  synapse-b4ecf0b886c67437901e0af457c5f801ebde9a72.tar.xz
Merge remote-tracking branch 'origin/develop' into dbkr/notifications_api
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--  synapse/storage/events.py  812
1 file changed, 543 insertions(+), 269 deletions(-)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4655669ba0..d2feee8dbb 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,9 +23,14 @@ from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
 
 from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple
+from collections import deque, namedtuple, OrderedDict
+from functools import wraps
+
+import synapse
+import synapse.metrics
 
 
 import logging
@@ -35,6 +40,10 @@ import ujson as json
 logger = logging.getLogger(__name__)
 
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+persist_event_counter = metrics.register_counter("persisted_events")
+
+
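
A note on the counter registered above: it is incremented once per event further down in this diff (persist_event_counter.inc() for single events, inc_by() for chunks). A minimal sketch of the pattern, using only calls that appear in this commit:

    metrics = synapse.metrics.get_metrics_for(__name__)
    persist_event_counter = metrics.register_counter("persisted_events")

    persist_event_counter.inc()       # one event persisted
    persist_event_counter.inc_by(10)  # a whole chunk persisted at once
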
 def encode_json(json_object):
     if USE_FROZEN_DICTS:
         # ujson doesn't like frozen_dicts
@@ -139,8 +148,32 @@ class _EventPeristenceQueue(object):
             pass
 
 
+_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+
+
+def _retry_on_integrity_error(func):
+    """Wraps a database function so that it gets retried on IntegrityError,
+    with `delete_existing=True` passed in.
+
+    Args:
+        func: function that returns a Deferred and accepts a `delete_existing` arg
+    """
+    @wraps(func)
+    @defer.inlineCallbacks
+    def f(self, *args, **kwargs):
+        try:
+            res = yield func(self, *args, **kwargs)
+        except self.database_engine.module.IntegrityError:
+            logger.exception("IntegrityError, retrying.")
+            res = yield func(self, *args, delete_existing=True, **kwargs)
+        defer.returnValue(res)
+
+    return f
+
+
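
For readers outside the Twisted machinery, the decorator above retries exactly once: on IntegrityError it calls the wrapped function again with delete_existing=True so half-written rows are purged before reinsertion. A synchronous sketch of the same pattern (sqlite3 standing in for the configured database engine):

    import sqlite3
    from functools import wraps

    def retry_on_integrity_error(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except sqlite3.IntegrityError:
                # Retry once, asking the wrapped function to purge any
                # existing rows before it reinserts.
                return func(self, *args, delete_existing=True, **kwargs)
        return wrapper
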
 class EventsStore(SQLBaseStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
     def __init__(self, hs):
         super(EventsStore, self).__init__(hs)
@@ -148,6 +181,10 @@ class EventsStore(SQLBaseStore):
         self.register_background_update_handler(
             self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
         )
+        self.register_background_update_handler(
+            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
+            self._background_reindex_fields_sender,
+        )
 
         self._event_persist_queue = _EventPeristenceQueue()
 
@@ -213,8 +250,10 @@ class EventsStore(SQLBaseStore):
 
         self._event_persist_queue.handle_queue(room_id, persisting_queue)
 
+    @_retry_on_integrity_error
     @defer.inlineCallbacks
-    def _persist_events(self, events_and_contexts, backfilled=False):
+    def _persist_events(self, events_and_contexts, backfilled=False,
+                        delete_existing=False):
         if not events_and_contexts:
             return
 
@@ -257,11 +296,15 @@ class EventsStore(SQLBaseStore):
                         self._persist_events_txn,
                         events_and_contexts=chunk,
                         backfilled=backfilled,
+                        delete_existing=delete_existing,
                     )
+                    persist_event_counter.inc_by(len(chunk))
 
+    @_retry_on_integrity_error
     @defer.inlineCallbacks
     @log_function
-    def _persist_event(self, event, context, current_state=None, backfilled=False):
+    def _persist_event(self, event, context, current_state=None, backfilled=False,
+                       delete_existing=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
                 with self._state_groups_id_gen.get_next() as state_group_id:
@@ -274,7 +317,9 @@ class EventsStore(SQLBaseStore):
                         context=context,
                         current_state=current_state,
                         backfilled=backfilled,
+                        delete_existing=delete_existing,
                     )
+                    persist_event_counter.inc()
         except _RollbackButIsFineException:
             pass
 
@@ -305,7 +350,7 @@ class EventsStore(SQLBaseStore):
         )
 
         if not events and not allow_none:
-            raise RuntimeError("Could not find event %s" % (event_id,))
+            raise SynapseError(404, "Could not find event %s" % (event_id,))
 
         defer.returnValue(events[0] if events else None)
 
@@ -335,18 +380,15 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
+    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
+                           delete_existing=False):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
             txn.call_after(self._get_current_state_for_key.invalidate_all)
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
-            txn.call_after(
-                self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
-            )
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
-            txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
 
             # Add an entry to the current_state_resets table to record the point
             # where we clobbered the current state
@@ -379,10 +421,38 @@ class EventsStore(SQLBaseStore):
             txn,
             [(event, context)],
             backfilled=backfilled,
+            delete_existing=delete_existing,
         )
 
     @log_function
-    def _persist_events_txn(self, txn, events_and_contexts, backfilled):
+    def _persist_events_txn(self, txn, events_and_contexts, backfilled,
+                            delete_existing=False):
+        """Insert some number of room events into the necessary database tables.
+
+        Rejected events are only inserted into the events table, the
+        event_json table, and the rejections table. Things reading from those
+        tables will need to check whether the event was rejected.
+
+        If delete_existing is True then existing events will be purged from the
+        database before insertion. This is useful when retrying due to IntegrityError.
+        """
+        # Ensure that we don't have the same event twice.
+        # Pick the earliest non-outlier if there is one, else the earliest one.
+        new_events_and_contexts = OrderedDict()
+        for event, context in events_and_contexts:
+            prev_event_context = new_events_and_contexts.get(event.event_id)
+            if prev_event_context:
+                if not event.internal_metadata.is_outlier():
+                    if prev_event_context[0].internal_metadata.is_outlier():
+                        # Pop before reinserting so the entry takes the later
+                        # position (OrderedDict keeps first-insertion order).
+                        new_events_and_contexts.pop(event.event_id, None)
+                        new_events_and_contexts[event.event_id] = (event, context)
+            else:
+                new_events_and_contexts[event.event_id] = (event, context)
+
+        events_and_contexts = new_events_and_contexts.values()
+
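
The dedup rule above is easiest to see on toy input: the first non-outlier copy of an event wins, and the pop/reinsert moves the surviving entry to the later position. A standalone sketch (event ids and outlier flags are illustrative):

    from collections import OrderedDict

    events = [("$a", True), ("$b", True), ("$a", False)]  # (event_id, is_outlier)
    deduped = OrderedDict()
    for event_id, is_outlier in events:
        prev = deduped.get(event_id)
        if prev is None:
            deduped[event_id] = is_outlier
        elif not is_outlier and prev:
            # Replace the outlier copy; pop first so the entry takes the
            # later insertion position.
            deduped.pop(event_id)
            deduped[event_id] = is_outlier
    print(list(deduped.items()))  # [('$b', True), ('$a', False)]
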
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove any existing cache entries for the event_ids
@@ -393,21 +463,11 @@ class EventsStore(SQLBaseStore):
                     event.room_id, event.internal_metadata.stream_ordering,
                 )
 
-            if not event.internal_metadata.is_outlier():
+            if not event.internal_metadata.is_outlier() and not context.rejected:
                 depth_updates[event.room_id] = max(
                     event.depth, depth_updates.get(event.room_id, event.depth)
                 )
 
-            if context.push_actions:
-                self._set_push_actions_for_event_and_users_txn(
-                    txn, event, context.push_actions
-                )
-
-        if event.type == EventTypes.Redaction and event.redacts is not None:
-            self._remove_push_actions_for_event_id_txn(
-                txn, event.room_id, event.redacts
-            )
-
         for room_id, depth in depth_updates.items():
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
@@ -417,30 +477,21 @@ class EventsStore(SQLBaseStore):
             ),
             [event.event_id for event, _ in events_and_contexts]
         )
+
         have_persisted = {
             event_id: outlier
             for event_id, outlier in txn.fetchall()
         }
 
-        event_map = {}
         to_remove = set()
         for event, context in events_and_contexts:
-            # Handle the case of the list including the same event multiple
-            # times. The tricky thing here is when they differ by whether
-            # they are an outlier.
-            if event.event_id in event_map:
-                other = event_map[event.event_id]
-
-                if not other.internal_metadata.is_outlier():
-                    to_remove.add(event)
-                    continue
-                elif not event.internal_metadata.is_outlier():
+            if context.rejected:
+                # If the event is rejected then we don't care if the event
+                # was an outlier or not.
+                if event.event_id in have_persisted:
+                    # If we have already seen the event then ignore it.
                     to_remove.add(event)
-                    continue
-                else:
-                    to_remove.add(other)
-
-            event_map[event.event_id] = event
+                continue
 
             if event.event_id not in have_persisted:
                 continue
@@ -449,6 +500,12 @@ class EventsStore(SQLBaseStore):
 
             outlier_persisted = have_persisted[event.event_id]
             if not event.internal_metadata.is_outlier() and outlier_persisted:
+                # We received a copy of an event that we had already stored as
+                # an outlier in the database. We now have some state at that
+                # event, so we need to update the state_groups table with that
+                # state.
+
+                # insert into the state_group, state_groups_state and
+                # event_to_state_groups tables.
                 self._store_mult_state_groups_txn(txn, ((event, context),))
 
                 metadata_json = encode_json(
@@ -464,6 +521,8 @@ class EventsStore(SQLBaseStore):
                     (metadata_json, event.event_id,)
                 )
 
+                # Add an entry to the ex_outlier_stream table to replicate the
+                # change in outlier status to our workers.
                 stream_order = event.internal_metadata.stream_ordering
                 state_group_id = context.state_group or context.new_state_group_id
                 self._simple_insert_txn(
@@ -485,6 +544,8 @@ class EventsStore(SQLBaseStore):
                     (False, event.event_id,)
                 )
 
+                # Update the event_backward_extremities table now that this
+                # event isn't an outlier any more.
                 self._update_extremeties(txn, [event])
 
         events_and_contexts = [
@@ -492,38 +553,12 @@ class EventsStore(SQLBaseStore):
         ]
 
         if not events_and_contexts:
+            # Make sure we don't pass an empty list to functions that expect to
+            # be storing at least one element.
             return
 
-        self._store_mult_state_groups_txn(txn, events_and_contexts)
-
-        self._handle_mult_prev_events(
-            txn,
-            events=[event for event, _ in events_and_contexts],
-        )
-
-        for event, _ in events_and_contexts:
-            if event.type == EventTypes.Name:
-                self._store_room_name_txn(txn, event)
-            elif event.type == EventTypes.Topic:
-                self._store_room_topic_txn(txn, event)
-            elif event.type == EventTypes.Message:
-                self._store_room_message_txn(txn, event)
-            elif event.type == EventTypes.Redaction:
-                self._store_redaction(txn, event)
-            elif event.type == EventTypes.RoomHistoryVisibility:
-                self._store_history_visibility_txn(txn, event)
-            elif event.type == EventTypes.GuestAccess:
-                self._store_guest_access_txn(txn, event)
-
-        self._store_room_members_txn(
-            txn,
-            [
-                event
-                for event, _ in events_and_contexts
-                if event.type == EventTypes.Member
-            ],
-            backfilled=backfilled,
-        )
+        # From this point onwards the events are only events that we haven't
+        # seen before.
 
         def event_dict(event):
             return {
@@ -535,6 +570,43 @@ class EventsStore(SQLBaseStore):
                 ]
             }
 
+        if delete_existing:
+            # Out of paranoia, delete all existing entries for these events
+            # before reinserting them, which works around any tables that
+            # already have rows for them.
+
+            logger.info("Deleting existing")
+
+            for table in (
+                "events",
+                "event_auth",
+                "event_json",
+                "event_content_hashes",
+                "event_destinations",
+                "event_edge_hashes",
+                "event_edges",
+                "event_forward_extremities",
+                "event_push_actions",
+                "event_reference_hashes",
+                "event_search",
+                "event_signatures",
+                "event_to_state_groups",
+                "guest_access",
+                "history_visibility",
+                "local_invites",
+                "room_names",
+                "rejections",
+                "redactions",
+                "room_memberships",
+                "state_events",
+            ):
+                txn.executemany(
+                    "DELETE FROM %s WHERE event_id = ?" % (table,),
+                    [(ev.event_id,) for ev, _ in events_and_contexts]
+                )
+
         self._simple_insert_many_txn(
             txn,
             table="event_json",
@@ -567,15 +639,51 @@ class EventsStore(SQLBaseStore):
                     "content": encode_json(event.content).decode("UTF-8"),
                     "origin_server_ts": int(event.origin_server_ts),
                     "received_ts": self._clock.time_msec(),
+                    "sender": event.sender,
+                    "contains_url": (
+                        "url" in event.content
+                        and isinstance(event.content["url"], basestring)
+                    ),
                 }
                 for event, _ in events_and_contexts
             ],
         )
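
The contains_url flag computed above reduces to a small predicate over the event content (Python 2, hence basestring); a sketch:

    def event_contains_url(content):
        # True only when content has a "url" key whose value is a string.
        return "url" in content and isinstance(content["url"], basestring)

    assert event_contains_url({"url": "mxc://example.org/abc"})
    assert not event_contains_url({"url": 42})
    assert not event_contains_url({"body": "hi"})
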
 
-        if context.rejected:
-            self._store_rejections_txn(
-                txn, event.event_id, context.rejected
-            )
+        # Remove the rejected events from the list now that we've added them
+        # to the events table and the event_json table.
+        to_remove = set()
+        for event, context in events_and_contexts:
+            if context.rejected:
+                # Insert the event_id into the rejections table
+                self._store_rejections_txn(
+                    txn, event.event_id, context.rejected
+                )
+                to_remove.add(event)
+
+        events_and_contexts = [
+            ec for ec in events_and_contexts if ec[0] not in to_remove
+        ]
+
+        if not events_and_contexts:
+            # Make sure we don't pass an empty list to functions that expect to
+            # be storing at least one element.
+            return
+
+        # From this point onwards the events are only ones that weren't rejected.
+
+        for event, context in events_and_contexts:
+            # Insert all the push actions into the event_push_actions table.
+            if context.push_actions:
+                self._set_push_actions_for_event_and_users_txn(
+                    txn, event, context.push_actions
+                )
+
+            if event.type == EventTypes.Redaction and event.redacts is not None:
+                # Remove the entries in the event_push_actions table for the
+                # redacted event.
+                self._remove_push_actions_for_event_id_txn(
+                    txn, event.room_id, event.redacts
+                )
 
         self._simple_insert_many_txn(
             txn,
@@ -591,6 +699,49 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
+        # Insert into the state_groups, state_groups_state, and
+        # event_to_state_groups tables.
+        self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+        # Update the event_forward_extremities, event_backward_extremities and
+        # event_edges tables.
+        self._handle_mult_prev_events(
+            txn,
+            events=[event for event, _ in events_and_contexts],
+        )
+
+        for event, _ in events_and_contexts:
+            if event.type == EventTypes.Name:
+                # Insert into the room_names and event_search tables.
+                self._store_room_name_txn(txn, event)
+            elif event.type == EventTypes.Topic:
+                # Insert into the topics table and event_search table.
+                self._store_room_topic_txn(txn, event)
+            elif event.type == EventTypes.Message:
+                # Insert into the event_search table.
+                self._store_room_message_txn(txn, event)
+            elif event.type == EventTypes.Redaction:
+                # Insert into the redactions table.
+                self._store_redaction(txn, event)
+            elif event.type == EventTypes.RoomHistoryVisibility:
+                # Insert into the history_visibility table.
+                self._store_history_visibility_txn(txn, event)
+            elif event.type == EventTypes.GuestAccess:
+                # Insert into the guest_access table.
+                self._store_guest_access_txn(txn, event)
+
+        # Insert into the room_memberships table.
+        self._store_room_members_txn(
+            txn,
+            [
+                event
+                for event, _ in events_and_contexts
+                if event.type == EventTypes.Member
+            ],
+            backfilled=backfilled,
+        )
+
+        # Insert into the event_reference_hashes table.
         self._store_event_reference_hashes_txn(
             txn, [event for event, _ in events_and_contexts]
         )
@@ -635,6 +786,9 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
+        # Prefill the event cache
+        self._add_to_cache(txn, events_and_contexts)
+
         if backfilled:
             # Backfilled events come before the current state so we don't need
             # to update the current state table
@@ -645,22 +799,11 @@ class EventsStore(SQLBaseStore):
                 # Outlier events shouldn't clobber the current state.
                 continue
 
-            if context.rejected:
-                # If the event failed it's auth checks then it shouldn't
-                # clobbler the current state.
-                continue
-
             txn.call_after(
                 self._get_current_state_for_key.invalidate,
                 (event.room_id, event.type, event.state_key,)
             )
 
-            if event.type in [EventTypes.Name, EventTypes.Aliases]:
-                txn.call_after(
-                    self.get_room_name_and_aliases.invalidate,
-                    (event.room_id,)
-                )
-
             self._simple_upsert_txn(
                 txn,
                 "current_state_events",
@@ -676,6 +819,45 @@ class EventsStore(SQLBaseStore):
 
         return
 
+    def _add_to_cache(self, txn, events_and_contexts):
+        to_prefill = []
+
+        N = 200
+        for i in range(0, len(events_and_contexts), N):
+            ev_map = {
+                e[0].event_id: e[0]
+                for e in events_and_contexts[i:i + N]
+            }
+            if not ev_map:
+                break
+
+            sql = (
+                "SELECT "
+                " e.event_id as event_id, "
+                " r.redacts as redacts,"
+                " rej.event_id as rejects "
+                " FROM events as e"
+                " LEFT JOIN rejections as rej USING (event_id)"
+                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+                " WHERE e.event_id IN (%s)"
+            ) % (",".join(["?"] * len(ev_map)),)
+
+            txn.execute(sql, ev_map.keys())
+            rows = self.cursor_to_dict(txn)
+            for row in rows:
+                event = ev_map[row["event_id"]]
+                if not row["rejects"] and not row["redacts"]:
+                    to_prefill.append(_EventCacheEntry(
+                        event=event,
+                        redacted_event=None,
+                    ))
+
+        def prefill():
+            for cache_entry in to_prefill:
+                self._get_event_cache.prefill((cache_entry.event.event_id,), cache_entry)
+        txn.call_after(prefill)
+
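
The N = 200 loop in _add_to_cache keeps the IN (...) placeholder list bounded. The same chunking pattern in isolation (an sqlite3 in-memory sketch; table and ids are illustrative):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY)")
    conn.executemany(
        "INSERT INTO events VALUES (?)",
        [("$ev%d" % i,) for i in range(450)],
    )

    ids = ["$ev%d" % i for i in range(450)]
    N = 200
    found = []
    for i in range(0, len(ids), N):
        chunk = ids[i:i + N]
        sql = "SELECT event_id FROM events WHERE event_id IN (%s)" % (
            ",".join(["?"] * len(chunk)),
        )
        found.extend(row[0] for row in conn.execute(sql, chunk))
    assert len(found) == 450
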
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
         txn.call_after(self._invalidate_get_event_cache, event.redacts)
@@ -741,100 +923,65 @@ class EventsStore(SQLBaseStore):
         event_id_list = event_ids
         event_ids = set(event_ids)
 
-        event_map = self._get_events_from_cache(
+        event_entry_map = self._get_events_from_cache(
             event_ids,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
             allow_rejected=allow_rejected,
         )
 
-        missing_events_ids = [e for e in event_ids if e not in event_map]
+        missing_events_ids = [e for e in event_ids if e not in event_entry_map]
 
         if missing_events_ids:
             missing_events = yield self._enqueue_events(
                 missing_events_ids,
                 check_redacted=check_redacted,
-                get_prev_content=get_prev_content,
                 allow_rejected=allow_rejected,
             )
 
-            event_map.update(missing_events)
-
-        defer.returnValue([
-            event_map[e_id] for e_id in event_id_list
-            if e_id in event_map and event_map[e_id]
-        ])
+            event_entry_map.update(missing_events)
 
-    def _get_events_txn(self, txn, event_ids, check_redacted=True,
-                        get_prev_content=False, allow_rejected=False):
-        if not event_ids:
-            return []
-
-        event_map = self._get_events_from_cache(
-            event_ids,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
-
-        missing_events_ids = [e for e in event_ids if e not in event_map]
+        events = []
+        for event_id in event_id_list:
+            entry = event_entry_map.get(event_id, None)
+            if not entry:
+                continue
 
-        if not missing_events_ids:
-            return [
-                event_map[e_id] for e_id in event_ids
-                if e_id in event_map and event_map[e_id]
-            ]
+            if allow_rejected or not entry.event.rejected_reason:
+                if check_redacted and entry.redacted_event:
+                    event = entry.redacted_event
+                else:
+                    event = entry.event
 
-        missing_events = self._fetch_events_txn(
-            txn,
-            missing_events_ids,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
+                events.append(event)
 
-        event_map.update(missing_events)
+                if get_prev_content:
+                    if "replaces_state" in event.unsigned:
+                        prev = yield self.get_event(
+                            event.unsigned["replaces_state"],
+                            get_prev_content=False,
+                            allow_none=True,
+                        )
+                        if prev:
+                            event.unsigned = dict(event.unsigned)
+                            event.unsigned["prev_content"] = prev.content
+                            event.unsigned["prev_sender"] = prev.sender
 
-        return [
-            event_map[e_id] for e_id in event_ids
-            if e_id in event_map and event_map[e_id]
-        ]
+        defer.returnValue(events)
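
The branch above is the heart of the new cache scheme: one _EventCacheEntry per event_id holds both the original event and, where applicable, the pruned copy, with redaction and rejection policy applied at read time. A condensed, hypothetical sketch of that resolution step:

    from collections import namedtuple

    _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
    Event = namedtuple("Event", ("event_id", "rejected_reason"))

    def resolve_entry(entry, check_redacted, allow_rejected):
        # Drop rejected events unless the caller explicitly allows them.
        if not allow_rejected and entry.event.rejected_reason:
            return None
        # Hand back the pruned copy when redaction checking is on.
        if check_redacted and entry.redacted_event:
            return entry.redacted_event
        return entry.event

    entry = _EventCacheEntry(event=Event("$e", None), redacted_event=None)
    assert resolve_entry(entry, True, False).event_id == "$e"
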
 
     def _invalidate_get_event_cache(self, event_id):
-        for check_redacted in (False, True):
-            for get_prev_content in (False, True):
-                self._get_event_cache.invalidate(
-                    (event_id, check_redacted, get_prev_content)
-                )
-
-    def _get_event_txn(self, txn, event_id, check_redacted=True,
-                       get_prev_content=False, allow_rejected=False):
+        self._get_event_cache.invalidate((event_id,))
 
-        events = self._get_events_txn(
-            txn, [event_id],
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
-
-        return events[0] if events else None
-
-    def _get_events_from_cache(self, events, check_redacted, get_prev_content,
-                               allow_rejected):
+    def _get_events_from_cache(self, events, allow_rejected):
         event_map = {}
 
         for event_id in events:
-            try:
-                ret = self._get_event_cache.get(
-                    (event_id, check_redacted, get_prev_content,)
-                )
+            ret = self._get_event_cache.get((event_id,), None)
+            if not ret:
+                continue
 
-                if allow_rejected or not ret.rejected_reason:
-                    event_map[event_id] = ret
-                else:
-                    event_map[event_id] = None
-            except KeyError:
-                pass
+            if allow_rejected or not ret.event.rejected_reason:
+                event_map[event_id] = ret
+            else:
+                event_map[event_id] = None
 
         return event_map
 
@@ -905,8 +1052,7 @@ class EventsStore(SQLBaseStore):
                         reactor.callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
-    def _enqueue_events(self, events, check_redacted=True,
-                        get_prev_content=False, allow_rejected=False):
+    def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
         """Fetches events from the database using the _event_fetch_list. This
         allows batch and bulk fetching of events - it allows us to fetch events
         without having to create a new transaction for each request for events.
@@ -944,8 +1090,6 @@ class EventsStore(SQLBaseStore):
             [
                 preserve_fn(self._get_event_from_row)(
                     row["internal_metadata"], row["json"], row["redacts"],
-                    check_redacted=check_redacted,
-                    get_prev_content=get_prev_content,
                     rejected_reason=row["rejects"],
                 )
                 for row in rows
@@ -954,7 +1098,7 @@ class EventsStore(SQLBaseStore):
         )
 
         defer.returnValue({
-            e.event_id: e
+            e.event.event_id: e
             for e in res if e
         })
 
@@ -984,37 +1128,8 @@ class EventsStore(SQLBaseStore):
 
         return rows
 
-    def _fetch_events_txn(self, txn, events, check_redacted=True,
-                          get_prev_content=False, allow_rejected=False):
-        if not events:
-            return {}
-
-        rows = self._fetch_event_rows(
-            txn, events,
-        )
-
-        if not allow_rejected:
-            rows[:] = [r for r in rows if not r["rejects"]]
-
-        res = [
-            self._get_event_from_row_txn(
-                txn,
-                row["internal_metadata"], row["json"], row["redacts"],
-                check_redacted=check_redacted,
-                get_prev_content=get_prev_content,
-                rejected_reason=row["rejects"],
-            )
-            for row in rows
-        ]
-
-        return {
-            r.event_id: r
-            for r in res
-        }
-
     @defer.inlineCallbacks
     def _get_event_from_row(self, internal_metadata, js, redacted,
-                            check_redacted=True, get_prev_content=False,
                             rejected_reason=None):
         d = json.loads(js)
         internal_metadata = json.loads(internal_metadata)
@@ -1024,26 +1139,27 @@ class EventsStore(SQLBaseStore):
                 table="rejections",
                 keyvalues={"event_id": rejected_reason},
                 retcol="reason",
-                desc="_get_event_from_row",
+                desc="_get_event_from_row_rejected_reason",
             )
 
-        ev = FrozenEvent(
+        original_ev = FrozenEvent(
             d,
             internal_metadata_dict=internal_metadata,
             rejected_reason=rejected_reason,
         )
 
-        if check_redacted and redacted:
-            ev = prune_event(ev)
+        redacted_event = None
+        if redacted:
+            redacted_event = prune_event(original_ev)
 
             redaction_id = yield self._simple_select_one_onecol(
                 table="redactions",
-                keyvalues={"redacts": ev.event_id},
+                keyvalues={"redacts": redacted_event.event_id},
                 retcol="event_id",
-                desc="_get_event_from_row",
+                desc="_get_event_from_row_redactions",
             )
 
-            ev.unsigned["redacted_by"] = redaction_id
+            redacted_event.unsigned["redacted_by"] = redaction_id
             # Get the redaction event.
 
             because = yield self.get_event(
@@ -1055,86 +1171,16 @@ class EventsStore(SQLBaseStore):
             if because:
                 # It's fine to do add the event directly, since get_pdu_json
                 # will serialise this field correctly
-                ev.unsigned["redacted_because"] = because
-
-        if get_prev_content and "replaces_state" in ev.unsigned:
-            prev = yield self.get_event(
-                ev.unsigned["replaces_state"],
-                get_prev_content=False,
-                allow_none=True,
-            )
-            if prev:
-                ev.unsigned["prev_content"] = prev.content
-                ev.unsigned["prev_sender"] = prev.sender
-
-        self._get_event_cache.prefill(
-            (ev.event_id, check_redacted, get_prev_content), ev
-        )
-
-        defer.returnValue(ev)
+                redacted_event.unsigned["redacted_because"] = because
 
-    def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
-                                check_redacted=True, get_prev_content=False,
-                                rejected_reason=None):
-        d = json.loads(js)
-        internal_metadata = json.loads(internal_metadata)
-
-        if rejected_reason:
-            rejected_reason = self._simple_select_one_onecol_txn(
-                txn,
-                table="rejections",
-                keyvalues={"event_id": rejected_reason},
-                retcol="reason",
-            )
-
-        ev = FrozenEvent(
-            d,
-            internal_metadata_dict=internal_metadata,
-            rejected_reason=rejected_reason,
+        cache_entry = _EventCacheEntry(
+            event=original_ev,
+            redacted_event=redacted_event,
         )
 
-        if check_redacted and redacted:
-            ev = prune_event(ev)
-
-            redaction_id = self._simple_select_one_onecol_txn(
-                txn,
-                table="redactions",
-                keyvalues={"redacts": ev.event_id},
-                retcol="event_id",
-            )
+        self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
 
-            ev.unsigned["redacted_by"] = redaction_id
-            # Get the redaction event.
-
-            because = self._get_event_txn(
-                txn,
-                redaction_id,
-                check_redacted=False
-            )
-
-            if because:
-                ev.unsigned["redacted_because"] = because
-
-        if get_prev_content and "replaces_state" in ev.unsigned:
-            prev = self._get_event_txn(
-                txn,
-                ev.unsigned["replaces_state"],
-                get_prev_content=False,
-            )
-            if prev:
-                ev.unsigned["prev_content"] = prev.content
-                ev.unsigned["prev_sender"] = prev.sender
-
-        self._get_event_cache.prefill(
-            (ev.event_id, check_redacted, get_prev_content), ev
-        )
-
-        return ev
-
-    def _parse_events_txn(self, txn, rows):
-        event_ids = [r["event_id"] for r in rows]
-
-        return self._get_events_txn(txn, event_ids)
+        defer.returnValue(cache_entry)
 
     @defer.inlineCallbacks
     def count_daily_messages(self):
@@ -1208,6 +1254,78 @@ class EventsStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
+    def _background_reindex_fields_sender(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id, json FROM events"
+                " INNER JOIN event_json USING (event_id)"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+
+            update_rows = []
+            for row in rows:
+                try:
+                    event_id = row[1]
+                    event_json = json.loads(row[2])
+                    sender = event_json["sender"]
+                    content = event_json["content"]
+
+                    contains_url = "url" in content
+                    if contains_url:
+                        contains_url &= isinstance(content["url"], basestring)
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                update_rows.append((sender, contains_url, event_id))
+
+            sql = (
+                "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
+            )
+
+            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
+                clump = update_rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
+
+        defer.returnValue(result)
+
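
The try/except in the reindex loop above deliberately skips malformed rows rather than failing the whole batch. That per-row guard, in isolation (stdlib json standing in for ujson; sample rows are illustrative):

    import json

    def parse_row(row):
        # row = (stream_ordering, event_id, json_blob)
        try:
            event_id = row[1]
            event_json = json.loads(row[2])
            sender = event_json["sender"]
            content = event_json["content"]
            contains_url = "url" in content and isinstance(content["url"], basestring)
        except (KeyError, AttributeError):
            # Missing fields: skip this row.
            return None
        return (sender, contains_url, event_id)

    assert parse_row((1, "$e", '{"sender": "@u:hs", "content": {}}')) == ("@u:hs", False, "$e")
    assert parse_row((2, "$f", '{"content": {}}')) is None
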
+    @defer.inlineCallbacks
     def _background_reindex_origin_server_ts(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -1374,6 +1492,162 @@ class EventsStore(SQLBaseStore):
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
+    def delete_old_state(self, room_id, topological_ordering):
+        return self.runInteraction(
+            "delete_old_state",
+            self._delete_old_state_txn, room_id, topological_ordering
+        )
+
+    def _delete_old_state_txn(self, txn, room_id, topological_ordering):
+        """Deletes old room state
+        """
+
+        # Tables that should be pruned:
+        #     event_auth
+        #     event_backward_extremities
+        #     event_content_hashes
+        #     event_destinations
+        #     event_edge_hashes
+        #     event_edges
+        #     event_forward_extremities
+        #     event_json
+        #     event_push_actions
+        #     event_reference_hashes
+        #     event_search
+        #     event_signatures
+        #     event_to_state_groups
+        #     events
+        #     rejections
+        #     room_depth
+        #     state_groups
+        #     state_groups_state
+
+        # First ensure that we're not about to delete all the forward extremities
+        txn.execute(
+            "SELECT e.event_id, e.depth FROM events as e "
+            "INNER JOIN event_forward_extremities as f "
+            "ON e.event_id = f.event_id "
+            "AND e.room_id = f.room_id "
+            "WHERE f.room_id = ?",
+            (room_id,)
+        )
+        rows = txn.fetchall()
+        max_depth = max(row[1] for row in rows)
+
+        if max_depth <= topological_ordering:
+            # We need to ensure we don't delete all the events from the
+            # database, otherwise we wouldn't be able to send any events (due
+            # to not having any backward extremities).
+            raise SynapseError(
+                400, "topological_ordering is greater than forward extremities"
+            )
+
+        txn.execute(
+            "SELECT event_id, state_key FROM events"
+            " LEFT JOIN state_events USING (room_id, event_id)"
+            " WHERE room_id = ? AND topological_ordering < ?",
+            (room_id, topological_ordering,)
+        )
+        event_rows = txn.fetchall()
+
+        # We calculate the new entries for the backward extremities by finding
+        # all events that point to events that are to be purged
+        txn.execute(
+            "SELECT DISTINCT e.event_id FROM events as e"
+            " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
+            " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
+            " WHERE e.room_id = ? AND e.topological_ordering < ?"
+            " AND e2.topological_ordering >= ?",
+            (room_id, topological_ordering, topological_ordering)
+        )
+        new_backwards_extrems = txn.fetchall()
+
+        txn.execute(
+            "DELETE FROM event_backward_extremities WHERE room_id = ?",
+            (room_id,)
+        )
+
+        # Update the backward extremities
+        txn.executemany(
+            "INSERT INTO event_backward_extremities (room_id, event_id)"
+            " VALUES (?, ?)",
+            [
+                (room_id, event_id) for event_id, in new_backwards_extrems
+            ]
+        )
+
+        # Get all state groups that are only referenced by events that are
+        # to be deleted.
+        txn.execute(
+            "SELECT state_group FROM event_to_state_groups"
+            " INNER JOIN events USING (event_id)"
+            " WHERE state_group IN ("
+            "   SELECT DISTINCT state_group FROM events"
+            "   INNER JOIN event_to_state_groups USING (event_id)"
+            "   WHERE room_id = ? AND topological_ordering < ?"
+            " )"
+            " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
+            (room_id, topological_ordering, topological_ordering)
+        )
+        state_rows = txn.fetchall()
+        txn.executemany(
+            "DELETE FROM state_groups_state WHERE state_group = ?",
+            state_rows
+        )
+        txn.executemany(
+            "DELETE FROM state_groups WHERE id = ?",
+            state_rows
+        )
+        # Delete the event-to-state-group mappings for all purged events
+        txn.executemany(
+            "DELETE FROM event_to_state_groups WHERE event_id = ?",
+            [(event_id,) for event_id, _ in event_rows]
+        )
+
+        txn.execute(
+            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+            (topological_ordering, room_id,)
+        )
+
+        # Delete all remote non-state events
+        to_delete = [
+            (event_id,) for event_id, state_key in event_rows
+            if state_key is None and not self.hs.is_mine_id(event_id)
+        ]
+        for table in (
+            "events",
+            "event_json",
+            "event_auth",
+            "event_content_hashes",
+            "event_destinations",
+            "event_edge_hashes",
+            "event_edges",
+            "event_forward_extremities",
+            "event_push_actions",
+            "event_reference_hashes",
+            "event_search",
+            "event_signatures",
+            "rejections",
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE event_id = ?" % (table,),
+                to_delete
+            )
+
+        # Mark all state events, and all of our own events, as outliers
+        # rather than deleting them.
+        txn.executemany(
+            "UPDATE events SET outlier = ?"
+            " WHERE event_id = ?",
+            [
+                (True, event_id,) for event_id, state_key in event_rows
+                if state_key is not None or self.hs.is_mine_id(event_id)
+            ]
+        )
+
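
To summarise the purge's three-way split: remote non-state events are deleted outright, while state events and locally-originated events are kept but flagged as outliers. The to_delete filter in isolation (is_mine_id simplified here; the real check lives on the homeserver object):

    def is_mine_id(event_id, server_name="example.org"):
        # Simplified: an event_id has the form "$opaque:server.name".
        return event_id.split(":", 1)[1] == server_name

    event_rows = [
        ("$abc:example.org", None),            # local, non-state  -> keep as outlier
        ("$def:remote.org", None),             # remote, non-state -> delete
        ("$ghi:remote.org", "@u:remote.org"),  # state event       -> keep as outlier
    ]
    to_delete = [
        (event_id,) for event_id, state_key in event_rows
        if state_key is None and not is_mine_id(event_id)
    ]
    assert to_delete == [("$def:remote.org",)]
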
 
 AllNewEventsResult = namedtuple("AllNewEventsResult", [
     "new_forward_events", "new_backfill_events",