diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e7487311ce..046174edb3 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,6 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.util import batch_iter
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
@@ -386,12 +387,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
)
for room_id, ev_ctx_rm in iteritems(events_by_room):
- # Work out new extremities by recursively adding and removing
- # the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
- new_latest_event_ids = yield self._calculate_new_extremeties(
+ new_latest_event_ids = yield self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)
@@ -400,6 +399,12 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# No change in extremities, so no change in state
continue
+ # there should always be at least one forward extremity.
+ # (except during the initial persistence of the send_join
+ # results, in which case there will be no existing
+ # extremities, so we'll `continue` above and skip this bit.)
+ assert new_latest_event_ids, "No forward extremities left!"
+
new_forward_extremeties[room_id] = new_latest_event_ids
len_1 = (
@@ -517,44 +522,88 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
)
@defer.inlineCallbacks
- def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
- """Calculates the new forward extremeties for a room given events to
+ def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids):
+ """Calculates the new forward extremities for a room given events to
persist.
Assumes that we are only persisting events for one room at a time.
"""
- new_latest_event_ids = set(latest_event_ids)
- # First, add all the new events to the list
- new_latest_event_ids.update(
- event.event_id for event, ctx in event_contexts
+
+ # we're only interested in new events which aren't outliers and which aren't
+ # being rejected.
+ new_events = [
+ event for event, ctx in event_contexts
if not event.internal_metadata.is_outlier() and not ctx.rejected
+ ]
+
+ # start with the existing forward extremities
+ result = set(latest_event_ids)
+
+ # add all the new events to the list
+ result.update(
+ event.event_id for event in new_events
)
- # Now remove all events that are referenced by the to-be-added events
- new_latest_event_ids.difference_update(
+
+ # Now remove all events which are prev_events of any of the new events
+ result.difference_update(
e_id
- for event, ctx in event_contexts
+ for event in new_events
for e_id, _ in event.prev_events
- if not event.internal_metadata.is_outlier() and not ctx.rejected
)
- # And finally remove any events that are referenced by previously added
- # events.
- rows = yield self._simple_select_many_batch(
- table="event_edges",
- column="prev_event_id",
- iterable=list(new_latest_event_ids),
- retcols=["prev_event_id"],
- keyvalues={
- "is_state": False,
- },
- desc="_calculate_new_extremeties",
- )
+ # Finally, remove any events which are prev_events of any existing events.
+ existing_prevs = yield self._get_events_which_are_prevs(result)
+ result.difference_update(existing_prevs)
- new_latest_event_ids.difference_update(
- row["prev_event_id"] for row in rows
- )
+ if not result:
+ logger.warn(
+ "Forward extremity list A+B-C-D is now empty in %s. "
+ "Old extremities (A): %s, new events (B): %s, "
+ "existing events which are reffed by new events (C): %s, "
+ "new events which are reffed by existing events (D): %s",
+ room_id, latest_event_ids, new_events,
+ [e_id for event in new_events for e_id, _ in event.prev_events],
+ existing_prevs,
+ )
+ defer.returnValue(result)
- defer.returnValue(new_latest_event_ids)
+ @defer.inlineCallbacks
+ def _get_events_which_are_prevs(self, event_ids):
+ """Filter the supplied list of event_ids to get those which are prev_events of
+ existing (non-outlier) events.
+
+ Args:
+ event_ids (Iterable[str]): event ids to filter
+
+ Returns:
+ Deferred[List[str]]: filtered event ids
+ """
+ results = []
+
+ def _get_events(txn, batch):
+ sql = """
+ SELECT prev_event_id
+ FROM event_edges
+ INNER JOIN events USING (event_id)
+ LEFT JOIN rejections USING (event_id)
+ WHERE
+ prev_event_id IN (%s)
+ AND rejections.event_id IS NULL
+ """ % (
+ ",".join("?" for _ in batch),
+ )
+
+ txn.execute(sql, batch)
+ results.extend(r[0] for r in txn)
+
+ for chunk in batch_iter(event_ids, 100):
+ yield self.runInteraction(
+ "_get_events_which_are_prevs",
+ _get_events,
+ chunk,
+ )
+
+ defer.returnValue(results)
@defer.inlineCallbacks
def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
@@ -586,10 +635,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
the new current state is only returned if we've already calculated
it.
"""
-
- if not new_latest_event_ids:
- return
-
# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}
|