diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/events.py | 142 |
1 files changed, 86 insertions, 56 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0d6519f30d..295f2522b5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -279,6 +279,7 @@ class EventsStore(SQLBaseStore): # We can't easily parallelize these since different chunks # might contain the same event. :( + new_forward_extremeties = {} current_state_for_room = {} if not backfilled: # Work out the new "current state" for each room. @@ -296,20 +297,16 @@ class EventsStore(SQLBaseStore): latest_event_ids = yield self.get_latest_event_ids_in_room( room_id ) - new_latest_event_ids = set(latest_event_ids) - for event, ctx in ev_ctx_rm: - if event.internal_metadata.is_outlier(): - continue - - new_latest_event_ids.difference_update( - e_id for e_id, _ in event.prev_events - ) - new_latest_event_ids.add(event.event_id) + new_latest_event_ids = yield self._calculate_new_extremeties( + room_id, [ev for ev, _ in ev_ctx_rm] + ) if new_latest_event_ids == set(latest_event_ids): # No change in extremities, so no change in state continue + new_forward_extremeties[room_id] = new_latest_event_ids + # Now we need to work out the different state sets for # each state extremities state_sets = [] @@ -358,10 +355,46 @@ class EventsStore(SQLBaseStore): backfilled=backfilled, delete_existing=delete_existing, current_state_for_room=current_state_for_room, + new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) @defer.inlineCallbacks + def _calculate_new_extremeties(self, room_id, events): + latest_event_ids = yield self.get_latest_event_ids_in_room( + room_id + ) + new_latest_event_ids = set(latest_event_ids) + new_latest_event_ids.update( + event.event_id for event in events + if not event.internal_metadata.is_outlier() + ) + new_latest_event_ids.difference_update( + e_id + for event in events + for e_id, _ in event.prev_events + if not event.internal_metadata.is_outlier() + ) + + rows = yield self._simple_select_many_batch( + table="event_edges", + column="prev_event_id", + iterable=list(new_latest_event_ids), + retcols=["prev_event_id"], + keyvalues={ + "room_id": room_id, + "is_state": False, + }, + desc="_calculate_new_extremeties", + ) + + new_latest_event_ids.difference_update( + row["prev_event_id"] for row in rows + ) + + defer.returnValue(new_latest_event_ids) + + @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, allow_none=False): @@ -418,52 +451,9 @@ class EventsStore(SQLBaseStore): defer.returnValue({e.event_id: e for e in events}) @log_function - def _persist_event_txn(self, txn, event, context, current_state, backfilled=False, - delete_existing=False): - # We purposefully do this first since if we include a `current_state` - # key, we *want* to update the `current_state_events` table - if current_state: - txn.call_after(self._get_current_state_for_key.invalidate_all) - txn.call_after(self.get_rooms_for_user.invalidate_all) - txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - - # Add an entry to the current_state_resets table to record the point - # where we clobbered the current state - stream_order = event.internal_metadata.stream_ordering - self._simple_insert_txn( - txn, - table="current_state_resets", - values={"event_stream_ordering": stream_order} - ) - - self._simple_delete_txn( - txn, - table="current_state_events", - keyvalues={"room_id": event.room_id}, - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - } - ) - - return self._persist_events_txn( - txn, - [(event, context)], - backfilled=backfilled, - delete_existing=delete_existing, - ) - - @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, - delete_existing=False, current_state_for_room={}): + delete_existing=False, current_state_for_room={}, + new_forward_extremeties={}): """Insert some number of room events into the necessary database tables. Rejected events are only inserted into the events table, the events_json table, @@ -473,6 +463,7 @@ class EventsStore(SQLBaseStore): If delete_existing is True then existing events will be purged from the database before insertion. This is useful when retrying due to IntegrityError. """ + max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering for room_id, current_state in current_state_for_room.iteritems(): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) @@ -480,11 +471,10 @@ class EventsStore(SQLBaseStore): # Add an entry to the current_state_resets table to record the point # where we clobbered the current state - stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering self._simple_insert_txn( txn, table="current_state_resets", - values={"event_stream_ordering": stream_order} + values={"event_stream_ordering": max_stream_order} ) self._simple_delete_txn( @@ -507,6 +497,46 @@ class EventsStore(SQLBaseStore): ], ) + for room_id, new_extrem in new_forward_extremeties.items(): + self._simple_delete_txn( + txn, + table="event_forward_extremities", + keyvalues={"room_id": room_id}, + ) + txn.call_after( + self.get_latest_event_ids_in_room.invalidate, (room_id,) + ) + + self._simple_insert_many_txn( + txn, + table="event_forward_extremities", + values=[ + { + "event_id": ev_id, + "room_id": room_id, + } + for room_id, new_extrem in new_forward_extremeties.items() + for ev_id in new_extrem + ], + ) + # We now insert into stream_ordering_to_exterm a mapping from room_id, + # new stream_ordering to new forward extremeties in the room. + # This allows us to later efficiently look up the forward extremeties + # for a room before a given stream_ordering + self._simple_insert_many_txn( + txn, + table="stream_ordering_to_exterm", + values=[ + { + "room_id": room_id, + "event_id": event_id, + "stream_ordering": max_stream_order, + } + for room_id, new_extrem in new_forward_extremeties.items() + for event_id in new_extrem + ] + ) + # Ensure that we don't have the same event twice. # Pick the earliest non-outlier if there is one, else the earliest one. new_events_and_contexts = OrderedDict() |