diff options
author | Erik Johnston <erik@matrix.org> | 2018-05-18 15:28:22 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-05-25 10:54:23 +0100 |
commit | 13dbcafb9b47e9be312174f32ae236aafe2e8041 (patch) | |
tree | 75ee17e5809283a51edcde7e8969a316f71bc964 | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into erikj/room_chunks (diff) | |
download | synapse-13dbcafb9b47e9be312174f32ae236aafe2e8041.tar.xz |
Compute new chunks for new events
We also calculate a consistent topological ordering within a chunk, but it isn't used yet.
-rw-r--r-- | synapse/storage/__init__.py | 1
-rw-r--r-- | synapse/storage/events.py | 208
2 files changed, 203 insertions, 6 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 979fa22438..c443f7d2b6 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -131,6 +131,7 @@ class DataStore(RoomMemberStore, RoomStore, self._group_updates_id_gen = StreamIdGenerator( db_conn, "local_group_updates", "stream_id", ) + self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id") if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3cb1abd2ab..6db390485a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,6 +23,7 @@ import simplejson as json from twisted.internet import defer from synapse.storage.events_worker import EventsWorkerStore +from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore from synapse.util.async import ObservableDeferred from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import ( @@ -1019,13 +1020,19 @@ class EventsStore(EventsWorkerStore): } ) - sql = ( - "UPDATE events SET outlier = ?" - " WHERE event_id = ?" 
+ chunk_id, _ = self._compute_chunk_id_txn( + txn, event.room_id, event.event_id, + [eid for eid, _ in event.prev_events], ) - txn.execute( - sql, - (False, event.event_id,) + + self._simple_update_txn( + txn, + table="events", + keyvalues={"event_id": event.event_id}, + updatevalues={ + "outlier": False, + "chunk_id": chunk_id, + }, ) # Update the event_backward_extremities table now that this @@ -1108,12 +1115,21 @@ class EventsStore(EventsWorkerStore): ], ) + if event.internal_metadata.is_outlier(): + chunk_id, _topo = None, 0 + else: + chunk_id, _topo = self._compute_chunk_id_txn( + txn, event.room_id, event.event_id, + [eid for eid, _ in event.prev_events], + ) + self._simple_insert_many_txn( txn, table="events", values=[ { "stream_ordering": event.internal_metadata.stream_ordering, + "chunk_id": chunk_id, "topological_ordering": event.depth, "depth": event.depth, "event_id": event.event_id, @@ -1344,6 +1360,186 @@ class EventsStore(EventsWorkerStore): (event.event_id, event.redacts) ) + def _compute_chunk_id_txn(self, txn, room_id, event_id, prev_event_ids): + """Computes the chunk ID and topological ordering for an event. + + Also handles updating chunk_graph table. + + Args: + txn, + room_id (str) + event_id (str) + prev_event_ids (list[str]) + + Returns: + tuple[int, int]: Returns the chunk_id, topological_ordering for + the event + """ + + # We calculate the chunk for an event using the following rules: + # + # 1. If all prev events have the same chunk ID then use that chunk ID + # 2. If we have none of the prev events but do have events pointing to + # it, then we use their chunk ID if: + # - They’re all in the same chunk, and + # - All their prev events match the events being inserted + # 3. Otherwise, create a new chunk and use that + + # Set of chunks that the event refers to. 
Includes None if there were + # prev events that we don't have (or don't have a chunk for) + prev_chunk_ids = set() + + for eid in prev_event_ids: + chunk_id = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={"event_id": eid}, + retcol="chunk_id", + allow_none=True, + ) + + prev_chunk_ids.add(chunk_id) + + forward_events = self._simple_select_onecol_txn( + txn, + table="event_edges", + keyvalues={ + "prev_event_id": event_id, + "is_state": False, + }, + retcol="event_id", + ) + + # Set of chunks that refer to this event. + forward_chunk_ids = set() + + # Set of event_ids of all prev_events of those in `forward_events`. This + # is guaranteed to contain at least the given event_id. + sibling_events = set() + for eid in set(forward_events): + chunk_id = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={"event_id": eid}, + retcol="chunk_id", + allow_none=True, + ) + + if chunk_id is not None: + # chunk_id can be None if it's an outlier + forward_chunk_ids.add(chunk_id) + + pes = self._simple_select_onecol_txn( + txn, + table="event_edges", + keyvalues={ + "event_id": eid, + "is_state": False, + }, + retcol="prev_event_id", + ) + + sibling_events.update(pes) + + table = ChunkDBOrderedListStore( + txn, room_id, self.clock, + ) + + # If there is only one previous chunk (and that isn't None), then this + # satisfies condition one. + if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids: + chunk_id = list(prev_chunk_ids)[0] + + # This event is being inserted at the end of the chunk + new_topo = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "room_id": room_id, + "chunk_id": chunk_id, + }, + retcol="COALESCE(MAX(topological_ordering), 0)", + ) + new_topo += 1 + # If there is only one forward chunk and only one sibling event (which + # would be the given event), then this satisfies condition two. 
+ elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1: + chunk_id = list(forward_chunk_ids)[0] + + # This event is being inserted at the start of the chunk + new_topo = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "room_id": room_id, + "chunk_id": chunk_id, + }, + retcol="COALESCE(MIN(topological_ordering), 0)", + ) + new_topo -= 1 + else: + chunk_id = self._chunk_id_gen.get_next() + new_topo = 0 + + # We've generated a new chunk, so we have to tell the + # ChunkDBOrderedListStore about that. + table.add_node(chunk_id) + + # We need to now update the database with any new edges between chunks + current_prev_ids = self._simple_select_onecol_txn( + txn, + table="chunk_graph", + keyvalues={ + "chunk_id": chunk_id, + }, + retcol="prev_id", + ) + + current_forward_ids = self._simple_select_onecol_txn( + txn, + table="chunk_graph", + keyvalues={ + "prev_id": chunk_id, + }, + retcol="chunk_id", + ) + + prev_chunk_ids = set( + pid for pid in prev_chunk_ids + if pid is not None and pid not in current_prev_ids and pid != chunk_id + ) + forward_chunk_ids = set( + fid for fid in forward_chunk_ids + if fid not in current_forward_ids and fid != chunk_id + ) + + if prev_chunk_ids: + for pid in prev_chunk_ids: + # Note that the edge direction is reversed than what you might + # expect. See ChunkDBOrderedListStore for more details. + table.add_edge(pid, chunk_id) + + if forward_chunk_ids: + for fid in forward_chunk_ids: + # Note that the edge direction is reversed than what you might + # expect. See ChunkDBOrderedListStore for more details. + table.add_edge(chunk_id, fid) + + # We now need to update the backwards extremities for the chunks. + + txn.executemany(""" + INSERT INTO chunk_backwards_extremities (chunk_id, event_id) + SELECT ?, ? WHERE ? 
NOT IN (SELECT event_id FROM events) + """, [(chunk_id, eid, eid) for eid in prev_event_ids]) + + self._simple_delete_txn( + txn, + table="chunk_backwards_extremities", + keyvalues={"event_id": event_id}, + ) + + return chunk_id, new_topo + @defer.inlineCallbacks def have_events_in_timeline(self, event_ids): """Given a list of event ids, check if we have already processed and |