summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py518
1 files changed, 439 insertions, 79 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6d978ffcd5..d2feee8dbb 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,9 +23,11 @@ from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
 
 from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple
+from collections import deque, namedtuple, OrderedDict
+from functools import wraps
 
 import synapse
 import synapse.metrics
@@ -149,8 +151,29 @@ class _EventPeristenceQueue(object):
 _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 
+def _retry_on_integrity_error(func):
+    """Wraps a database function so that it gets retried on IntegrityError,
+    with `delete_existing=True` passed in.
+
+    Args:
+        func: function that returns a Deferred and accepts a `delete_existing` arg
+    """
+    @wraps(func)
+    @defer.inlineCallbacks
+    def f(self, *args, **kwargs):
+        try:
+            res = yield func(self, *args, **kwargs)
+        except self.database_engine.module.IntegrityError:
+            logger.exception("IntegrityError, retrying.")
+            res = yield func(self, *args, delete_existing=True, **kwargs)
+        defer.returnValue(res)
+
+    return f
+
+
 class EventsStore(SQLBaseStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
     def __init__(self, hs):
         super(EventsStore, self).__init__(hs)
@@ -158,6 +181,10 @@ class EventsStore(SQLBaseStore):
         self.register_background_update_handler(
             self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
         )
+        self.register_background_update_handler(
+            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
+            self._background_reindex_fields_sender,
+        )
 
         self._event_persist_queue = _EventPeristenceQueue()
 
@@ -223,8 +250,10 @@ class EventsStore(SQLBaseStore):
 
         self._event_persist_queue.handle_queue(room_id, persisting_queue)
 
+    @_retry_on_integrity_error
     @defer.inlineCallbacks
-    def _persist_events(self, events_and_contexts, backfilled=False):
+    def _persist_events(self, events_and_contexts, backfilled=False,
+                        delete_existing=False):
         if not events_and_contexts:
             return
 
@@ -267,12 +296,15 @@ class EventsStore(SQLBaseStore):
                         self._persist_events_txn,
                         events_and_contexts=chunk,
                         backfilled=backfilled,
+                        delete_existing=delete_existing,
                     )
                     persist_event_counter.inc_by(len(chunk))
 
+    @_retry_on_integrity_error
     @defer.inlineCallbacks
     @log_function
-    def _persist_event(self, event, context, current_state=None, backfilled=False):
+    def _persist_event(self, event, context, current_state=None, backfilled=False,
+                       delete_existing=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
                 with self._state_groups_id_gen.get_next() as state_group_id:
@@ -285,6 +317,7 @@ class EventsStore(SQLBaseStore):
                         context=context,
                         current_state=current_state,
                         backfilled=backfilled,
+                        delete_existing=delete_existing,
                     )
                     persist_event_counter.inc()
         except _RollbackButIsFineException:
@@ -317,7 +350,7 @@ class EventsStore(SQLBaseStore):
         )
 
         if not events and not allow_none:
-            raise RuntimeError("Could not find event %s" % (event_id,))
+            raise SynapseError(404, "Could not find event %s" % (event_id,))
 
         defer.returnValue(events[0] if events else None)
 
@@ -347,7 +380,8 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
+    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
+                           delete_existing=False):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
@@ -355,7 +389,6 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
-            txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
 
             # Add an entry to the current_state_resets table to record the point
             # where we clobbered the current state
@@ -388,10 +421,38 @@ class EventsStore(SQLBaseStore):
             txn,
             [(event, context)],
             backfilled=backfilled,
+            delete_existing=delete_existing,
         )
 
     @log_function
