diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 7ef67a5a73..c10b85d2ff 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -158,7 +158,7 @@ class Stream(object):
updates, current_token = yield self.get_updates_since(self.last_token)
self.last_token = current_token
- defer.returnValue((updates, current_token))
+ return (updates, current_token)
@defer.inlineCallbacks
def get_updates_since(self, from_token):
@@ -172,14 +172,14 @@ class Stream(object):
sent over the replication steam.
"""
if from_token in ("NOW", "now"):
- defer.returnValue(([], self.upto_token))
+ return ([], self.upto_token)
current_token = self.upto_token
from_token = int(from_token)
if from_token == current_token:
- defer.returnValue(([], current_token))
+ return ([], current_token)
if self._LIMITED:
rows = yield self.update_function(
@@ -198,7 +198,7 @@ class Stream(object):
if self._LIMITED and len(updates) >= MAX_EVENTS_BEHIND:
raise Exception("stream %s has fallen behind" % (self.NAME))
- defer.returnValue((updates, current_token))
+ return (updates, current_token)
def current_token(self):
"""Gets the current token of the underlying streams. Should be provided
@@ -297,7 +297,7 @@ class PushRulesStream(Stream):
@defer.inlineCallbacks
def update_function(self, from_token, to_token, limit):
rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit)
- defer.returnValue([(row[0], row[2]) for row in rows])
+ return [(row[0], row[2]) for row in rows]
class PushersStream(Stream):
@@ -424,7 +424,7 @@ class AccountDataStream(Stream):
for stream_id, user_id, account_data_type, content in global_results
)
- defer.returnValue(results)
+ return results
class GroupServerStream(Stream):
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 3d0694bb11..d97669c886 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -134,7 +134,7 @@ class EventsStream(Stream):
all_updates = heapq.merge(event_updates, state_updates)
- defer.returnValue(all_updates)
+ return all_updates
@classmethod
def parse_row(cls, row):
|