diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 979fa22438..c443f7d2b6 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -131,6 +131,7 @@ class DataStore(RoomMemberStore, RoomStore,
self._group_updates_id_gen = StreamIdGenerator(
db_conn, "local_group_updates", "stream_id",
)
+ self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id")
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3cb1abd2ab..5194c4a48d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,6 +23,7 @@ import simplejson as json
from twisted.internet import defer
from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
from synapse.util.async import ObservableDeferred
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import (
@@ -1019,13 +1020,19 @@ class EventsStore(EventsWorkerStore):
}
)
- sql = (
- "UPDATE events SET outlier = ?"
- " WHERE event_id = ?"
+ chunk_id, _ = self._insert_into_chunk_txn(
+ txn, event.room_id, event.event_id,
+ [eid for eid, _ in event.prev_events],
)
- txn.execute(
- sql,
- (False, event.event_id,)
+
+ self._simple_update_txn(
+ txn,
+ table="events",
+ keyvalues={"event_id": event.event_id},
+ updatevalues={
+ "outlier": False,
+ "chunk_id": chunk_id,
+ },
)
# Update the event_backward_extremities table now that this
@@ -1108,12 +1115,21 @@ class EventsStore(EventsWorkerStore):
],
)
- self._simple_insert_many_txn(
- txn,
- table="events",
- values=[
- {
+ for event, _ in events_and_contexts:
+ if event.internal_metadata.is_outlier():
+ chunk_id, _topo = None, 0
+ else:
+ chunk_id, _topo = self._insert_into_chunk_txn(
+ txn, event.room_id, event.event_id,
+ [eid for eid, _ in event.prev_events],
+ )
+
+ self._simple_insert_txn(
+ txn,
+ table="events",
+ values={
"stream_ordering": event.internal_metadata.stream_ordering,
+ "chunk_id": chunk_id,
"topological_ordering": event.depth,
"depth": event.depth,
"event_id": event.event_id,
@@ -1129,10 +1145,8 @@ class EventsStore(EventsWorkerStore):
"url" in event.content
and isinstance(event.content["url"], basestring)
),
- }
- for event, _ in events_and_contexts
- ],
- )
+ },
+ )
def _store_rejected_events_txn(self, txn, events_and_contexts):
"""Add rows to the 'rejections' table for received events which were
@@ -1344,6 +1358,177 @@ class EventsStore(EventsWorkerStore):
(event.event_id, event.redacts)
)
+    def _insert_into_chunk_txn(self, txn, room_id, event_id, prev_event_ids):
+        """Computes the chunk ID and topological ordering for an event and
+        handles updating the chunk_graph table.
+
+        Args:
+            txn: The database transaction to run the queries in.
+            room_id (str): The room the event belongs to.
+            event_id (str): The ID of the event being inserted.
+            prev_event_ids (list[str]): The IDs of the event's prev events.
+
+        Returns:
+            tuple[int, int]: Returns the chunk_id, topological_ordering for
+                the event
+        """
+
+        # We calculate the chunk for an event using the following rules:
+        #
+        # 1. If all prev events have the same chunk ID then use that chunk ID
+        # 2. If we have none of the prev events but do have events pointing to
+        #    the event, then we use their chunk ID if:
+        #    - They're all in the same chunk, and
+        #    - All their prev events match the events being inserted
+        # 3. Otherwise, create a new chunk and use that
+
+        # Set of chunks that the event refers to. Includes None if there were
+        # prev events that we don't have (or don't have a chunk for, e.g. outliers)
+        prev_chunk_ids = set()
+
+        for eid in prev_event_ids:
+            chunk_id = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": eid},
+                retcol="chunk_id",
+                allow_none=True,
+            )
+
+            prev_chunk_ids.add(chunk_id)
+
+        forward_events = self._simple_select_onecol_txn(
+            txn,
+            table="event_edges",
+            keyvalues={
+                "prev_event_id": event_id,
+                "is_state": False,
+            },
+            retcol="event_id",
+        )
+
+        # Set of chunks that refer to this event (i.e. chunks of forward_events).
+        forward_chunk_ids = set()
+
+        # All the prev_events of events in `forward_events`.
+        # Note that this will include the current event_id.
+        sibling_events = set()
+        for eid in forward_events:
+            chunk_id = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": eid},
+                retcol="chunk_id",
+                allow_none=True,
+            )
+
+            if chunk_id is not None:
+                # chunk_id can be None if it's an outlier
+                forward_chunk_ids.add(chunk_id)
+
+            pes = self._simple_select_onecol_txn(
+                txn,
+                table="event_edges",
+                keyvalues={
+                    "event_id": eid,
+                    "is_state": False,
+                },
+                retcol="prev_event_id",
+            )
+
+            sibling_events.update(pes)
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock,
+        )
+
+        # If there is only one previous chunk (and that isn't None), then this
+        # satisfies condition one.
+        if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids:
+            chunk_id = list(prev_chunk_ids)[0]
+
+            # This event is being appended to the end of the chunk; MAX is
+            # non-NULL here since the prev events are already in this chunk.
+            new_topo = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "room_id": room_id,
+                    "chunk_id": chunk_id,
+                },
+                retcol="MAX(topological_ordering)",
+            )
+            new_topo += 1
+
+        # If there is only one forward chunk and only one sibling event (which
+        # would be the given event), then this satisfies condition two.
+        elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
+            chunk_id = list(forward_chunk_ids)[0]
+
+            # This event is being prepended to the start of the chunk; MIN is
+            # non-NULL here since the forward events are already in this chunk.
+            new_topo = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "room_id": room_id,
+                    "chunk_id": chunk_id,
+                },
+                retcol="MIN(topological_ordering)",
+            )
+            new_topo -= 1
+        else:
+            chunk_id = self._chunk_id_gen.get_next()
+            new_topo = 0
+
+            # We've generated a new chunk, so we have to tell the
+            # ChunkDBOrderedListStore about that.
+            table.add_node(chunk_id)
+
+        # We need to now update the database with any new edges between chunks
+        current_prev_ids = self._simple_select_onecol_txn(
+            txn,
+            table="chunk_graph",
+            keyvalues={
+                "chunk_id": chunk_id,
+            },
+            retcol="prev_id",
+        )
+
+        current_forward_ids = self._simple_select_onecol_txn(
+            txn,
+            table="chunk_graph",
+            keyvalues={
+                "prev_id": chunk_id,
+            },
+            retcol="chunk_id",
+        )
+
+        for pid in prev_chunk_ids:
+            if pid is not None and pid not in current_prev_ids and pid != chunk_id:
+                # Note that the edge direction is reversed from what you might
+                # expect. See ChunkDBOrderedListStore for more details.
+                table.add_edge(pid, chunk_id)
+
+        for fid in forward_chunk_ids:
+            # Note that the edge direction is reversed from what you might
+            # expect. See ChunkDBOrderedListStore for more details.
+            if fid not in current_forward_ids and fid != chunk_id:
+                table.add_edge(chunk_id, fid)
+
+        # Record prev events we don't have yet as backwards extremities of the chunk.
+        # NOTE(review): repeated calls may insert duplicate rows — presumably a unique index dedupes; verify schema.
+        txn.executemany("""
+            INSERT INTO chunk_backwards_extremities (chunk_id, event_id)
+            SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events)
+        """, [(chunk_id, eid, eid) for eid in prev_event_ids])
+
+        self._simple_delete_txn(
+            txn,
+            table="chunk_backwards_extremities",
+            keyvalues={"event_id": event_id},
+        )
+
+        return chunk_id, new_topo
+
@defer.inlineCallbacks
def have_events_in_timeline(self, event_ids):
"""Given a list of event ids, check if we have already processed and
|