diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2020-06-24 12:07:41 +0100 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2020-06-24 12:07:41 +0100 |
commit | a7d49db74fdc303bcd295db501644d54846f1fd5 (patch) | |
tree | ec564c03c6b642fb7ea9d830a26156bfd44f0460 /synapse/app | |
parent | Prevent M_USER_IN_USE from being raised by registration methods until after e... (diff) | |
parent | 1.15.0 (diff) | |
download | synapse-a7d49db74fdc303bcd295db501644d54846f1fd5.tar.xz |
Merge branch 'release-v1.15.0' of github.com:matrix-org/synapse into dinsic-release-v1.15.x
* 'release-v1.15.0' of github.com:matrix-org/synapse: (55 commits) 1.15.0 Fix some attributions Update CHANGES.md 1.15.0rc1 Revert "1.15.0rc1" 1.15.0rc1 Fix bug in account data replication stream. (#7656) Convert the registration handler to async/await. (#7649) Accept device information at the login fallback endpoint. (#7629) Convert user directory handler and related classes to async/await. (#7640) Add an option to disable autojoin for guest accounts (#6637) Clarifications to the admin api documentation (#7647) Update to the stable SSO prefix for UI Auth. (#7630) Fix type information on `assert_*_is_admin` methods (#7645) Remove some unused constants. (#7644) Typo fixes. Allow new users to be registered via the admin API even if the monthly active user limit has been reached (#7263) Add device management to admin API (#7481) Attempt to fix PhoneHomeStatsTestCase.test_performance_100 being flaky. (#7634) Support CS API v0.6.0 (#6585) ...
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/generic_worker.py | 31 | ||||
-rw-r--r-- | synapse/app/homeserver.py | 51 |
2 files changed, 51 insertions, 31 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 5afe52f8d4..f3ec2a34ec 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -863,9 +863,24 @@ class FederationSenderHandler(object): a FEDERATION_ACK back to the master, and stores the token that we have processed in `federation_stream_position` so that we can restart where we left off. """ - try: - self.federation_position = token + self.federation_position = token + + # We save and send the ACK to master asynchronously, so we don't block + # processing on persistence. We don't need to do this operation for + # every single RDATA we receive, we just need to do it periodically. + + if self._fed_position_linearizer.is_queued(None): + # There is already a task queued up to save and send the token, so + # no need to queue up another task. + return + + run_as_background_process("_save_and_send_ack", self._save_and_send_ack) + async def _save_and_send_ack(self): + """Save the current federation position in the database and send an ACK + to master with where we're up to. + """ + try: # We linearize here to ensure we don't have races updating the token # # XXX this appears to be redundant, since the ReplicationCommandHandler @@ -875,16 +890,18 @@ class FederationSenderHandler(object): # we're not being re-entered? with (await self._fed_position_linearizer.queue(None)): + # We persist and ack the same position, so we take a copy of it + # here as otherwise it can get modified from underneath us. + current_position = self.federation_position + await self.store.update_federation_out_pos( - "federation", self.federation_position + "federation", current_position ) # We ACK this token over replication so that the master can drop # its in memory queues - self._hs.get_tcp_replication().send_federation_ack( - self.federation_position - ) - self._last_ack = self.federation_position + self._hs.get_tcp_replication().send_federation_ack(current_position) + self._last_ack = current_position except Exception: logger.exception("Error updating federation stream position") diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 93a5ba2100..8454d74858 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -488,6 +488,29 @@ def phone_stats_home(hs, stats, stats_process=_stats_process): if uptime < 0: uptime = 0 + # + # Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test. + # + old = stats_process[0] + new = (now, resource.getrusage(resource.RUSAGE_SELF)) + stats_process[0] = new + + # Get RSS in bytes + stats["memory_rss"] = new[1].ru_maxrss + + # Get CPU time in % of a single core, not % of all cores + used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - ( + old[1].ru_utime + old[1].ru_stime + ) + if used_cpu_time == 0 or new[0] == old[0]: + stats["cpu_average"] = 0 + else: + stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100) + + # + # General statistics + # + stats["homeserver"] = hs.config.server_name stats["server_context"] = hs.config.server_context stats["timestamp"] = now @@ -523,25 +546,6 @@ def phone_stats_home(hs, stats, stats_process=_stats_process): stats["event_cache_size"] = hs.config.caches.event_cache_size # - # Performance statistics - # - old = stats_process[0] - new = (now, resource.getrusage(resource.RUSAGE_SELF)) - stats_process[0] = new - - # Get RSS in bytes - stats["memory_rss"] = new[1].ru_maxrss - - # Get CPU time in % of a single core, not % of all cores - used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - ( - old[1].ru_utime + old[1].ru_stime - ) - if used_cpu_time == 0 or new[0] == old[0]: - stats["cpu_average"] = 0 - else: - stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100) - - # # Database version # @@ -617,18 +621,17 @@ def run(hs): clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) reap_monthly_active_users() - @defer.inlineCallbacks - def generate_monthly_active_users(): + async def generate_monthly_active_users(): current_mau_count = 0 current_mau_count_by_service = {} reserved_users = () store = hs.get_datastore() if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: - current_mau_count = yield store.get_monthly_active_count() + current_mau_count = await store.get_monthly_active_count() current_mau_count_by_service = ( - yield store.get_monthly_active_count_by_service() + await store.get_monthly_active_count_by_service() ) - reserved_users = yield store.get_registered_reserved_users() + reserved_users = await store.get_registered_reserved_users() current_mau_gauge.set(float(current_mau_count)) for app_service, count in current_mau_count_by_service.items(): |