diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 4ab0334fc1..e03e77199b 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -19,8 +19,6 @@ import logging
from collections import namedtuple
from typing import Any
-from twisted.internet import defer
-
logger = logging.getLogger(__name__)
@@ -144,8 +142,7 @@ class Stream(object):
self.upto_token = self.current_token()
self.last_token = self.upto_token
- @defer.inlineCallbacks
- def get_updates(self):
+ async def get_updates(self):
"""Gets all updates since the last time this function was called (or
since the stream was constructed if it hadn't been called before),
until the `upto_token`
@@ -156,13 +153,12 @@ class Stream(object):
list of ``(token, row)`` entries. ``row`` will be json-serialised and
sent over the replication steam.
"""
- updates, current_token = yield self.get_updates_since(self.last_token)
+ updates, current_token = await self.get_updates_since(self.last_token)
self.last_token = current_token
return updates, current_token
- @defer.inlineCallbacks
- def get_updates_since(self, from_token):
+ async def get_updates_since(self, from_token):
"""Like get_updates except allows specifying from when we should
stream updates
@@ -182,15 +178,16 @@ class Stream(object):
if from_token == current_token:
return [], current_token
+ logger.info("get_updates_since: %s", self.__class__)
if self._LIMITED:
- rows = yield self.update_function(
+ rows = await self.update_function(
from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
)
# never turn more than MAX_EVENTS_BEHIND + 1 into updates.
rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
else:
- rows = yield self.update_function(from_token, current_token)
+ rows = await self.update_function(from_token, current_token)
updates = [(row[0], row[1:]) for row in rows]
@@ -295,9 +292,8 @@ class PushRulesStream(Stream):
push_rules_token, _ = self.store.get_push_rules_stream_token()
return push_rules_token
- @defer.inlineCallbacks
- def update_function(self, from_token, to_token, limit):
- rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit)
+ async def update_function(self, from_token, to_token, limit):
+ rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)
return [(row[0], row[2]) for row in rows]
@@ -413,9 +409,8 @@ class AccountDataStream(Stream):
super(AccountDataStream, self).__init__(hs)
- @defer.inlineCallbacks
- def update_function(self, from_token, to_token, limit):
- global_results, room_results = yield self.store.get_all_updated_account_data(
+ async def update_function(self, from_token, to_token, limit):
+ global_results, room_results = await self.store.get_all_updated_account_data(
from_token, from_token, to_token, limit
)
|