summary refs log tree commit diff
path: root/synapse/storage/background_updates.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2016-07-22 13:14:03 +0100
committerRichard van der Hoff <richard@matrix.org>2016-07-22 13:16:39 +0100
commitec5717caf59eb72caf6f82f1643f492f328a4be5 (patch)
tree57cd7e8a58bcd51902d854e1da69f2458ae1a3bb /synapse/storage/background_updates.py
parentMerge pull request #944 from matrix-org/rav/devices_returns_list (diff)
downloadsynapse-ec5717caf59eb72caf6f82f1643f492f328a4be5.tar.xz
Create index on user_ips in the background
user_ips is kinda big, so really we want to add the index in the background
once we're running. Replace the schema delta with one which will do that.

I've done this in a way that's reasonably easy to reuse as there a few other
indexes I need, and I don't suppose they will be the last.
Diffstat (limited to 'synapse/storage/background_updates.py')
-rw-r--r--synapse/storage/background_updates.py73
1 files changed, 66 insertions, 7 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 66a995157d..75951d0173 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
+from . import engines
 
 from twisted.internet import defer
 
@@ -106,13 +107,13 @@ class BackgroundUpdateStore(SQLBaseStore):
                 )
             except:
                 logger.exception("Error doing update")
-
-            if result is None:
-                logger.info(
-                    "No more background updates to do."
-                    " Unscheduling background update task."
-                )
-                return
+            else:
+                if result is None:
+                    logger.info(
+                        "No more background updates to do."
+                        " Unscheduling background update task."
+                    )
+                    return
 
     @defer.inlineCallbacks
     def do_background_update(self, desired_duration_ms):
@@ -202,6 +203,64 @@ class BackgroundUpdateStore(SQLBaseStore):
         """
         self._background_update_handlers[update_name] = update_handler
 
+    def register_background_index_update(self, update_name, index_name,
+                                         table, columns):
+        """Helper for store classes to do a background index addition
+
+        To use:
+
+        1. use a schema delta file to add a background update. Example:
+            INSERT INTO background_updates (update_name, progress_json) VALUES
+                ('my_new_index', '{}');
+
+        2. In the Store constructor, call this method
+
+        Args:
+            update_name (str): update_name to register for
+            index_name (str): name of index to add
+            table (str): table to add index to
+            columns (list[str]): columns/expressions to include in index
+        """
+
+        # if this is postgres, we add the indexes concurrently. Otherwise
+        # we fall back to doing it inline
+        if isinstance(self.database_engine, engines.PostgresEngine):
+            conc = True
+        else:
+            conc = False
+
+        sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \
+              % {
+                  "conc": "CONCURRENTLY" if conc else "",
+                  "name": index_name,
+                  "table": table,
+                  "columns": ", ".join(columns),
+              }
+
+        def create_index_concurrently(conn):
+            conn.rollback()
+            # postgres insists on autocommit for the index
+            conn.set_session(autocommit=True)
+            c = conn.cursor()
+            c.execute(sql)
+            conn.set_session(autocommit=False)
+
+        def create_index(conn):
+            c = conn.cursor()
+            c.execute(sql)
+
+        @defer.inlineCallbacks
+        def updater(progress, batch_size):
+            logger.info("Adding index %s to %s", index_name, table)
+            if conc:
+                yield self.runWithConnection(create_index_concurrently)
+            else:
+                yield self.runWithConnection(create_index)
+            yield self._end_background_update(update_name)
+            defer.returnValue(1)
+
+        self.register_background_update_handler(update_name, updater)
+
     def start_background_update(self, update_name, progress):
         """Starts a background update running.