-    def _persist_events_txn(self, txn, events_and_contexts, backfilled):
+    def _persist_events_txn(self, txn, events_and_contexts, backfilled,
+                            delete_existing=False):
+        """Insert some number of room events into the necessary database tables.
+
+        Rejected events are only inserted into the events table, the events_json table,
+        and the rejections table. Things reading from those table will need to check
+        whether the event was rejected.
+
+        If delete_existing is True then existing events will be purged from the
+        database before insertion. This is useful when retrying due to IntegrityError.
+        """
+        # Ensure that we don't have the same event twice.
+        # Pick the earliest non-outlier if there is one, else the earliest one.
+        new_events_and_contexts = OrderedDict()
+        for event, context in events_and_contexts:
+            prev_event_context = new_events_and_contexts.get(event.event_id)
+            if prev_event_context:
+                if not event.internal_metadata.is_outlier():
+                    if prev_event_context[0].internal_metadata.is_outlier():
+                        # To ensure correct ordering we pop, as OrderedDict is
+                        # ordered by first insertion.
+                        new_events_and_contexts.pop(event.event_id, None)
+                        new_events_and_contexts[event.event_id] = (event, context)
+            else:
+                new_events_and_contexts[event.event_id] = (event, context)
+
+        events_and_contexts = new_events_and_contexts.values()
+
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -402,21 +463,11 @@ class EventsStore(SQLBaseStore):
                     event.room_id, event.internal_metadata.stream_ordering,
                 )
 
-            if not event.internal_metadata.is_outlier():
+            if not event.internal_metadata.is_outlier() and not context.rejected:
                 depth_updates[event.room_id] = max(
                     event.depth, depth_updates.get(event.room_id, event.depth)
                 )
 
-            if context.push_actions:
-                self._set_push_actions_for_event_and_users_txn(
-                    txn, event, context.push_actions
-                )
-
-        if event.type == EventTypes.Redaction and event.redacts is not None:
-            self._remove_push_actions_for_event_id_txn(
-                txn, event.room_id, event.redacts
-            )
-
         for room_id, depth in depth_updates.items():
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
@@ -426,30 +477,21 @@ class EventsStore(SQLBaseStore):
             ),
             [event.event_id for event, _ in events_and_contexts]
         )
+
         have_persisted = {
             event_id: outlier
             for event_id, outlier in txn.fetchall()
         }
 
-        event_map = {}
         to_remove = set()
         for event, context in events_and_contexts:
-            # Handle the case of the list including the same event multiple
-            # times. The tricky thing here is when they differ by whether
-            # they are an outlier.
-            if event.event_id in event_map:
-                other = event_map[event.event_id]
-
-                if not other.internal_metadata.is_outlier():
-                    to_remove.add(event)
-                    continue
-                elif not event.internal_metadata.is_outlier():
+            if context.rejected:
+                # If the event is rejected then we don't care if the event
+                # was an outlier or not.
+                if event.event_id in have_persisted:
+                    # If we have already seen the event then ignore it.
                     to_remove.add(event)
-                    continue
-                else:
-                    to_remove.add(other)
-
-            event_map[event.event_id] = event
+                continue
 
             if event.event_id not in have_persisted:
                 continue
@@ -458,6 +500,12 @@ class EventsStore(SQLBaseStore):
 
             outlier_persisted = have_persisted[event.event_id]
             if not event.internal_metadata.is_outlier() and outlier_persisted:
+                # We received a copy of an event that we had already stored as
+                # an outlier in the database. We now have some state at that
+                # so we need to update the state_groups table with that state.
+
+                # insert into the state_group, state_groups_state and
+                # event_to_state_groups tables.
                 self._store_mult_state_groups_txn(txn, ((event, context),))
 
                 metadata_json = encode_json(
@@ -473,6 +521,8 @@ class EventsStore(SQLBaseStore):
                     (metadata_json, event.event_id,)
                 )
 
+                # Add an entry to the ex_outlier_stream table to replicate the
+                # change in outlier status to our workers.
                 stream_order = event.internal_metadata.stream_ordering
                 state_group_id = context.state_group or context.new_state_group_id
                 self._simple_insert_txn(
@@ -494,6 +544,8 @@ class EventsStore(SQLBaseStore):
                     (False, event.event_id,)
                 )
 
+                # Update the event_backward_extremities table now that this
+                # event isn't an outlier any more.
                 self._update_extremeties(txn, [event])
 
         events_and_contexts = [
@@ -501,38 +553,12 @@ class EventsStore(SQLBaseStore):
         ]
 
         if not events_and_contexts:
+            # Make sure we don't pass an empty list to functions that expect to
+            # be storing at least one element.
             return
 
