diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 33fccfa7a8..86a7c5920d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -342,8 +342,20 @@ class EventsStore(SQLBaseStore):
# NB: Assumes that we are only persisting events for one room
# at a time.
+
+ # map room_id->list[event_ids] giving the new forward
+ # extremities in each room
new_forward_extremeties = {}
+
+ # map room_id->(type,state_key)->event_id tracking the full
+ # state in each room after adding these events
current_state_for_room = {}
+
+ # map room_id->(to_delete, to_insert) where each entry is
+ # a map (type,key)->event_id giving the state delta in each
+ # room
+ state_delta_for_room = {}
+
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
@@ -386,11 +398,19 @@ class EventsStore(SQLBaseStore):
if all_single_prev_not_state:
continue
- state = yield self._calculate_state_delta(
- room_id, ev_ctx_rm, new_latest_event_ids
+ logger.info(
+ "Calculating state delta for room %s", room_id,
+ )
+ current_state = yield self._get_new_state_after_events(
+ ev_ctx_rm, new_latest_event_ids,
)
- if state:
- current_state_for_room[room_id] = state
+ if current_state is not None:
+ current_state_for_room[room_id] = current_state
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
+ if delta is not None:
+ state_delta_for_room[room_id] = delta
yield self.runInteraction(
"persist_events",
@@ -398,7 +418,7 @@ class EventsStore(SQLBaseStore):
events_and_contexts=chunk,
backfilled=backfilled,
delete_existing=delete_existing,
- current_state_for_room=current_state_for_room,
+ state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
@@ -415,7 +435,7 @@ class EventsStore(SQLBaseStore):
event_counter.inc(event.type, origin_type, origin_entity)
- for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+ for room_id, new_state in current_state_for_room.iteritems():
self.get_current_state_ids.prefill(
(room_id, ), new_state
)
@@ -467,20 +487,22 @@ class EventsStore(SQLBaseStore):
defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
- def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
- """Calculate the new state deltas for a room.
+ def _get_new_state_after_events(self, events_context, new_latest_event_ids):
+ """Calculate the current state dict after adding some new events to
+ a room
- Assumes that we are only persisting events for one room at a time.
+ Args:
+ events_context (list[(EventBase, EventContext)]):
+ events and contexts which are being added to the room
+
+ new_latest_event_ids (iterable[str]):
+ the new forward extremities for the room.
Returns:
- 3-tuple (to_delete, to_insert, new_state) where both are state dicts,
- i.e. (type, state_key) -> event_id. `to_delete` are the entries to
- first be deleted from current_state_events, `to_insert` are entries
- to insert. `new_state` is the full set of state.
- May return None if there are no changes to be applied.
+ Deferred[dict[(str,str), str]|None]:
+ None if there are no changes to the room state, or
+ a dict of (type, state_key) -> event_id].
"""
- # Now we need to work out the different state sets for
- # each state extremities
state_sets = []
state_groups = set()
missing_event_ids = []
@@ -523,12 +545,12 @@ class EventsStore(SQLBaseStore):
state_sets.extend(group_to_state.itervalues())
if not new_latest_event_ids:
- current_state = {}
+ defer.returnValue({})
elif was_updated:
if len(state_sets) == 1:
# If there is only one state set, then we know what the current
# state is.
- current_state = state_sets[0]
+ defer.returnValue(state_sets[0])
else:
# We work out the current state by passing the state sets to the
# state resolution algorithm. It may ask for some events, including
@@ -537,8 +559,7 @@ class EventsStore(SQLBaseStore):
# up in the db.
logger.info(
- "Resolving state for %s with %i state sets",
- room_id, len(state_sets),
+ "Resolving state with %i state sets", len(state_sets),
)
events_map = {ev.event_id: ev for ev, _ in events_context}
@@ -567,9 +588,22 @@ class EventsStore(SQLBaseStore):
state_sets,
state_map_factory=get_events,
)
+ defer.returnValue(current_state)
else:
return
+ @defer.inlineCallbacks
+ def _calculate_state_delta(self, room_id, current_state):
+ """Calculate the new state deltas for a room.
+
+ Assumes that we are only persisting events for one room at a time.
+
+ Returns:
+ 2-tuple (to_delete, to_insert) where both are state dicts,
+ i.e. (type, state_key) -> event_id. `to_delete` are the entries to
+ first be deleted from current_state_events, `to_insert` are entries
+ to insert.
+ """
existing_state = yield self.get_current_state_ids(room_id)
existing_events = set(existing_state.itervalues())
@@ -589,7 +623,7 @@ class EventsStore(SQLBaseStore):
if ev_id in events_to_insert
}
- defer.returnValue((to_delete, to_insert, current_state))
+ defer.returnValue((to_delete, to_insert))
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
@@ -649,7 +683,7 @@ class EventsStore(SQLBaseStore):
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
- delete_existing=False, current_state_for_room={},
+ delete_existing=False, state_delta_for_room={},
new_forward_extremeties={}):
"""Insert some number of room events into the necessary database tables.
@@ -665,7 +699,7 @@ class EventsStore(SQLBaseStore):
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
IntegrityError.
- current_state_for_room (dict[str, (list[str], list[str])]):
+ state_delta_for_room (dict[str, (list[str], list[str])]):
The current-state delta for each room. For each room, a tuple
(to_delete, to_insert), being a list of event ids to be removed
from the current state, and a list of event ids to be added to
@@ -677,7 +711,7 @@ class EventsStore(SQLBaseStore):
"""
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
- self._update_current_state_txn(txn, current_state_for_room, max_stream_order)
+ self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
self._update_forward_extremities_txn(
txn,
@@ -721,9 +755,8 @@ class EventsStore(SQLBaseStore):
events_and_contexts=events_and_contexts,
)
- # Insert into the state_groups, state_groups_state, and
- # event_to_state_groups tables.
- self._store_mult_state_groups_txn(txn, events_and_contexts)
+ # Insert into event_to_state_groups.
+ self._store_event_state_mappings_txn(txn, events_and_contexts)
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
@@ -743,7 +776,7 @@ class EventsStore(SQLBaseStore):
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
- to_delete, to_insert, _ = current_state_tuple
+ to_delete, to_insert = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()],
@@ -958,10 +991,9 @@ class EventsStore(SQLBaseStore):
# an outlier in the database. We now have some state at that
# so we need to update the state_groups table with that state.
- # insert into the state_group, state_groups_state and
- # event_to_state_groups tables.
+ # insert into event_to_state_groups.
try:
- self._store_mult_state_groups_txn(txn, ((event, context),))
+ self._store_event_state_mappings_txn(txn, ((event, context),))
except Exception:
logger.exception("")
raise
@@ -2031,16 +2063,32 @@ class EventsStore(SQLBaseStore):
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
- def delete_old_state(self, room_id, topological_ordering):
- return self.runInteraction(
- "delete_old_state",
- self._delete_old_state_txn, room_id, topological_ordering
- )
+ def purge_history(
+ self, room_id, topological_ordering, delete_local_events,
+ ):
+ """Deletes room history before a certain point
+
+ Args:
+ room_id (str):
+
+ topological_ordering (int):
+ minimum topo ordering to preserve
- def _delete_old_state_txn(self, txn, room_id, topological_ordering):
- """Deletes old room state
+ delete_local_events (bool):
+ if True, we will delete local events as well as remote ones
+ (instead of just marking them as outliers and deleting their
+ state groups).
"""
+ return self.runInteraction(
+ "purge_history",
+ self._purge_history_txn, room_id, topological_ordering,
+ delete_local_events,
+ )
+
+ def _purge_history_txn(
+ self, txn, room_id, topological_ordering, delete_local_events,
+ ):
# Tables that should be pruned:
# event_auth
# event_backward_extremities
@@ -2081,7 +2129,7 @@ class EventsStore(SQLBaseStore):
400, "topological_ordering is greater than forward extremeties"
)
- logger.debug("[purge] looking for events to delete")
+ logger.info("[purge] looking for events to delete")
txn.execute(
"SELECT event_id, state_key FROM events"
@@ -2093,16 +2141,16 @@ class EventsStore(SQLBaseStore):
to_delete = [
(event_id,) for event_id, state_key in event_rows
- if state_key is None and not self.hs.is_mine_id(event_id)
+ if state_key is None and (
+ delete_local_events or not self.hs.is_mine_id(event_id)
+ )
]
logger.info(
- "[purge] found %i events before cutoff, of which %i are remote"
- " non-state events to delete", len(event_rows), len(to_delete))
-
- for event_id, state_key in event_rows:
- txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+ "[purge] found %i events before cutoff, of which %i can be deleted",
+ len(event_rows), len(to_delete),
+ )
- logger.debug("[purge] Finding new backward extremities")
+ logger.info("[purge] Finding new backward extremities")
# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
@@ -2116,7 +2164,7 @@ class EventsStore(SQLBaseStore):
)
new_backwards_extrems = txn.fetchall()
- logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+ logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
txn.execute(
"DELETE FROM event_backward_extremities WHERE room_id = ?",
@@ -2132,7 +2180,7 @@ class EventsStore(SQLBaseStore):
]
)
- logger.debug("[purge] finding redundant state groups")
+ logger.info("[purge] finding redundant state groups")
# Get all state groups that are only referenced by events that are
# to be deleted.
@@ -2149,15 +2197,15 @@ class EventsStore(SQLBaseStore):
)
state_rows = txn.fetchall()
- logger.debug("[purge] found %i redundant state groups", len(state_rows))
+ logger.info("[purge] found %i redundant state groups", len(state_rows))
# make a set of the redundant state groups, so that we can look them up
# efficiently
state_groups_to_delete = set([sg for sg, in state_rows])
# Now we get all the state groups that rely on these state groups
- logger.debug("[purge] finding state groups which depend on redundant"
- " state groups")
+ logger.info("[purge] finding state groups which depend on redundant"
+ " state groups")
remaining_state_groups = []
for i in xrange(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]]
@@ -2182,7 +2230,7 @@ class EventsStore(SQLBaseStore):
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
- logger.debug("[purge] de-delta-ing remaining state group %s", sg)
+ logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(
txn, [sg], types=None
)
@@ -2219,7 +2267,7 @@ class EventsStore(SQLBaseStore):
],
)
- logger.debug("[purge] removing redundant state groups")
+ logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
@@ -2229,18 +2277,15 @@ class EventsStore(SQLBaseStore):
state_rows
)
- # Delete all non-state
- logger.debug("[purge] removing events from event_to_state_groups")
+ logger.info("[purge] removing events from event_to_state_groups")
txn.executemany(
"DELETE FROM event_to_state_groups WHERE event_id = ?",
[(event_id,) for event_id, _ in event_rows]
)
-
- logger.debug("[purge] updating room_depth")
- txn.execute(
- "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
- (topological_ordering, room_id,)
- )
+ for event_id, _ in event_rows:
+ txn.call_after(self._get_state_group_for_event.invalidate, (
+ event_id,
+ ))
# Delete all remote non-state events
for table in (
@@ -2258,7 +2303,8 @@ class EventsStore(SQLBaseStore):
"event_signatures",
"rejections",
):
- logger.debug("[purge] removing remote non-state events from %s", table)
+ logger.info("[purge] removing remote non-state events from %s",
+ table)
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -2266,16 +2312,30 @@ class EventsStore(SQLBaseStore):
)
# Mark all state and own events as outliers
- logger.debug("[purge] marking remaining events as outliers")
+ logger.info("[purge] marking remaining events as outliers")
txn.executemany(
"UPDATE events SET outlier = ?"
" WHERE event_id = ?",
[
(True, event_id,) for event_id, state_key in event_rows
- if state_key is not None or self.hs.is_mine_id(event_id)
+ if state_key is not None or (
+ not delete_local_events and self.hs.is_mine_id(event_id)
+ )
]
)
+ # synapse tries to take out an exclusive lock on room_depth whenever it
+ # persists events (because upsert), and once we run this update, we
+ # will block that for the rest of our transaction.
+ #
+ # So, let's stick it at the end so that we don't block event
+ # persistence.
+ logger.info("[purge] updating room_depth")
+ txn.execute(
+ "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+ (topological_ordering, room_id,)
+ )
+
logger.info("[purge] done")
@defer.inlineCallbacks
|