diff options
author | Erik Johnston <erikj@jki.re> | 2016-09-07 09:39:58 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-07 09:39:58 +0100 |
commit | 94a83b534f0c482a4faa6f33433126615f14401a (patch) | |
tree | fd9f61ff3907fba599d387b5b70c5b6ec16ce9c6 /synapse/storage/events.py | |
parent | Merge pull request #1073 from matrix-org/erikj/presence_fiddle (diff) | |
parent | Scale the batch size so that we're not bitten by the minimum (diff) | |
download | synapse-94a83b534f0c482a4faa6f33433126615f14401a.tar.xz |
Merge pull request #1065 from matrix-org/erikj/state_storage
Move to storing state_groups_state as deltas
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 68 |
1 files changed, 67 insertions, 1 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1a7d4c5199..ed182c8d11 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -497,7 +497,11 @@ class EventsStore(SQLBaseStore): # insert into the state_group, state_groups_state and # event_to_state_groups tables. - self._store_mult_state_groups_txn(txn, ((event, context),)) + try: + self._store_mult_state_groups_txn(txn, ((event, context),)) + except Exception: + logger.exception("") + raise metadata_json = encode_json( event.internal_metadata.get_dict() @@ -1543,6 +1547,9 @@ class EventsStore(SQLBaseStore): ) event_rows = txn.fetchall() + for event_id, state_key in event_rows: + txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) + # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged txn.execute( @@ -1582,7 +1589,66 @@ class EventsStore(SQLBaseStore): " GROUP BY state_group HAVING MAX(topological_ordering) < ?", (room_id, topological_ordering, topological_ordering) ) + state_rows = txn.fetchall() + state_groups_to_delete = [sg for sg, in state_rows] + + # Now we get all the state groups that rely on these state groups + new_state_edges = [] + chunks = [ + state_groups_to_delete[i:i + 100] + for i in xrange(0, len(state_groups_to_delete), 100) + ] + for chunk in chunks: + rows = self._simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=chunk, + retcols=["state_group"], + keyvalues={}, + ) + new_state_edges.extend(row["state_group"] for row in rows) + + # Now we turn the state groups that reference to-be-deleted state groups + # to non delta versions. + for new_state_edge in new_state_edges: + curr_state = self._get_state_groups_from_groups_txn( + txn, [new_state_edge], types=None + ) + curr_state = curr_state[new_state_edge] + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": new_state_edge, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in curr_state.items() + ], + ) + txn.executemany( "DELETE FROM state_groups_state WHERE state_group = ?", state_rows |