-        self._store_mult_state_groups_txn(txn, events_and_contexts)
-
-        self._handle_mult_prev_events(
-            txn,
-            events=[event for event, _ in events_and_contexts],
-        )
-
-        for event, _ in events_and_contexts:
-            if event.type == EventTypes.Name:
-                self._store_room_name_txn(txn, event)
-            elif event.type == EventTypes.Topic:
-                self._store_room_topic_txn(txn, event)
-            elif event.type == EventTypes.Message:
-                self._store_room_message_txn(txn, event)
-            elif event.type == EventTypes.Redaction:
-                self._store_redaction(txn, event)
-            elif event.type == EventTypes.RoomHistoryVisibility:
-                self._store_history_visibility_txn(txn, event)
-            elif event.type == EventTypes.GuestAccess:
-                self._store_guest_access_txn(txn, event)
-
-        self._store_room_members_txn(
-            txn,
-            [
-                event
-                for event, _ in events_and_contexts
-                if event.type == EventTypes.Member
-            ],
-            backfilled=backfilled,
-        )
+        # From this point onwards the events are only events that we haven't
+        # seen before.
 
         def event_dict(event):
             return {
@@ -544,6 +570,43 @@ class EventsStore(SQLBaseStore):
                 ]
             }
 
+        if delete_existing:
+            # For paranoia reasons, we go and delete all the existing entries
+            # for these events so we can reinsert them.
+            # This gets around any problems with some tables already having
+            # entries.
+
+            logger.info("Deleting existing")
+
+            for table in (
+                "events",
+                "event_auth",
+                "event_json",
+                "event_content_hashes",
+                "event_destinations",
+                "event_edge_hashes",
+                "event_edges",
+                "event_forward_extremities",
+                "event_push_actions",
+                "event_reference_hashes",
+                "event_search",
+                "event_signatures",
+                "event_to_state_groups",
+                "guest_access",
+                "history_visibility",
+                "local_invites",
+                "room_names",
+                "state_events",
+                "rejections",
+                "redactions",
+                "room_memberships",
+                "state_events"
+            ):
+                txn.executemany(
+                    "DELETE FROM %s WHERE event_id = ?" % (table,),
+                    [(ev.event_id,) for ev, _ in events_and_contexts]
+                )
+
         self._simple_insert_many_txn(
             txn,
             table="event_json",
@@ -576,15 +639,51 @@ class EventsStore(SQLBaseStore):
                     "content": encode_json(event.content).decode("UTF-8"),
                     "origin_server_ts": int(event.origin_server_ts),
                     "received_ts": self._clock.time_msec(),
+                    "sender": event.sender,
+                    "contains_url": (
+                        "url" in event.content
+                        and isinstance(event.content["url"], basestring)
+                    ),
                 }
                 for event, _ in events_and_contexts
             ],
         )
 
