From 13dbcafb9b47e9be312174f32ae236aafe2e8041 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 18 May 2018 15:28:22 +0100
Subject: Compute new chunks for new events

We also calculate a consistent topological ordering within a chunk, but
it isn't used yet.
---
 synapse/storage/__init__.py |   1 +
 synapse/storage/events.py   | 208 ++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 203 insertions(+), 6 deletions(-)

diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 979fa22438..c443f7d2b6 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -131,6 +131,7 @@ class DataStore(RoomMemberStore, RoomStore,
         self._group_updates_id_gen = StreamIdGenerator(
             db_conn, "local_group_updates", "stream_id",
         )
+        self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id")
 
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = StreamIdGenerator(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3cb1abd2ab..6db390485a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,6 +23,7 @@ import simplejson as json
 from twisted.internet import defer
 
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
 from synapse.util.async import ObservableDeferred
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import (
@@ -1019,13 +1020,19 @@ class EventsStore(EventsWorkerStore):
                     }
                 )
 
-                sql = (
-                    "UPDATE events SET outlier = ?"
-                    " WHERE event_id = ?"
+                chunk_id, _ = self._compute_chunk_id_txn(
+                    txn, event.room_id, event.event_id,
+                    [eid for eid, _ in event.prev_events],
                 )
-                txn.execute(
-                    sql,
-                    (False, event.event_id,)
+
+                self._simple_update_txn(
+                    txn,
+                    table="events",
+                    keyvalues={"event_id": event.event_id},
+                    updatevalues={
+                        "outlier": False,
+                        "chunk_id": chunk_id,
+                    },
                 )
 
                 # Update the event_backward_extremities table now that this
@@ -1108,12 +1115,21 @@ class EventsStore(EventsWorkerStore):
             ],
         )
 
+        if event.internal_metadata.is_outlier():
+            chunk_id, _topo = None, 0
+        else:
+            chunk_id, _topo = self._compute_chunk_id_txn(
+                txn, event.room_id, event.event_id,
+                [eid for eid, _ in event.prev_events],
+            )
+
         self._simple_insert_many_txn(
             txn,
             table="events",
             values=[
                 {
                     "stream_ordering": event.internal_metadata.stream_ordering,
+                    "chunk_id": chunk_id,
                     "topological_ordering": event.depth,
                     "depth": event.depth,
                     "event_id": event.event_id,
@@ -1344,6 +1360,186 @@ class EventsStore(EventsWorkerStore):
             (event.event_id, event.redacts)
         )
 
+    def _compute_chunk_id_txn(self, txn, room_id, event_id, prev_event_ids):
+        """Computes the chunk ID and topological ordering for an event.
+
+        Also handles updating chunk_graph table.
+
+        Args:
+            txn,
+            room_id (str)
+            event_id (str)
+            prev_event_ids (list[str])
+
+        Returns:
+            tuple[int, int]: Returns the chunk_id, topological_ordering for
+            the event
+        """
+
+        # We calculate the chunk for an event using the following rules:
+        #
+        # 1. If all prev events have the same chunk ID then use that chunk ID
+        # 2. If we have none of the prev events but do have events pointing to
+        #    it, then we use their chunk ID if:
+        #    - They’re all in the same chunk, and
+        #    - All their prev events match the events being inserted
+        # 3. Otherwise, create a new chunk and use that
+
+        # Set of chunks that the event refers to. Includes None if there were
+        # prev events that we don't have (or don't have a chunk for)
+        prev_chunk_ids = set()
+
+        for eid in prev_event_ids:
+            chunk_id = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": eid},
+                retcol="chunk_id",
+                allow_none=True,
+            )
+
+            prev_chunk_ids.add(chunk_id)
+
+        forward_events = self._simple_select_onecol_txn(
+            txn,
+            table="event_edges",
+            keyvalues={
+                "prev_event_id": event_id,
+                "is_state": False,
+            },
+            retcol="event_id",
+        )
+
+        # Set of chunks that refer to this event.
+        forward_chunk_ids = set()
+
+        # Set of event_ids of all prev_events of those in `forward_events`. This
+        # is guaranteed to contain at least the given event_id.
+        sibling_events = set()
+        for eid in set(forward_events):
+            chunk_id = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": eid},
+                retcol="chunk_id",
+                allow_none=True,
+            )
+
+            if chunk_id is not None:
+                # chunk_id can be None if it's an outlier
+                forward_chunk_ids.add(chunk_id)
+
+            pes = self._simple_select_onecol_txn(
+                txn,
+                table="event_edges",
+                keyvalues={
+                    "event_id": eid,
+                    "is_state": False,
+                },
+                retcol="prev_event_id",
+            )
+
+            sibling_events.update(pes)
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock,
+        )
+
+        # If there is only one previous chunk (and that isn't None), then this
+        # satisfies condition one.
+        if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids:
+            chunk_id = list(prev_chunk_ids)[0]
+
+            # This event is being inserted at the end of the chunk
+            new_topo = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "room_id": room_id,
+                    "chunk_id": chunk_id,
+                },
+                retcol="COALESCE(MAX(topological_ordering), 0)",
+            )
+            new_topo += 1
+        # If there is only one forward chunk and only one sibling event (which
+        # would be the given event), then this satisfies condition two.
+        elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
+            chunk_id = list(forward_chunk_ids)[0]
+
+            # This event is being inserted at the start of the chunk
+            new_topo = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "room_id": room_id,
+                    "chunk_id": chunk_id,
+                },
+                retcol="COALESCE(MIN(topological_ordering), 0)",
+            )
+            new_topo -= 1
+        else:
+            chunk_id = self._chunk_id_gen.get_next()
+            new_topo = 0
+
+            # We've generated a new chunk, so we have to tell the
+            # ChunkDBOrderedListStore about that.
+            table.add_node(chunk_id)
+
+        # We need to now update the database with any new edges between chunks
+        current_prev_ids = self._simple_select_onecol_txn(
+            txn,
+            table="chunk_graph",
+            keyvalues={
+                "chunk_id": chunk_id,
+            },
+            retcol="prev_id",
+        )
+
+        current_forward_ids = self._simple_select_onecol_txn(
+            txn,
+            table="chunk_graph",
+            keyvalues={
+                "prev_id": chunk_id,
+            },
+            retcol="chunk_id",
+        )
+
+        prev_chunk_ids = set(
+            pid for pid in prev_chunk_ids
+            if pid is not None and pid not in current_prev_ids and pid != chunk_id
+        )
+        forward_chunk_ids = set(
+            fid for fid in forward_chunk_ids
+            if fid not in current_forward_ids and fid != chunk_id
+        )
+
+        if prev_chunk_ids:
+            for pid in prev_chunk_ids:
+                # Note that the edge direction is reversed than what you might
+                # expect. See ChunkDBOrderedListStore for more details.
+                table.add_edge(pid, chunk_id)
+
+        if forward_chunk_ids:
+            for fid in forward_chunk_ids:
+                # Note that the edge direction is reversed than what you might
+                # expect. See ChunkDBOrderedListStore for more details.
+                table.add_edge(chunk_id, fid)
+
+        # We now need to update the backwards extremities for the chunks.
+
+        txn.executemany("""
+            INSERT INTO chunk_backwards_extremities (chunk_id, event_id)
+            SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events)
+        """, [(chunk_id, eid, eid) for eid in prev_event_ids])
+
+        self._simple_delete_txn(
+            txn,
+            table="chunk_backwards_extremities",
+            keyvalues={"event_id": event_id},
+        )
+
+        return chunk_id, new_topo
+
     @defer.inlineCallbacks
     def have_events_in_timeline(self, event_ids):
         """Given a list of event ids, check if we have already processed and
-- 
cgit 1.4.1

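The chunk assignment rules introduced in the patch above (conditions one to three, plus the MAX+1 / MIN-1 topological ordering) can be illustrated with a small self-contained sketch. This is not the Synapse implementation: it works on plain in-memory dicts rather than the events, event_edges and chunk_graph tables, and the names (assign_chunk, event_chunk, prev_events, chunk_topo) are made up for illustration.

import itertools

# Toy model of the data the patch reads:
#   event_chunk: event_id -> chunk_id (missing/None if we don't have the event)
#   prev_events: event_id -> list of prev event_ids (the event_edges table)
#   chunk_topo:  (chunk_id, event_id) -> topological ordering within the chunk
_next_chunk_id = itertools.count(1)


def assign_chunk(event_id, prev_event_ids, event_chunk, prev_events, chunk_topo):
    """Return (chunk_id, topological_ordering) for a newly arrived event."""
    # Rule 1: every prev event is known and they all share one chunk, so the
    # new event extends that chunk and is placed after its current maximum.
    prev_chunks = {event_chunk.get(eid) for eid in prev_event_ids}
    if len(prev_chunks) == 1 and None not in prev_chunks:
        chunk_id = next(iter(prev_chunks))
        max_topo = max(
            topo for (cid, _), topo in chunk_topo.items() if cid == chunk_id
        )
        return chunk_id, max_topo + 1

    # Rule 2: we have none of the prev events, but events we already have
    # point at this one, they all sit in a single chunk, and this event is
    # their only prev event, so we prepend before that chunk's minimum.
    forward = [eid for eid, prevs in prev_events.items() if event_id in prevs]
    forward_chunks = {event_chunk[eid] for eid in forward if event_chunk.get(eid)}
    siblings = {p for eid in forward for p in prev_events[eid]}
    if len(forward_chunks) == 1 and siblings == {event_id}:
        chunk_id = next(iter(forward_chunks))
        min_topo = min(
            topo for (cid, _), topo in chunk_topo.items() if cid == chunk_id
        )
        return chunk_id, min_topo - 1

    # Rule 3: anything ambiguous starts a brand new chunk at ordering 0.
    return next(_next_chunk_id), 0

In other words, an event whose prev event ends an existing chunk is appended to it, a backfilled event that only the start of a chunk points back to is prepended to it, and everything else opens a fresh chunk.
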
From 6c1d13a15a49d6a003348f627fff9abb653cd317 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 30 May 2018 11:30:33 +0100
Subject: Correctly loop over events_and_contexts

---
 synapse/storage/events.py | 30 ++++++++++++++----------------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6db390485a..77a0dcd3b8 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1115,19 +1115,19 @@ class EventsStore(EventsWorkerStore):
             ],
         )
 
-        if event.internal_metadata.is_outlier():
-            chunk_id, _topo = None, 0
-        else:
-            chunk_id, _topo = self._compute_chunk_id_txn(
-                txn, event.room_id, event.event_id,
-                [eid for eid, _ in event.prev_events],
-            )
+        for event, _ in events_and_contexts:
+            if event.internal_metadata.is_outlier():
+                chunk_id, _topo = None, 0
+            else:
+                chunk_id, _topo = self._compute_chunk_id_txn(
+                    txn, event.room_id, event.event_id,
+                    [eid for eid, _ in event.prev_events],
+                )
 
-        self._simple_insert_many_txn(
-            txn,
-            table="events",
-            values=[
-                {
+            self._simple_insert_txn(
+                txn,
+                table="events",
+                values={
                     "stream_ordering": event.internal_metadata.stream_ordering,
                     "chunk_id": chunk_id,
                     "topological_ordering": event.depth,
                     "depth": event.depth,
                     "event_id": event.event_id,
@@ -1145,10 +1145,8 @@ class EventsStore(EventsWorkerStore):
                         "url" in event.content
                         and isinstance(event.content["url"], basestring)
                     ),
-                }
-                for event, _ in events_and_contexts
-            ],
-        )
+                },
+            )
 
     def _store_rejected_events_txn(self, txn, events_and_contexts):
         """Add rows to the 'rejections' table for received events which were
-- 
cgit 1.4.1

From 1810cc3f7e26ebaf76d47cfdf968ca67f00270d8 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 30 May 2018 11:32:27 +0100
Subject: Remove unnecessary set

---
 synapse/storage/events.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 77a0dcd3b8..0c870df3f7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1414,7 +1414,7 @@ class EventsStore(EventsWorkerStore):
         # Set of event_ids of all prev_events of those in `forward_events`. This
         # is guaranteed to contain at least the given event_id.
         sibling_events = set()
-        for eid in set(forward_events):
+        for eid in forward_events:
             chunk_id = self._simple_select_one_onecol_txn(
                 txn,
                 table="events",
-- 
cgit 1.4.1

From 1cdd0d3b0dad5076df073bbb6a48a55c02560e6e Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 30 May 2018 11:33:57 +0100
Subject: Remove redundant conditions

---
 synapse/storage/events.py | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0c870df3f7..2f648111b3 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1459,6 +1459,7 @@ class EventsStore(EventsWorkerStore):
                 retcol="COALESCE(MAX(topological_ordering), 0)",
             )
             new_topo += 1
+
         # If there is only one forward chunk and only one sibling event (which
         # would be the given event), then this satisfies condition two.
         elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
@@ -1511,17 +1512,15 @@ class EventsStore(EventsWorkerStore):
             if fid not in current_forward_ids and fid != chunk_id
         )
 
-        if prev_chunk_ids:
-            for pid in prev_chunk_ids:
-                # Note that the edge direction is reversed than what you might
-                # expect. See ChunkDBOrderedListStore for more details.
-                table.add_edge(pid, chunk_id)
+        for pid in prev_chunk_ids:
+            # Note that the edge direction is reversed than what you might
+            # expect. See ChunkDBOrderedListStore for more details.
+            table.add_edge(pid, chunk_id)
 
-        if forward_chunk_ids:
-            for fid in forward_chunk_ids:
-                # Note that the edge direction is reversed than what you might
-                # expect. See ChunkDBOrderedListStore for more details.
-                table.add_edge(chunk_id, fid)
+        for fid in forward_chunk_ids:
+            # Note that the edge direction is reversed than what you might
+            # expect. See ChunkDBOrderedListStore for more details.
+            table.add_edge(chunk_id, fid)
 
         # We now need to update the backwards extremities for the chunks.
-- 
cgit 1.4.1

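The "Remove redundant conditions" patch above relies on a basic Python property: iterating an empty collection is already a no-op, so guarding a loop with an `if` over the same collection changes nothing. A minimal illustration with toy values, not Synapse code:

chunk_ids = set()   # pretend there are no new chunk edges to add

# The guarded and unguarded forms behave identically: the loop body simply
# never runs when the set is empty.
if chunk_ids:
    for cid in chunk_ids:
        print("guarded", cid)

for cid in chunk_ids:
    print("unguarded", cid)   # never reached for an empty set
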
From ecd4931ab2e23af66faa2b61dc2dec555e203aac Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 30 May 2018 11:35:02 +0100
Subject: Just iterate once rather than create a new set

---
 synapse/storage/events.py | 19 ++++++-------------
 1 file changed, 6 insertions(+), 13 deletions(-)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2f648111b3..cad5086f5f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1503,24 +1503,17 @@ class EventsStore(EventsWorkerStore):
             retcol="chunk_id",
         )
 
-        prev_chunk_ids = set(
-            pid for pid in prev_chunk_ids
-            if pid is not None and pid not in current_prev_ids and pid != chunk_id
-        )
-        forward_chunk_ids = set(
-            fid for fid in forward_chunk_ids
-            if fid not in current_forward_ids and fid != chunk_id
-        )
-
-        for pid in prev_chunk_ids:
-            # Note that the edge direction is reversed than what you might
-            # expect. See ChunkDBOrderedListStore for more details.
-            table.add_edge(pid, chunk_id)
+        for pid in prev_chunk_ids:
+            if pid is not None and pid not in current_prev_ids and pid != chunk_id:
+                # Note that the edge direction is reversed than what you might
+                # expect. See ChunkDBOrderedListStore for more details.
+                table.add_edge(pid, chunk_id)
 
         for fid in forward_chunk_ids:
             # Note that the edge direction is reversed than what you might
             # expect. See ChunkDBOrderedListStore for more details.
-            table.add_edge(chunk_id, fid)
+            if fid not in current_forward_ids and fid != chunk_id:
+                table.add_edge(chunk_id, fid)
 
         # We now need to update the backwards extremities for the chunks.
-- 
cgit 1.4.1

From f687d8fae2df19f0453cb22e8c5f3e63187b1dd3 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 30 May 2018 11:45:41 +0100
Subject: Comments

---
 synapse/storage/events.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index cad5086f5f..da053f0b18 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1378,8 +1378,8 @@ class EventsStore(EventsWorkerStore):
         #
         # 1. If all prev events have the same chunk ID then use that chunk ID
         # 2. If we have none of the prev events but do have events pointing to
-        #    it, then we use their chunk ID if:
-        #    - They’re all in the same chunk, and
+        #    the event, then we use their chunk ID if:
+        #    - They're all in the same chunk, and
         #    - All their prev events match the events being inserted
         # 3. Otherwise, create a new chunk and use that
 
@@ -1411,8 +1411,8 @@ class EventsStore(EventsWorkerStore):
         # Set of chunks that refer to this event.
         forward_chunk_ids = set()
 
-        # Set of event_ids of all prev_events of those in `forward_events`. This
-        # is guaranteed to contain at least the given event_id.
+        # All the prev_events of events in `forward_events`.
+        # Note that this will include the current event_id.
         sibling_events = set()
         for eid in forward_events:
-- 
cgit 1.4.1

From 9e1d3f119a7f80cf068d42ec2a55a1f40ff0c6d2 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 30 May 2018 11:45:58 +0100
Subject: Remove unnecessary COALESCE

---
 synapse/storage/events.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index da053f0b18..cab5c18637 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1456,7 +1456,7 @@ class EventsStore(EventsWorkerStore):
                     "room_id": room_id,
                     "chunk_id": chunk_id,
                 },
-                retcol="COALESCE(MAX(topological_ordering), 0)",
+                retcol="MAX(topological_ordering)",
             )
             new_topo += 1
 
@@ -1473,7 +1473,7 @@ class EventsStore(EventsWorkerStore):
                     "room_id": room_id,
                     "chunk_id": chunk_id,
                 },
-                retcol="COALESCE(MIN(topological_ordering), 0)",
+                retcol="MIN(topological_ordering)",
             )
             new_topo -= 1
-- 
cgit 1.4.1

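Dropping the COALESCE above is safe only because both branches take chunk_id from an event that is already stored, so the MAX/MIN query always has at least one matching row. A small sqlite3 sketch (toy schema, not Synapse's) of the NULL result the COALESCE(..., 0) was originally guarding against:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (chunk_id INTEGER, topological_ordering INTEGER)")
conn.execute("INSERT INTO events VALUES (1, 3)")

# Non-empty chunk: MAX is a real value, so COALESCE adds nothing.
row = conn.execute(
    "SELECT MAX(topological_ordering) FROM events WHERE chunk_id = 1"
).fetchone()
print(row[0])  # 3

# Empty chunk: the aggregate comes back as NULL (None in Python), which is
# the case that can no longer happen once chunk_id comes from a stored event.
row = conn.execute(
    "SELECT MAX(topological_ordering) FROM events WHERE chunk_id = 2"
).fetchone()
print(row[0])  # None
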
From 384731330d2a698b3bc0ec23961ad5669a3134e8 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 30 May 2018 11:51:03 +0100
Subject: Rename func to _insert_into_chunk_txn

---
 synapse/storage/events.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index cab5c18637..5194c4a48d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1020,7 +1020,7 @@ class EventsStore(EventsWorkerStore):
                     }
                 )
 
-                chunk_id, _ = self._compute_chunk_id_txn(
+                chunk_id, _ = self._insert_into_chunk_txn(
                     txn, event.room_id, event.event_id,
                     [eid for eid, _ in event.prev_events],
                 )
@@ -1119,7 +1119,7 @@ class EventsStore(EventsWorkerStore):
             if event.internal_metadata.is_outlier():
                 chunk_id, _topo = None, 0
             else:
-                chunk_id, _topo = self._compute_chunk_id_txn(
+                chunk_id, _topo = self._insert_into_chunk_txn(
                     txn, event.room_id, event.event_id,
                     [eid for eid, _ in event.prev_events],
                 )
@@ -1358,10 +1358,9 @@ class EventsStore(EventsWorkerStore):
             (event.event_id, event.redacts)
         )
 
-    def _compute_chunk_id_txn(self, txn, room_id, event_id, prev_event_ids):
-        """Computes the chunk ID and topological ordering for an event.
-
-        Also handles updating chunk_graph table.
+    def _insert_into_chunk_txn(self, txn, room_id, event_id, prev_event_ids):
+        """Computes the chunk ID and topological ordering for an event and
+        handles updating chunk_graph table.
 
         Args:
             txn,
-- 
cgit 1.4.1
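
The chunk_backwards_extremities handling introduced in the first patch follows an insert-if-missing / clear-on-arrival pattern: each prev event we do not yet have is recorded as a backwards extremity of the chunk, and any extremity rows for the event that has just been stored are deleted. A minimal sqlite3 sketch of the same pattern, using a toy schema and made-up event IDs rather than the real Synapse tables:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE chunk_backwards_extremities (chunk_id INTEGER, event_id TEXT)")

conn.execute("INSERT INTO events VALUES ('$A')")


def insert_event(chunk_id, event_id, prev_event_ids):
    # Record prev events we don't have yet as backwards extremities of the
    # chunk, mirroring the executemany() call in the first patch.
    conn.executemany(
        """
        INSERT INTO chunk_backwards_extremities (chunk_id, event_id)
        SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events)
        """,
        [(chunk_id, eid, eid) for eid in prev_event_ids],
    )

    # The newly stored event is no longer a backwards extremity of any chunk.
    conn.execute(
        "DELETE FROM chunk_backwards_extremities WHERE event_id = ?",
        (event_id,),
    )
    conn.execute("INSERT INTO events VALUES (?)", (event_id,))


# '$B' references '$A' (already stored) and '$missing' (not stored): only
# '$missing' becomes a backwards extremity of chunk 1.
insert_event(1, "$B", ["$A", "$missing"])
print(conn.execute("SELECT chunk_id, event_id FROM chunk_backwards_extremities").fetchall())
# [(1, '$missing')]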