diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index ec32008d5a..11605b34a3 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -27,12 +27,17 @@ class EventContext(object):
]
def __init__(self):
+ # The current state including the current event
self.current_state_ids = None
+ # The current state excluding the current event
self.prev_state_ids = None
self.state_group = None
+
self.rejected = False
self.push_actions = []
+ # A previously persisted state group and a delta between that
+ # and this state.
self.prev_group = None
self.delta_ids = None
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 968b68f462..ee8b763008 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -118,8 +118,6 @@ class StateStore(SQLBaseStore):
if self._have_persisted_state_group_txn(txn, context.state_group):
continue
- state_event_ids = dict(context.current_state_ids)
-
self._simple_insert_txn(
txn,
table="state_groups",
@@ -130,49 +128,36 @@ class StateStore(SQLBaseStore):
},
)
+ # We persist as a delta if we can, while also ensuring the chain
+ # of deltas isn't tooo long, as otherwise read performance degrades.
if context.prev_group:
potential_hops = self._count_state_group_hops_txn(
txn, context.prev_group
)
- if potential_hops < MAX_STATE_DELTA_HOPS:
- self._simple_insert_txn(
- txn,
- table="state_group_edges",
- values={
- "state_group": context.state_group,
- "prev_state_group": context.prev_group,
- },
- )
+ if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
+ self._simple_insert_txn(
+ txn,
+ table="state_group_edges",
+ values={
+ "state_group": context.state_group,
+ "prev_state_group": context.prev_group,
+ },
+ )
- self._simple_insert_many_txn(
- txn,
- table="state_groups_state",
- values=[
- {
- "state_group": context.state_group,
- "room_id": event.room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": state_id,
- }
- for key, state_id in context.delta_ids.items()
- ],
- )
- else:
- self._simple_insert_many_txn(
- txn,
- table="state_groups_state",
- values=[
- {
- "state_group": context.state_group,
- "room_id": event.room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": state_id,
- }
- for key, state_id in context.current_state_ids.items()
- ],
- )
+ self._simple_insert_many_txn(
+ txn,
+ table="state_groups_state",
+ values=[
+ {
+ "state_group": context.state_group,
+ "room_id": event.room_id,
+ "type": key[0],
+ "state_key": key[1],
+ "event_id": state_id,
+ }
+ for key, state_id in context.delta_ids.items()
+ ],
+ )
else:
self._simple_insert_many_txn(
txn,
@@ -185,7 +170,7 @@ class StateStore(SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in state_event_ids.items()
+ for key, state_id in context.current_state_ids.items()
],
)
@@ -202,6 +187,10 @@ class StateStore(SQLBaseStore):
)
def _count_state_group_hops_txn(self, txn, state_group):
+ """Given a state group, count how many hops there are in the tree.
+
+ This is used to ensure the delta chains don't get too long.
+ """
if isinstance(self.database_engine, PostgresEngine):
sql = ("""
WITH RECURSIVE state(state_group) AS (
@@ -319,6 +308,11 @@ class StateStore(SQLBaseStore):
results = {group: {} for group in groups}
if isinstance(self.database_engine, PostgresEngine):
+ # The below query walks the state_group tree so that the "state"
+ # table includes all state_groups in the tree. It then joins
+ # against `state_groups_state` to fetch the latest state.
+ # It assumes that previous state groups are always numerically
+ # lesser.
sql = ("""
WITH RECURSIVE state(state_group) AS (
VALUES(?::bigint)
@@ -644,6 +638,9 @@ class StateStore(SQLBaseStore):
@defer.inlineCallbacks
def _background_deduplicate_state(self, progress, batch_size):
+ """This background update will slowly deduplicate state by reencoding
+ them as deltas.
+ """
last_state_group = progress.get("last_state_group", 0)
rows_inserted = progress.get("rows_inserted", 0)
max_group = progress.get("max_group", None)
|