summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/background_updates.py98
-rw-r--r--synapse/storage/client_ips.py15
-rw-r--r--synapse/storage/schema/delta/33/user_ips_index.sql3
-rw-r--r--tests/storage/test_background_update.py20
-rw-r--r--tests/unittest.py11
5 files changed, 120 insertions, 27 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 66a995157d..30d0e4c5dc 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
+from . import engines
 
 from twisted.internet import defer
 
@@ -87,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def start_doing_background_updates(self):
-        while True:
-            if self._background_update_timer is not None:
-                return
+        assert self._background_update_timer is None, \
+            "background updates already running"
+
+        logger.info("Starting background schema updates")
 
+        while True:
             sleep = defer.Deferred()
             self._background_update_timer = self._clock.call_later(
                 self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
@@ -101,22 +104,23 @@ class BackgroundUpdateStore(SQLBaseStore):
                 self._background_update_timer = None
 
             try:
-                result = yield self.do_background_update(
+                result = yield self.do_next_background_update(
                     self.BACKGROUND_UPDATE_DURATION_MS
                 )
             except:
                 logger.exception("Error doing update")
-
-            if result is None:
-                logger.info(
-                    "No more background updates to do."
-                    " Unscheduling background update task."
-                )
-                return
+            else:
+                if result is None:
+                    logger.info(
+                        "No more background updates to do."
+                        " Unscheduling background update task."
+                    )
+                    defer.returnValue(None)
 
     @defer.inlineCallbacks
-    def do_background_update(self, desired_duration_ms):
-        """Does some amount of work on a background update
+    def do_next_background_update(self, desired_duration_ms):
+        """Does some amount of work on the next queued background update
+
         Args:
             desired_duration_ms(float): How long we want to spend
                 updating.
@@ -135,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore):
                 self._background_update_queue.append(update['update_name'])
 
         if not self._background_update_queue:
+            # no work left to do
             defer.returnValue(None)
 
+        # pop from the front, and add back to the back
         update_name = self._background_update_queue.pop(0)
         self._background_update_queue.append(update_name)
 
+        res = yield self._do_background_update(update_name, desired_duration_ms)
+        defer.returnValue(res)
+
+    @defer.inlineCallbacks
+    def _do_background_update(self, update_name, desired_duration_ms):
+        logger.info("Starting update batch on background update '%s'",
+                    update_name)
+
         update_handler = self._background_update_handlers[update_name]
 
         performance = self._background_update_performance.get(update_name)
@@ -202,6 +216,64 @@ class BackgroundUpdateStore(SQLBaseStore):
         """
         self._background_update_handlers[update_name] = update_handler
 
+    def register_background_index_update(self, update_name, index_name,
+                                         table, columns):
+        """Helper for store classes to do a background index addition
+
+        To use:
+
+        1. use a schema delta file to add a background update. Example:
+            INSERT INTO background_updates (update_name, progress_json) VALUES
+                ('my_new_index', '{}');
+
+        2. In the Store constructor, call this method
+
+        Args:
+            update_name (str): update_name to register for
+            index_name (str): name of index to add
+            table (str): table to add index to
+            columns (list[str]): columns/expressions to include in index
+        """
+
+        # if this is postgres, we add the indexes concurrently. Otherwise
+        # we fall back to doing it inline
+        if isinstance(self.database_engine, engines.PostgresEngine):
+            conc = True
+        else:
+            conc = False
+
+        sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \
+              % {
+                  "conc": "CONCURRENTLY" if conc else "",
+                  "name": index_name,
+                  "table": table,
+                  "columns": ", ".join(columns),
+              }
+
+        def create_index_concurrently(conn):
+            conn.rollback()
+            # postgres insists on autocommit for the index
+            conn.set_session(autocommit=True)
+            c = conn.cursor()
+            c.execute(sql)
+            conn.set_session(autocommit=False)
+
+        def create_index(conn):
+            c = conn.cursor()
+            c.execute(sql)
+
+        @defer.inlineCallbacks
+        def updater(progress, batch_size):
+            logger.info("Adding index %s to %s", index_name, table)
+            if conc:
+                yield self.runWithConnection(create_index_concurrently)
+            else:
+                yield self.runWithConnection(create_index)
+            yield self._end_background_update(update_name)
+            defer.returnValue(1)
+
+        self.register_background_update_handler(update_name, updater)
+
     def start_background_update(self, update_name, progress):
         """Starts a background update running.
 
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index e31fa53c3f..71e5ea112f 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,10 +15,11 @@
 
 import logging
 
-from ._base import SQLBaseStore, Cache
-
 from twisted.internet import defer
 
+from ._base import Cache
+from . import background_updates
+
 logger = logging.getLogger(__name__)
 
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
@@ -27,8 +28,7 @@ logger = logging.getLogger(__name__)
 LAST_SEEN_GRANULARITY = 120 * 1000
 
 
-class ClientIpStore(SQLBaseStore):
-
+class ClientIpStore(background_updates.BackgroundUpdateStore):
     def __init__(self, hs):
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
@@ -37,6 +37,13 @@ class ClientIpStore(SQLBaseStore):
 
         super(ClientIpStore, self).__init__(hs)
 
+        self.register_background_index_update(
+            "user_ips_device_index",
+            index_name="user_ips_device_id",
+            table="user_ips",
+            columns=["user_id", "device_id", "last_seen"],
+        )
+
     @defer.inlineCallbacks
     def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
         now = int(self._clock.time_msec())
diff --git a/synapse/storage/schema/delta/33/user_ips_index.sql b/synapse/storage/schema/delta/33/user_ips_index.sql
index 8a05677d42..473f75a78e 100644
--- a/synapse/storage/schema/delta/33/user_ips_index.sql
+++ b/synapse/storage/schema/delta/33/user_ips_index.sql
@@ -13,4 +13,5 @@
  * limitations under the License.
  */
 
-CREATE INDEX user_ips_device_id ON user_ips(user_id, device_id, last_seen);
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('user_ips_device_index', '{}');
diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py
index 6e4d9b1373..1286b4ce2d 100644
--- a/tests/storage/test_background_update.py
+++ b/tests/storage/test_background_update.py
@@ -10,7 +10,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def setUp(self):
-        hs = yield setup_test_homeserver()
+        hs = yield setup_test_homeserver()  # type: synapse.server.HomeServer
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
 
@@ -20,11 +20,20 @@ class BackgroundUpdateTestCase(unittest.TestCase):
             "test_update", self.update_handler
         )
 
