summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-16 09:16:12 +0000
committerGitHub <noreply@github.com>2020-01-16 09:16:12 +0000
commit48c3a96886de64f3141ad68b8163cd2fc0c197ff (patch)
treee8d6629579e6f7fba216e31bba04cf05781d258c /synapse/replication/tcp/resource.py
parentFix purge_room admin API (#6711) (diff)
downloadsynapse-48c3a96886de64f3141ad68b8163cd2fc0c197ff.tar.xz
Port synapse.replication.tcp to async/await (#6666)
* Port synapse.replication.tcp to async/await

* Newsfile

* Correctly document type of on_<FOO> functions as async

* Don't be overenthusiastic with the asyncing....
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py31
1 files changed, 14 insertions, 17 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index cbfdaf5773..b1752e88cd 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -23,7 +23,6 @@ from six import itervalues
 
 from prometheus_client import Counter
 
-from twisted.internet import defer
 from twisted.internet.protocol import Factory
 
 from synapse.metrics import LaterGauge
@@ -155,8 +154,7 @@ class ReplicationStreamer(object):
 
         run_as_background_process("replication_notifier", self._run_notifier_loop)
 
-    @defer.inlineCallbacks
-    def _run_notifier_loop(self):
+    async def _run_notifier_loop(self):
         self.is_looping = True
 
         try:
@@ -185,7 +183,7 @@ class ReplicationStreamer(object):
                             continue
 
                         if self._replication_torture_level:
-                            yield self.clock.sleep(
+                            await self.clock.sleep(
                                 self._replication_torture_level / 1000.0
                             )
 
@@ -196,7 +194,7 @@ class ReplicationStreamer(object):
                             stream.upto_token,
                         )
                         try:
-                            updates, current_token = yield stream.get_updates()
+                            updates, current_token = await stream.get_updates()
                         except Exception:
                             logger.info("Failed to handle stream %s", stream.NAME)
                             raise
@@ -233,7 +231,7 @@ class ReplicationStreamer(object):
             self.is_looping = False
 
     @measure_func("repl.get_stream_updates")
-    def get_stream_updates(self, stream_name, token):
+    async def get_stream_updates(self, stream_name, token):
         """For a given stream get all updates since token. This is called when
         a client first subscribes to a stream.
         """
@@ -241,7 +239,7 @@ class ReplicationStreamer(object):
         if not stream:
             raise Exception("unknown stream %s", stream_name)
 
-        return stream.get_updates_since(token)
+        return await stream.get_updates_since(token)
 
     @measure_func("repl.federation_ack")
     def federation_ack(self, token):
@@ -252,22 +250,20 @@ class ReplicationStreamer(object):
             self.federation_sender.federation_ack(token)
 
     @measure_func("repl.on_user_sync")
-    @defer.inlineCallbacks
-    def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
+    async def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
         """A client has started/stopped syncing on a worker.
         """
         user_sync_counter.inc()
-        yield self.presence_handler.update_external_syncs_row(
+        await self.presence_handler.update_external_syncs_row(
             conn_id, user_id, is_syncing, last_sync_ms
         )
 
     @measure_func("repl.on_remove_pusher")
-    @defer.inlineCallbacks
-    def on_remove_pusher(self, app_id, push_key, user_id):
+    async def on_remove_pusher(self, app_id, push_key, user_id):
         """A client has asked us to remove a pusher
         """
         remove_pusher_counter.inc()
-        yield self.store.delete_pusher_by_app_id_pushkey_user_id(
+        await self.store.delete_pusher_by_app_id_pushkey_user_id(
             app_id=app_id, pushkey=push_key, user_id=user_id
         )
 
@@ -281,15 +277,16 @@ class ReplicationStreamer(object):
         getattr(self.store, cache_func).invalidate(tuple(keys))
 
     @measure_func("repl.on_user_ip")
-    @defer.inlineCallbacks
-    def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+    async def on_user_ip(
+        self, user_id, access_token, ip, user_agent, device_id, last_seen
+    ):
         """The client saw a user request
         """
         user_ip_cache_counter.inc()
-        yield self.store.insert_client_ip(
+        await self.store.insert_client_ip(
             user_id, access_token, ip, user_agent, device_id, last_seen
         )
-        yield self._server_notices_sender.on_user_ip(user_id)
+        await self._server_notices_sender.on_user_ip(user_id)
 
     def send_sync_to_all_connections(self, data):
         """Sends a SYNC command to all clients.