diff options
Diffstat (limited to '')
-rwxr-xr-x | synapse/app/homeserver.py | 1 | ||||
-rw-r--r-- | synapse/storage/background_updates.py | 57 | ||||
-rw-r--r-- | synapse/storage/search.py | 11 | ||||
-rw-r--r-- | synapse/util/__init__.py | 8 |
4 files changed, 67 insertions, 10 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a77535a4ee..cd7a52ec07 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -439,6 +439,7 @@ def setup(config_options): hs.get_pusherpool().start() hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() + hs.get_datastore().start_doing_background_updates() hs.get_replication_layer().start_get_pdu_cache() return hs diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 32a233c213..b6cdc6ec68 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -43,7 +43,7 @@ class BackgroundUpdatePerformance(object): self.avg_item_count += 0.1 * (item_count - self.avg_item_count) self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms) - def average_duration_ms_per_item(self): + def average_items_per_ms(self): """An estimate of how long it takes to do a single update. Returns: A duration in ms as a float @@ -53,7 +53,17 @@ class BackgroundUpdatePerformance(object): else: # Use the exponential moving average so that we can adapt to # changes in how long the update process takes. - return float(self.avg_duration_ms) / float(self.avg_item_count) + return float(self.avg_item_count) / float(self.avg_duration_ms) + + def total_items_per_ms(self): + """An estimate of how long it takes to do a single update. + Returns: + A duration in ms as a float + """ + if self.total_item_count == 0: + return None + else: + return float(self.total_item_count) / float(self.total_duration_ms) class BackgroundUpdateStore(SQLBaseStore): @@ -65,12 +75,41 @@ class BackgroundUpdateStore(SQLBaseStore): MINIMUM_BACKGROUND_BATCH_SIZE = 100 DEFAULT_BACKGROUND_BATCH_SIZE = 100 + BACKGROUND_UPDATE_INTERVAL_MS = 1000 + BACKGROUND_UPDATE_DURATION_MS = 100 def __init__(self, hs): super(BackgroundUpdateStore, self).__init__(hs) self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} + self._background_update_timer = None + + @defer.inlineCallbacks + def start_doing_background_updates(self): + while True: + if self._background_update_timer is not None: + return + + sleep = defer.Deferred() + self._background_update_timer = self._clock.call_later( + self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback + ) + try: + yield sleep + finally: + self._background_update_timer = None + + result = yield self.do_background_update( + self.BACKGROUND_UPDATE_DURATION_MS + ) + + if result is None: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + return @defer.inlineCallbacks def do_background_update(self, desired_duration_ms): @@ -106,10 +145,10 @@ class BackgroundUpdateStore(SQLBaseStore): performance = BackgroundUpdatePerformance(update_name) self._background_update_performance[update_name] = performance - duration_ms_per_item = performance.average_duration_ms_per_item() + items_per_ms = performance.average_items_per_ms() - if duration_ms_per_item is not None: - batch_size = int(desired_duration_ms / duration_ms_per_item) + if items_per_ms is not None: + batch_size = int(desired_duration_ms * items_per_ms) # Clamp the batch size so that we always make progress batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE) else: @@ -130,8 +169,12 @@ class BackgroundUpdateStore(SQLBaseStore): duration_ms = time_stop - time_start logger.info( - "Updating %. Updated %r items in %rms", - update_name, items_updated, duration_ms + "Updating %. Updated %r items in %rms." + " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)", + update_name, items_updated, duration_ms, + performance.total_items_per_ms(), + performance.average_items_per_ms(), + performance.total_item_count, ) performance.update(items_updated, duration_ms) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index f7c269865d..d170c546b5 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -29,6 +29,11 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" + def __init__(self, hs): + super(SearchStore, self).__init__(hs) + self.register_background_update_handler( + self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search + ) @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): @@ -74,7 +79,7 @@ class SearchStore(BackgroundUpdateStore): elif event.type == "m.room.name": key = "content.name" value = content["name"] - except Exception: + except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. continue @@ -96,7 +101,7 @@ class SearchStore(BackgroundUpdateStore): raise Exception("Unrecognized database engine") for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): - clump = event_search_rows[index : index + INSERT_CLUMP_SIZE) + clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] txn.execute_many(sql, clump) progress = { @@ -116,7 +121,7 @@ class SearchStore(BackgroundUpdateStore): ) if result is None: - yield _end_background_update(self.EVENT_SEARCH_UPDATE_NAME) + yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME) defer.returnValue(result) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 1d123ccefc..d69c7cb991 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -53,6 +53,14 @@ class Clock(object): loop.stop() def call_later(self, delay, callback, *args, **kwargs): + """Call something later + + Args: + delay(float): How long to wait in seconds. + callback(function): Function to call + *args: Postional arguments to pass to function. + **kwargs: Key arguments to pass to function. + """ current_context = LoggingContext.current_context() def wrapped_callback(*args, **kwargs): |