+        # run the real background updates, to get them out the way
+        # (perhaps we should run them as part of the test HS setup, since we
+        # run all of the other schema setup stuff there?)
+        while True:
+            res = yield self.store.do_next_background_update(1000)
+            if res is None:
+                break
+
     @defer.inlineCallbacks
     def test_do_background_update(self):
         desired_count = 1000
         duration_ms = 42
 
+        # first step: make a bit of progress
         @defer.inlineCallbacks
         def update(progress, count):
             self.clock.advance_time_msec(count * duration_ms)
@@ -42,7 +51,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
         yield self.store.start_background_update("test_update", {"my_key": 1})
 
         self.update_handler.reset_mock()
-        result = yield self.store.do_background_update(
+        result = yield self.store.do_next_background_update(
             duration_ms * desired_count
         )
         self.assertIsNotNone(result)
@@ -50,15 +59,15 @@ class BackgroundUpdateTestCase(unittest.TestCase):
             {"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE
         )
 
+        # second step: complete the update
         @defer.inlineCallbacks
         def update(progress, count):
             yield self.store._end_background_update("test_update")
             defer.returnValue(count)
 
         self.update_handler.side_effect = update
-
         self.update_handler.reset_mock()
-        result = yield self.store.do_background_update(
+        result = yield self.store.do_next_background_update(
             duration_ms * desired_count
         )
         self.assertIsNotNone(result)
@@ -66,8 +75,9 @@ class BackgroundUpdateTestCase(unittest.TestCase):
             {"my_key": 2}, desired_count
         )
 
+        # third step: we don't expect to be called any more
         self.update_handler.reset_mock()
-        result = yield self.store.do_background_update(
+        result = yield self.store.do_next_background_update(
             duration_ms * desired_count
         )
         self.assertIsNone(result)
diff --git a/tests/unittest.py b/tests/unittest.py
index 5b22abfe74..38715972dd 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -17,13 +17,18 @@ from twisted.trial import unittest
 
 import logging
 
-
 # logging doesn't have a "don't log anything at all EVARRRR setting,
 # but since the highest value is 50, 1000000 should do ;)
 NEVER = 1000000
 
-logging.getLogger().addHandler(logging.StreamHandler())
+handler = logging.StreamHandler()
+handler.setFormatter(logging.Formatter(
+    "%(levelname)s:%(name)s:%(message)s  [%(pathname)s:%(lineno)d]"
+))
+logging.getLogger().addHandler(handler)
 logging.getLogger().setLevel(NEVER)
+logging.getLogger("synapse.storage.SQL").setLevel(NEVER)
+logging.getLogger("synapse.storage.txn").setLevel(NEVER)
 
 
 def around(target):
@@ -70,8 +75,6 @@ class TestCase(unittest.TestCase):
                     return ret
 
             logging.getLogger().setLevel(level)
-            # Don't set SQL logging
-            logging.getLogger("synapse.storage").setLevel(old_level)
             return orig()
 
     def assertObjectHasAttributes(self, attrs, obj):