summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-20 14:40:47 +0000
committerGitHub <noreply@github.com>2020-03-20 14:40:47 +0000
commitfdb13447167da0670dd6ad95fdf4a99cde450eb9 (patch)
tree4e65824624e802bdaf04f3e289ec348fd4f3ab55 /synapse/replication/tcp/resource.py
parentConvert some of the media REST code to async/await (#7110) (diff)
downloadsynapse-fdb13447167da0670dd6ad95fdf4a99cde450eb9.tar.xz
Remove concept of a non-limited stream. (#7011)
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py9
1 files changed, 2 insertions, 7 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index ce9d1fae12..6e2ebaf614 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -166,11 +166,6 @@ class ReplicationStreamer(object):
                 self.pending_updates = False
 
                 with Measure(self.clock, "repl.stream.get_updates"):
-                    # First we tell the streams that they should update their
-                    # current tokens.
-                    for stream in self.streams:
-                        stream.advance_current_token()
-
                     all_streams = self.streams
 
                     if self._replication_torture_level is not None:
@@ -180,7 +175,7 @@ class ReplicationStreamer(object):
                         random.shuffle(all_streams)
 
                     for stream in all_streams:
-                        if stream.last_token == stream.upto_token:
+                        if stream.last_token == stream.current_token():
                             continue
 
                         if self._replication_torture_level:
@@ -192,7 +187,7 @@ class ReplicationStreamer(object):
                             "Getting stream: %s: %s -> %s",
                             stream.NAME,
                             stream.last_token,
-                            stream.upto_token,
+                            stream.current_token(),
                         )
                         try:
                             updates, current_token = await stream.get_updates()