summary refs log tree commit diff
path: root/synapse/replication/tcp/streams
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams')
-rw-r--r--synapse/replication/tcp/streams/_base.py25
-rw-r--r--synapse/replication/tcp/streams/events.py9
2 files changed, 13 insertions, 21 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 4ab0334fc1..e03e77199b 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -19,8 +19,6 @@ import logging
 from collections import namedtuple
 from typing import Any
 
-from twisted.internet import defer
-
 logger = logging.getLogger(__name__)
 
 
@@ -144,8 +142,7 @@ class Stream(object):
         self.upto_token = self.current_token()
         self.last_token = self.upto_token
 
-    @defer.inlineCallbacks
-    def get_updates(self):
+    async def get_updates(self):
         """Gets all updates since the last time this function was called (or
         since the stream was constructed if it hadn't been called before),
         until the `upto_token`
@@ -156,13 +153,12 @@ class Stream(object):
                 list of ``(token, row)`` entries. ``row`` will be json-serialised and
                 sent over the replication steam.
         """
-        updates, current_token = yield self.get_updates_since(self.last_token)
+        updates, current_token = await self.get_updates_since(self.last_token)
         self.last_token = current_token
 
         return updates, current_token
 
-    @defer.inlineCallbacks
-    def get_updates_since(self, from_token):
+    async def get_updates_since(self, from_token):
         """Like get_updates except allows specifying from when we should
         stream updates
 
@@ -182,15 +178,16 @@ class Stream(object):
         if from_token == current_token:
             return [], current_token
 
+        logger.info("get_updates_since: %s", self.__class__)
         if self._LIMITED:
-            rows = yield self.update_function(
+            rows = await self.update_function(
                 from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
             )
 
             # never turn more than MAX_EVENTS_BEHIND + 1 into updates.
             rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
         else:
-            rows = yield self.update_function(from_token, current_token)
+            rows = await self.update_function(from_token, current_token)
 
         updates = [(row[0], row[1:]) for row in rows]
 
@@ -295,9 +292,8 @@ class PushRulesStream(Stream):
         push_rules_token, _ = self.store.get_push_rules_stream_token()
         return push_rules_token
 
-    @defer.inlineCallbacks
-    def update_function(self, from_token, to_token, limit):
-        rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit)
+    async def update_function(self, from_token, to_token, limit):
+        rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)
         return [(row[0], row[2]) for row in rows]
 
 
@@ -413,9 +409,8 @@ class AccountDataStream(Stream):
 
         super(AccountDataStream, self).__init__(hs)
 
-    @defer.inlineCallbacks
-    def update_function(self, from_token, to_token, limit):
-        global_results, room_results = yield self.store.get_all_updated_account_data(
+    async def update_function(self, from_token, to_token, limit):
+        global_results, room_results = await self.store.get_all_updated_account_data(
             from_token, from_token, to_token, limit
         )
 
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 0843e5aa90..b3afabb8cd 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,8 +19,6 @@ from typing import Tuple, Type
 
 import attr
 
-from twisted.internet import defer
-
 from ._base import Stream
 
 
@@ -122,16 +120,15 @@ class EventsStream(Stream):
 
         super(EventsStream, self).__init__(hs)
 
-    @defer.inlineCallbacks
-    def update_function(self, from_token, current_token, limit=None):
-        event_rows = yield self._store.get_all_new_forward_event_rows(
+    async def update_function(self, from_token, current_token, limit=None):
+        event_rows = await self._store.get_all_new_forward_event_rows(
             from_token, current_token, limit
         )
         event_updates = (
             (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
         )
 
-        state_rows = yield self._store.get_all_updated_current_state_deltas(
+        state_rows = await self._store.get_all_updated_current_state_deltas(
             from_token, current_token, limit
         )
         state_updates = (