diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 53feaa1960..f0aa2193fb 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -235,80 +235,21 @@ class EventFederationStore(SQLBaseStore):
],
)
- self._update_extremeties(txn, events)
+ self._update_backward_extremeties(txn, events)
- def _update_extremeties(self, txn, events):
- """Updates the event_*_extremities tables based on the new/updated
+ def _update_backward_extremeties(self, txn, events):
+ """Updates the event_backward_extremities tables based on the new/updated
events being persisted.
This is called for new events *and* for events that were outliers, but
- are are now being persisted as non-outliers.
+ are now being persisted as non-outliers.
+
+ Forward extremities are handled when we first start persisting the events.
"""
events_by_room = {}
for ev in events:
events_by_room.setdefault(ev.room_id, []).append(ev)
- for room_id, room_events in events_by_room.items():
- prevs = [
- e_id for ev in room_events for e_id, _ in ev.prev_events
- if not ev.internal_metadata.is_outlier()
- ]
- if prevs:
- txn.execute(
- "DELETE FROM event_forward_extremities"
- " WHERE room_id = ?"
- " AND event_id in (%s)" % (
- ",".join(["?"] * len(prevs)),
- ),
- [room_id] + prevs,
- )
-
- query = (
- "INSERT INTO event_forward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
- " )"
- )
-
- txn.executemany(
- query,
- [
- (ev.event_id, ev.room_id, ev.event_id) for ev in events
- if not ev.internal_metadata.is_outlier()
- ]
- )
-
- # We now insert into stream_ordering_to_exterm a mapping from room_id,
- # new stream_ordering to new forward extremeties in the room.
- # This allows us to later efficiently look up the forward extremeties
- # for a room before a given stream_ordering
- max_stream_ord = max(
- ev.internal_metadata.stream_ordering for ev in events
- )
- new_extrem = {}
- for room_id in events_by_room:
- event_ids = self._simple_select_onecol_txn(
- txn,
- table="event_forward_extremities",
- keyvalues={"room_id": room_id},
- retcol="event_id",
- )
- new_extrem[room_id] = event_ids
-
- self._simple_insert_many_txn(
- txn,
- table="stream_ordering_to_exterm",
- values=[
- {
- "room_id": room_id,
- "event_id": event_id,
- "stream_ordering": max_stream_ord,
- }
- for room_id, extrem_evs in new_extrem.items()
- for event_id in extrem_evs
- ]
- )
-
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
@@ -339,11 +280,6 @@ class EventFederationStore(SQLBaseStore):
]
)
- for room_id in events_by_room:
- txn.call_after(
- self.get_latest_event_ids_in_room.invalidate, (room_id,)
- )
-
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0d6519f30d..295f2522b5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -279,6 +279,7 @@ class EventsStore(SQLBaseStore):
# We can't easily parallelize these since different chunks
# might contain the same event. :(
+ new_forward_extremeties = {}
current_state_for_room = {}
if not backfilled:
# Work out the new "current state" for each room.
@@ -296,20 +297,16 @@ class EventsStore(SQLBaseStore):
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
- new_latest_event_ids = set(latest_event_ids)
- for event, ctx in ev_ctx_rm:
- if event.internal_metadata.is_outlier():
- continue
-
- new_latest_event_ids.difference_update(
- e_id for e_id, _ in event.prev_events
- )
- new_latest_event_ids.add(event.event_id)
+ new_latest_event_ids = yield self._calculate_new_extremeties(
+ room_id, [ev for ev, _ in ev_ctx_rm]
+ )
if new_latest_event_ids == set(latest_event_ids):
# No change in extremities, so no change in state
continue
+ new_forward_extremeties[room_id] = new_latest_event_ids
+
# Now we need to work out the different state sets for
# each state extremities
state_sets = []
@@ -358,10 +355,46 @@ class EventsStore(SQLBaseStore):
backfilled=backfilled,
delete_existing=delete_existing,
current_state_for_room=current_state_for_room,
+ new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
@defer.inlineCallbacks
+ def _calculate_new_extremeties(self, room_id, events):
+ latest_event_ids = yield self.get_latest_event_ids_in_room(
+ room_id
+ )
+ new_latest_event_ids = set(latest_event_ids)
+ new_latest_event_ids.update(
+ event.event_id for event in events
+ if not event.internal_metadata.is_outlier()
+ )
+ new_latest_event_ids.difference_update(
+ e_id
+ for event in events
+ for e_id, _ in event.prev_events
+ if not event.internal_metadata.is_outlier()
+ )
+
+ rows = yield self._simple_select_many_batch(
+ table="event_edges",
+ column="prev_event_id",
+ iterable=list(new_latest_event_ids),
+ retcols=["prev_event_id"],
+ keyvalues={
+ "room_id": room_id,
+ "is_state": False,
+ },
+ desc="_calculate_new_extremeties",
+ )
+
+ new_latest_event_ids.difference_update(
+ row["prev_event_id"] for row in rows
+ )
+
+ defer.returnValue(new_latest_event_ids)
+
+ @defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
allow_none=False):
@@ -418,52 +451,9 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events})
@log_function
- def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
- delete_existing=False):
- # We purposefully do this first since if we include a `current_state`
- # key, we *want* to update the `current_state_events` table
- if current_state:
- txn.call_after(self._get_current_state_for_key.invalidate_all)
- txn.call_after(self.get_rooms_for_user.invalidate_all)
- txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
-
- # Add an entry to the current_state_resets table to record the point
- # where we clobbered the current state
- stream_order = event.internal_metadata.stream_ordering
- self._simple_insert_txn(
- txn,
- table="current_state_resets",
- values={"event_stream_ordering": stream_order}
- )
-
- self._simple_delete_txn(
- txn,
- table="current_state_events",
- keyvalues={"room_id": event.room_id},
- )
-
- for s in current_state:
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": s.event_id,
- "room_id": s.room_id,
- "type": s.type,
- "state_key": s.state_key,
- }
- )
-
- return self._persist_events_txn(
- txn,
- [(event, context)],
- backfilled=backfilled,
- delete_existing=delete_existing,
- )
-
- @log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
- delete_existing=False, current_state_for_room={}):
+ delete_existing=False, current_state_for_room={},
+ new_forward_extremeties={}):
"""Insert some number of room events into the necessary database tables.
Rejected events are only inserted into the events table, the events_json table,
@@ -473,6 +463,7 @@ class EventsStore(SQLBaseStore):
If delete_existing is True then existing events will be purged from the
database before insertion. This is useful when retrying due to IntegrityError.
"""
+ max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
for room_id, current_state in current_state_for_room.iteritems():
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
@@ -480,11 +471,10 @@ class EventsStore(SQLBaseStore):
# Add an entry to the current_state_resets table to record the point
# where we clobbered the current state
- stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
self._simple_insert_txn(
txn,
table="current_state_resets",
- values={"event_stream_ordering": stream_order}
+ values={"event_stream_ordering": max_stream_order}
)
self._simple_delete_txn(
@@ -507,6 +497,46 @@ class EventsStore(SQLBaseStore):
],
)
+ for room_id, new_extrem in new_forward_extremeties.items():
+ self._simple_delete_txn(
+ txn,
+ table="event_forward_extremities",
+ keyvalues={"room_id": room_id},
+ )
+ txn.call_after(
+ self.get_latest_event_ids_in_room.invalidate, (room_id,)
+ )
+
+ self._simple_insert_many_txn(
+ txn,
+ table="event_forward_extremities",
+ values=[
+ {
+ "event_id": ev_id,
+ "room_id": room_id,
+ }
+ for room_id, new_extrem in new_forward_extremeties.items()
+ for ev_id in new_extrem
+ ],
+ )
+ # We now insert into stream_ordering_to_exterm a mapping from room_id,
+ # new stream_ordering to new forward extremeties in the room.
+ # This allows us to later efficiently look up the forward extremeties
+ # for a room before a given stream_ordering
+ self._simple_insert_many_txn(
+ txn,
+ table="stream_ordering_to_exterm",
+ values=[
+ {
+ "room_id": room_id,
+ "event_id": event_id,
+ "stream_ordering": max_stream_order,
+ }
+ for room_id, new_extrem in new_forward_extremeties.items()
+ for event_id in new_extrem
+ ]
+ )
+
# Ensure that we don't have the same event twice.
# Pick the earliest non-outlier if there is one, else the earliest one.
new_events_and_contexts = OrderedDict()
|