summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/7203.bugfix1
-rw-r--r--synapse/app/generic_worker.py16
2 files changed, 9 insertions, 8 deletions
diff --git a/changelog.d/7203.bugfix b/changelog.d/7203.bugfix
new file mode 100644
index 0000000000..8b383952e5
--- /dev/null
+++ b/changelog.d/7203.bugfix
@@ -0,0 +1 @@
+Fix some worker-mode replication handling not being correctly recorded in CPU usage stats.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 1ee266f7c5..174bef360f 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -42,7 +42,7 @@ from synapse.handlers.presence import PresenceHandler, get_interested_parties
 from synapse.http.server import JsonResource
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseSite
-from synapse.logging.context import LoggingContext, run_in_background
+from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
@@ -635,7 +635,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
         await super(GenericWorkerReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
-        run_in_background(self.process_and_notify, stream_name, token, rows)
+        await self.process_and_notify(stream_name, token, rows)
 
     def get_streams_to_replicate(self):
         args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
@@ -650,7 +650,9 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
     async def process_and_notify(self, stream_name, token, rows):
         try:
             if self.send_handler:
-                self.send_handler.process_replication_rows(stream_name, token, rows)
+                await self.send_handler.process_replication_rows(
+                    stream_name, token, rows
+                )
 
             if stream_name == EventsStream.NAME:
                 # We shouldn't get multiple rows per token for events stream, so
@@ -782,12 +784,12 @@ class FederationSenderHandler(object):
     def stream_positions(self):
         return {"federation": self.federation_position}
 
-    def process_replication_rows(self, stream_name, token, rows):
+    async def process_replication_rows(self, stream_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
         if stream_name == "federation":
             send_queue.process_rows_for_federation(self.federation_sender, rows)
-            run_in_background(self.update_token, token)
+            await self.update_token(token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
@@ -795,9 +797,7 @@ class FederationSenderHandler(object):
 
         # ... and when new receipts happen
         elif stream_name == ReceiptsStream.NAME:
-            run_as_background_process(
-                "process_receipts_for_federation", self._on_new_receipts, rows
-            )
+            await self._on_new_receipts(rows)
 
         # ... as well as device updates and messages
         elif stream_name == DeviceListsStream.NAME: