diff options
-rw-r--r-- | changelog.d/7203.bugfix | 1 | ||||
-rw-r--r-- | synapse/app/generic_worker.py | 16 |
2 files changed, 9 insertions, 8 deletions
diff --git a/changelog.d/7203.bugfix b/changelog.d/7203.bugfix new file mode 100644 index 0000000000..8b383952e5 --- /dev/null +++ b/changelog.d/7203.bugfix @@ -0,0 +1 @@ +Fix some worker-mode replication handling not being correctly recorded in CPU usage stats. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 1ee266f7c5..174bef360f 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -42,7 +42,7 @@ from synapse.handlers.presence import PresenceHandler, get_interested_parties from synapse.http.server import JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext, run_in_background +from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.slave.storage._base import BaseSlavedStore, __func__ @@ -635,7 +635,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler): await super(GenericWorkerReplicationHandler, self).on_rdata( stream_name, token, rows ) - run_in_background(self.process_and_notify, stream_name, token, rows) + await self.process_and_notify(stream_name, token, rows) def get_streams_to_replicate(self): args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate() @@ -650,7 +650,9 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler): async def process_and_notify(self, stream_name, token, rows): try: if self.send_handler: - self.send_handler.process_replication_rows(stream_name, token, rows) + await self.send_handler.process_replication_rows( + stream_name, token, rows + ) if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so @@ -782,12 +784,12 @@ class FederationSenderHandler(object): def stream_positions(self): return {"federation": self.federation_position} - def process_replication_rows(self, stream_name, token, rows): + async def process_replication_rows(self, stream_name, token, rows): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - run_in_background(self.update_token, token) + await self.update_token(token) # We also need to poke the federation sender when new events happen elif stream_name == "events": @@ -795,9 +797,7 @@ class FederationSenderHandler(object): # ... and when new receipts happen elif stream_name == ReceiptsStream.NAME: - run_as_background_process( - "process_receipts_for_federation", self._on_new_receipts, rows - ) + await self._on_new_receipts(rows) # ... as well as device updates and messages elif stream_name == DeviceListsStream.NAME: |