diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/background_updates.py | 98 | ||||
-rw-r--r-- | synapse/storage/client_ips.py | 15 | ||||
-rw-r--r-- | synapse/storage/schema/delta/33/user_ips_index.sql | 3 |
3 files changed, 98 insertions, 18 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 66a995157d..2771f7c3c1 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from . import engines from twisted.internet import defer @@ -87,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - while True: - if self._background_update_timer is not None: - return + assert(self._background_update_timer is not None, + "background updates already running") + + logger.info("Starting background schema updates") + while True: sleep = defer.Deferred() self._background_update_timer = self._clock.call_later( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None @@ -101,22 +104,23 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_timer = None try: - result = yield self.do_background_update( + result = yield self.do_next_background_update( self.BACKGROUND_UPDATE_DURATION_MS ) except: logger.exception("Error doing update") - - if result is None: - logger.info( - "No more background updates to do." - " Unscheduling background update task." - ) - return + else: + if result is None: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + defer.returnValue() @defer.inlineCallbacks - def do_background_update(self, desired_duration_ms): - """Does some amount of work on a background update + def do_next_background_update(self, desired_duration_ms): + """Does some amount of work on the next queued background update + Args: desired_duration_ms(float): How long we want to spend updating. @@ -135,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_queue.append(update['update_name']) if not self._background_update_queue: + # no work left to do defer.returnValue(None) + # pop from the front, and add back to the back update_name = self._background_update_queue.pop(0) self._background_update_queue.append(update_name) + res = yield self._do_background_update(update_name, desired_duration_ms) + defer.returnValue(res) + + @defer.inlineCallbacks + def _do_background_update(self, update_name, desired_duration_ms): + logger.info("Starting update batch on background update '%s'", + update_name) + update_handler = self._background_update_handlers[update_name] performance = self._background_update_performance.get(update_name) @@ -202,6 +216,64 @@ class BackgroundUpdateStore(SQLBaseStore): """ self._background_update_handlers[update_name] = update_handler + def register_background_index_update(self, update_name, index_name, + table, columns): + """Helper for store classes to do a background index addition + + To use: + + 1. use a schema delta file to add a background update. Example: + INSERT INTO background_updates (update_name, progress_json) VALUES + ('my_new_index', '{}'); + + 2. In the Store constructor, call this method + + Args: + update_name (str): update_name to register for + index_name (str): name of index to add + table (str): table to add index to + columns (list[str]): columns/expressions to include in index + """ + + # if this is postgres, we add the indexes concurrently. Otherwise + # we fall back to doing it inline + if isinstance(self.database_engine, engines.PostgresEngine): + conc = True + else: + conc = False + + sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \ + % { + "conc": "CONCURRENTLY" if conc else "", + "name": index_name, + "table": table, + "columns": ", ".join(columns), + } + + def create_index_concurrently(conn): + conn.rollback() + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + c = conn.cursor() + c.execute(sql) + conn.set_session(autocommit=False) + + def create_index(conn): + c = conn.cursor() + c.execute(sql) + + @defer.inlineCallbacks + def updater(progress, batch_size): + logger.info("Adding index %s to %s", index_name, table) + if conc: + yield self.runWithConnection(create_index_concurrently) + else: + yield self.runWithConnection(create_index) + yield self._end_background_update(update_name) + defer.returnValue(1) + + self.register_background_update_handler(update_name, updater) + def start_background_update(self, update_name, progress): """Starts a background update running. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index e31fa53c3f..71e5ea112f 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,10 +15,11 @@ import logging -from ._base import SQLBaseStore, Cache - from twisted.internet import defer +from ._base import Cache +from . import background_updates + logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller @@ -27,8 +28,7 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -class ClientIpStore(SQLBaseStore): - +class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", @@ -37,6 +37,13 @@ class ClientIpStore(SQLBaseStore): super(ClientIpStore, self).__init__(hs) + self.register_background_index_update( + "user_ips_device_index", + index_name="user_ips_device_id", + table="user_ips", + columns=["user_id", "device_id", "last_seen"], + ) + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) diff --git a/synapse/storage/schema/delta/33/user_ips_index.sql b/synapse/storage/schema/delta/33/user_ips_index.sql index 8a05677d42..473f75a78e 100644 --- a/synapse/storage/schema/delta/33/user_ips_index.sql +++ b/synapse/storage/schema/delta/33/user_ips_index.sql @@ -13,4 +13,5 @@ * limitations under the License. */ -CREATE INDEX user_ips_device_id ON user_ips(user_id, device_id, last_seen); +INSERT INTO background_updates (update_name, progress_json) VALUES + ('user_ips_device_index', '{}'); |