-        if context.rejected:
-            self._store_rejections_txn(
-                txn, event.event_id, context.rejected
-            )
+        # Remove the rejected events from the list now that we've added them
+        # to the events table and the events_json table.
+        to_remove = set()
+        for event, context in events_and_contexts:
+            if context.rejected:
+                # Insert the event_id into the rejections table
+                self._store_rejections_txn(
+                    txn, event.event_id, context.rejected
+                )
+                to_remove.add(event)
+
+        events_and_contexts = [
+            ec for ec in events_and_contexts if ec[0] not in to_remove
+        ]
+
+        if not events_and_contexts:
+            # Make sure we don't pass an empty list to functions that expect to
+            # be storing at least one element.
+            return
+
+        # From this point onwards the events are only ones that weren't rejected.
+
+        for event, context in events_and_contexts:
+            # Insert all the push actions into the event_push_actions table.
+            if context.push_actions:
+                self._set_push_actions_for_event_and_users_txn(
+                    txn, event, context.push_actions
+                )
+
+            if event.type == EventTypes.Redaction and event.redacts is not None:
+                # Remove the entries in the event_push_actions table for the
+                # redacted event.
+                self._remove_push_actions_for_event_id_txn(
+                    txn, event.room_id, event.redacts
+                )
 
         self._simple_insert_many_txn(
             txn,
@@ -600,6 +699,49 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
+        # Insert into the state_groups, state_groups_state, and
+        # event_to_state_groups tables.
+        self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+        # Update the event_forward_extremities, event_backward_extremities and
+        # event_edges tables.
+        self._handle_mult_prev_events(
+            txn,
+            events=[event for event, _ in events_and_contexts],
+        )
+
+        for event, _ in events_and_contexts:
+            if event.type == EventTypes.Name:
+                # Insert into the room_names and event_search tables.
+                self._store_room_name_txn(txn, event)
+            elif event.type == EventTypes.Topic:
+                # Insert into the topics table and event_search table.
+                self._store_room_topic_txn(txn, event)
+            elif event.type == EventTypes.Message:
+                # Insert into the event_search table.
+                self._store_room_message_txn(txn, event)
+            elif event.type == EventTypes.Redaction:
+                # Insert into the redactions table.
+                self._store_redaction(txn, event)
+            elif event.type == EventTypes.RoomHistoryVisibility:
+                # Insert into the event_search table.
+                self._store_history_visibility_txn(txn, event)
+            elif event.type == EventTypes.GuestAccess:
+                # Insert into the event_search table.
+                self._store_guest_access_txn(txn, event)
+
+        # Insert into the room_memberships table.
+        self._store_room_members_txn(
+            txn,
+            [
+                event
+                for event, _ in events_and_contexts
+                if event.type == EventTypes.Member
+            ],
+            backfilled=backfilled,
+        )
+
+        # Insert event_reference_hashes table.
         self._store_event_reference_hashes_txn(
             txn, [event for event, _ in events_and_contexts]
         )
@@ -644,6 +786,7 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
+        # Prefill the event cache
         self._add_to_cache(txn, events_and_contexts)
 
         if backfilled:
@@ -656,22 +799,11 @@ class EventsStore(SQLBaseStore):
                 # Outlier events shouldn't clobber the current state.
                 continue
 
-            if context.rejected:
-                # If the event failed it's auth checks then it shouldn't
-                # clobbler the current state.
-                continue
-
             txn.call_after(
                 self._get_current_state_for_key.invalidate,
                 (event.room_id, event.type, event.state_key,)
             )
 
-            if event.type in [EventTypes.Name, EventTypes.Aliases]:
-                txn.call_after(
-                    self.get_room_name_and_aliases.invalidate,
-                    (event.room_id,)
-                )
-
             self._simple_upsert_txn(
                 txn,
                 "current_state_events",
@@ -1122,6 +1254,78 @@ class EventsStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
+    def _background_reindex_fields_sender(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id, json FROM events"
+                " INNER JOIN event_json USING (event_id)"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+
+            update_rows = []
+            for row in rows:
+                try:
+                    event_id = row[1]
+                    event_json = json.loads(row[2])
+                    sender = event_json["sender"]
+                    content = event_json["content"]
+
+                    contains_url = "url" in content
+                    if contains_url:
+                        contains_url &= isinstance(content["url"], basestring)
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                update_rows.append((sender, contains_url, event_id))
+
+            sql = (
+                "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
+            )
+
+            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
+                clump = update_rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
+
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
     def _background_reindex_origin_server_ts(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -1288,6 +1492,162 @@ class EventsStore(SQLBaseStore):
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
+    def delete_old_state(self, room_id, topological_ordering):
+        return self.runInteraction(
+            "delete_old_state",
+            self._delete_old_state_txn, room_id, topological_ordering
+        )
+
+    def _delete_old_state_txn(self, txn, room_id, topological_ordering):
+        """Deletes old room state
+        """
+
+        # Tables that should be pruned:
+        #     event_auth
+        #     event_backward_extremities
+        #     event_content_hashes
+        #     event_destinations
+        #     event_edge_hashes
+        #     event_edges
+        #     event_forward_extremities
+        #     event_json
+        #     event_push_actions
+        #     event_reference_hashes
+        #     event_search
+        #     event_signatures
+        #     event_to_state_groups
+        #     events
+        #     rejections
+        #     room_depth
+        #     state_groups
+        #     state_groups_state
+
+        # First ensure that we're not about to delete all the forward extremeties
+        txn.execute(
+            "SELECT e.event_id, e.depth FROM events as e "
+            "INNER JOIN event_forward_extremities as f "
+            "ON e.event_id = f.event_id "
+            "AND e.room_id = f.room_id "
+            "WHERE f.room_id = ?",
+            (room_id,)
+        )
+        rows = txn.fetchall()
+        max_depth = max(row[0] for row in rows)
+
+        if max_depth <= topological_ordering:
+            # We need to ensure we don't delete all the events from the datanase
+            # otherwise we wouldn't be able to send any events (due to not
+            # having any backwards extremeties)
+            raise SynapseError(
+                400, "topological_ordering is greater than forward extremeties"
+            )
+
+        txn.execute(
+            "SELECT event_id, state_key FROM events"
+            " LEFT JOIN state_events USING (room_id, event_id)"
+            " WHERE room_id = ? AND topological_ordering < ?",
+            (room_id, topological_ordering,)
+        )
+        event_rows = txn.fetchall()
+
+        # We calculate the new entries for the backward extremeties by finding
+        # all events that point to events that are to be purged
+        txn.execute(
+            "SELECT DISTINCT e.event_id FROM events as e"
+            " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
+            " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
+            " WHERE e.room_id = ? AND e.topological_ordering < ?"
+            " AND e2.topological_ordering >= ?",
+            (room_id, topological_ordering, topological_ordering)
+        )
+        new_backwards_extrems = txn.fetchall()
+
+        txn.execute(
+            "DELETE FROM event_backward_extremities WHERE room_id = ?",
+            (room_id,)
+        )
+
+        # Update backward extremeties
+        txn.executemany(
+            "INSERT INTO event_backward_extremities (room_id, event_id)"
+            " VALUES (?, ?)",
+            [
+                (room_id, event_id) for event_id, in new_backwards_extrems
+            ]
+        )
+
+        # Get all state groups that are only referenced by events that are
+        # to be deleted.
+        txn.execute(
+            "SELECT state_group FROM event_to_state_groups"
+            " INNER JOIN events USING (event_id)"
+            " WHERE state_group IN ("
+            "   SELECT DISTINCT state_group FROM events"
+            "   INNER JOIN event_to_state_groups USING (event_id)"
+            "   WHERE room_id = ? AND topological_ordering < ?"
+            " )"
+            " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
+            (room_id, topological_ordering, topological_ordering)
+        )
+        state_rows = txn.fetchall()
+        txn.executemany(
+            "DELETE FROM state_groups_state WHERE state_group = ?",
+            state_rows
+        )
+        txn.executemany(
+            "DELETE FROM state_groups WHERE id = ?",
+            state_rows
+        )
+        # Delete all non-state
+        txn.executemany(
+            "DELETE FROM event_to_state_groups WHERE event_id = ?",
+            [(event_id,) for event_id, _ in event_rows]
+        )
+
+        txn.execute(
+            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+            (topological_ordering, room_id,)
+        )
+
+        # Delete all remote non-state events
+        to_delete = [
+            (event_id,) for event_id, state_key in event_rows
+            if state_key is None and not self.hs.is_mine_id(event_id)
+        ]
+        for table in (
+            "events",
+            "event_json",
+            "event_auth",
+            "event_content_hashes",
+            "event_destinations",
+            "event_edge_hashes",
+            "event_edges",
+            "event_forward_extremities",
+            "event_push_actions",
+            "event_reference_hashes",
+            "event_search",
+            "event_signatures",
+            "rejections",
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE event_id = ?" % (table,),
+                to_delete
+            )
+
+        txn.executemany(
+            "DELETE FROM events WHERE event_id = ?",
+            to_delete
+        )
+        # Mark all state and own events as outliers
+        txn.executemany(
+            "UPDATE events SET outlier = ?"
+            " WHERE event_id = ?",
+            [
+                (True, event_id,) for event_id, state_key in event_rows
+                if state_key is not None or self.hs.is_mine_id(event_id)
+            ]
+        )
+
 
 AllNewEventsResult = namedtuple("AllNewEventsResult", [
     "new_forward_events", "new_backfill_events",