summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/__init__.py1
-rw-r--r--synapse/storage/events.py215
2 files changed, 201 insertions, 15 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 979fa22438..c443f7d2b6 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -131,6 +131,7 @@ class DataStore(RoomMemberStore, RoomStore,
         self._group_updates_id_gen = StreamIdGenerator(
             db_conn, "local_group_updates", "stream_id",
         )
+        self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id")
 
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = StreamIdGenerator(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3cb1abd2ab..5194c4a48d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,6 +23,7 @@ import simplejson as json
 from twisted.internet import defer
 
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
 from synapse.util.async import ObservableDeferred
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import (
@@ -1019,13 +1020,19 @@ class EventsStore(EventsWorkerStore):
                     }
                 )
 
-                sql = (
-                    "UPDATE events SET outlier = ?"
-                    " WHERE event_id = ?"
+                chunk_id, _ = self._insert_into_chunk_txn(
+                    txn, event.room_id, event.event_id,
+                    [eid for eid, _ in event.prev_events],
                 )
-                txn.execute(
-                    sql,
-                    (False, event.event_id,)
+
+                self._simple_update_txn(
+                    txn,
+                    table="events",
+                    keyvalues={"event_id": event.event_id},
+                    updatevalues={
+                        "outlier": False,
+                        "chunk_id": chunk_id,
+                    },
                 )
 
                 # Update the event_backward_extremities table now that this
@@ -1108,12 +1115,21 @@ class EventsStore(EventsWorkerStore):
             ],
         )
 
-        self._simple_insert_many_txn(
-            txn,
-            table="events",
-            values=[
-                {
+        for event, _ in events_and_contexts:
+            if event.internal_metadata.is_outlier():
+                chunk_id, _topo = None, 0
+            else:
+                chunk_id, _topo = self._insert_into_chunk_txn(
+                    txn, event.room_id, event.event_id,
+                    [eid for eid, _ in event.prev_events],
+                )
+
+            self._simple_insert_txn(
+                txn,
+                table="events",
+                values={
                     "stream_ordering": event.internal_metadata.stream_ordering,
+                    "chunk_id": chunk_id,
                     "topological_ordering": event.depth,
                     "depth": event.depth,
                     "event_id": event.event_id,
@@ -1129,10 +1145,8 @@ class EventsStore(EventsWorkerStore):
                         "url" in event.content
                         and isinstance(event.content["url"], basestring)
                     ),
-                }
-                for event, _ in events_and_contexts
-            ],
-        )
+                },
+            )
 
     def _store_rejected_events_txn(self, txn, events_and_contexts):
         """Add rows to the 'rejections' table for received events which were
@@ -1344,6 +1358,177 @@ class EventsStore(EventsWorkerStore):
             (event.event_id, event.redacts)
         )
 
+    def _insert_into_chunk_txn(self, txn, room_id, event_id, prev_event_ids):
+        """Computes the chunk ID and topological ordering for an event and
+        handles updating chunk_graph table.
+
+        Args:
+            txn,
+            room_id (str)
+            event_id (str)
+            prev_event_ids (list[str])
+
+        Returns:
+            tuple[int, int]: Returns the chunk_id, topological_ordering for
+            the event
+        """
+
+        # We calculate the chunk for an event using the following rules:
+        #
+        # 1. If all prev events have the same chunk ID then use that chunk ID
+        # 2. If we have none of the prev events but do have events pointing to
+        #    the event, then we use their chunk ID if:
+        #     - They're all in the same chunk, and
+        #     - All their prev events match the events being inserted
+        # 3. Otherwise, create a new chunk and use that
+
+        # Set of chunks that the event refers to. Includes None if there were
+        # prev events that we don't have (or don't have a chunk for)
+        prev_chunk_ids = set()
+
+        for eid in prev_event_ids:
+            chunk_id = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": eid},
+                retcol="chunk_id",
+                allow_none=True,
+            )
+
+            prev_chunk_ids.add(chunk_id)
+
+        forward_events = self._simple_select_onecol_txn(
+            txn,
+            table="event_edges",
+            keyvalues={
+                "prev_event_id": event_id,
+                "is_state": False,
+            },
+            retcol="event_id",
+        )
+
+        # Set of chunks that refer to this event.
+        forward_chunk_ids = set()
+
+        # All the prev_events of events in `forward_events`.
+        # Note that this will include the current event_id.
+        sibling_events = set()
+        for eid in forward_events:
+            chunk_id = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": eid},
+                retcol="chunk_id",
+                allow_none=True,
+            )
+
+            if chunk_id is not None:
+                # chunk_id can be None if it's an outlier
+                forward_chunk_ids.add(chunk_id)
+
+            pes = self._simple_select_onecol_txn(
+                txn,
+                table="event_edges",
+                keyvalues={
+                    "event_id": eid,
+                    "is_state": False,
+                },
+                retcol="prev_event_id",
+            )
+
+            sibling_events.update(pes)
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock,
+        )
+
+        # If there is only one previous chunk (and that isn't None), then this
+        # satisfies condition one.
+        if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids:
+            chunk_id = list(prev_chunk_ids)[0]
+
+            # This event is being inserted at the end of the chunk
+            new_topo = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "room_id": room_id,
+                    "chunk_id": chunk_id,
+                },
+                retcol="MAX(topological_ordering)",
+            )
+            new_topo += 1
+
+        # If there is only one forward chunk and only one sibling event (which
+        # would be the given event), then this satisfies condition two.
+        elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
+            chunk_id = list(forward_chunk_ids)[0]
+
+            # This event is being inserted at the start of the chunk
+            new_topo = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "room_id": room_id,
+                    "chunk_id": chunk_id,
+                },
+                retcol="MIN(topological_ordering)",
+            )
+            new_topo -= 1
+        else:
+            chunk_id = self._chunk_id_gen.get_next()
+            new_topo = 0
+
+            # We've generated a new chunk, so we have to tell the
+            # ChunkDBOrderedListStore about that.
+            table.add_node(chunk_id)
+
+        # We need to now update the database with any new edges between chunks
+        current_prev_ids = self._simple_select_onecol_txn(
+            txn,
+            table="chunk_graph",
+            keyvalues={
+                "chunk_id": chunk_id,
+            },
+            retcol="prev_id",
+        )
+
+        current_forward_ids = self._simple_select_onecol_txn(
+            txn,
+            table="chunk_graph",
+            keyvalues={
+                "prev_id": chunk_id,
+            },
+            retcol="chunk_id",
+        )
+
+        for pid in prev_chunk_ids:
+            if pid is not None and pid not in current_prev_ids and pid != chunk_id:
+                # Note that the edge direction is reversed than what you might
+                # expect. See ChunkDBOrderedListStore for more details.
+                table.add_edge(pid, chunk_id)
+
+        for fid in forward_chunk_ids:
+            # Note that the edge direction is reversed than what you might
+            # expect. See ChunkDBOrderedListStore for more details.
+            if fid not in current_forward_ids and fid != chunk_id:
+                table.add_edge(chunk_id, fid)
+
+        # We now need to update the backwards extremities for the chunks.
+
+        txn.executemany("""
+            INSERT INTO chunk_backwards_extremities (chunk_id, event_id)
+            SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events)
+        """, [(chunk_id, eid, eid) for eid in prev_event_ids])
+
+        self._simple_delete_txn(
+            txn,
+            table="chunk_backwards_extremities",
+            keyvalues={"event_id": event_id},
+        )
+
+        return chunk_id, new_topo
+
     @defer.inlineCallbacks
     def have_events_in_timeline(self, event_ids):
         """Given a list of event ids, check if we have already processed and