summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-09-07 09:39:58 +0100
committerGitHub <noreply@github.com>2016-09-07 09:39:58 +0100
commit94a83b534f0c482a4faa6f33433126615f14401a (patch)
treefd9f61ff3907fba599d387b5b70c5b6ec16ce9c6 /synapse/storage/events.py
parentMerge pull request #1073 from matrix-org/erikj/presence_fiddle (diff)
parentScale the batch size so that we're not bitten by the minimum (diff)
downloadsynapse-94a83b534f0c482a4faa6f33433126615f14401a.tar.xz
Merge pull request #1065 from matrix-org/erikj/state_storage
Move to storing state_groups_state as deltas
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py68
1 files changed, 67 insertions, 1 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1a7d4c5199..ed182c8d11 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -497,7 +497,11 @@ class EventsStore(SQLBaseStore):
 
                 # insert into the state_group, state_groups_state and
                 # event_to_state_groups tables.
-                self._store_mult_state_groups_txn(txn, ((event, context),))
+                try:
+                    self._store_mult_state_groups_txn(txn, ((event, context),))
+                except Exception:
+                    logger.exception("")
+                    raise
 
                 metadata_json = encode_json(
                     event.internal_metadata.get_dict()
@@ -1543,6 +1547,9 @@ class EventsStore(SQLBaseStore):
         )
         event_rows = txn.fetchall()
 
+        for event_id, state_key in event_rows:
+            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
         txn.execute(
@@ -1582,7 +1589,66 @@ class EventsStore(SQLBaseStore):
             " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
             (room_id, topological_ordering, topological_ordering)
         )
+
         state_rows = txn.fetchall()
+        state_groups_to_delete = [sg for sg, in state_rows]
+
+        # Now we get all the state groups that rely on these state groups
+        new_state_edges = []
+        chunks = [
+            state_groups_to_delete[i:i + 100]
+            for i in xrange(0, len(state_groups_to_delete), 100)
+        ]
+        for chunk in chunks:
+            rows = self._simple_select_many_txn(
+                txn,
+                table="state_group_edges",
+                column="prev_state_group",
+                iterable=chunk,
+                retcols=["state_group"],
+                keyvalues={},
+            )
+            new_state_edges.extend(row["state_group"] for row in rows)
+
+        # Now we turn the state groups that reference to-be-deleted state groups
+        # to non delta versions.
+        for new_state_edge in new_state_edges:
+            curr_state = self._get_state_groups_from_groups_txn(
+                txn, [new_state_edge], types=None
+            )
+            curr_state = curr_state[new_state_edge]
+
+            self._simple_delete_txn(
+                txn,
+                table="state_groups_state",
+                keyvalues={
+                    "state_group": new_state_edge,
+                }
+            )
+
+            self._simple_delete_txn(
+                txn,
+                table="state_group_edges",
+                keyvalues={
+                    "state_group": new_state_edge,
+                }
+            )
+
+            self._simple_insert_many_txn(
+                txn,
+                table="state_groups_state",
+                values=[
+                    {
+                        "state_group": new_state_edge,
+                        "room_id": room_id,
+                        "type": key[0],
+                        "state_key": key[1],
+                        "event_id": state_id,
+                    }
+                    for key, state_id in curr_state.items()
+                ],
+            )
+
         txn.executemany(
             "DELETE FROM state_groups_state WHERE state_group = ?",
             state_rows