summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-20 15:11:54 +0000
committerErik Johnston <erik@matrix.org>2020-03-20 15:31:45 +0000
commit811d2ecf2ed50613d2f8a0231c4b9487be2ff925 (patch)
tree99e30df6353a5245735b5043fd24ff69bdcc5bb2 /synapse/replication/tcp
parentMove stream fetch DB queries to worker stores. (diff)
downloadsynapse-811d2ecf2ed50613d2f8a0231c4b9487be2ff925.tar.xz
Don't panic if streams get behind.
The catchup will in future happen on workers, so the master process won't
need to protect itself by dropping the connection.
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/protocol.py22
-rw-r--r--synapse/replication/tcp/resource.py5
-rw-r--r--synapse/replication/tcp/streams/_base.py61
3 files changed, 42 insertions, 46 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index bc1482a9bb..d7ef2398fa 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -485,15 +485,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         self.connecting_streams.add(stream_name)
 
         try:
-            # Get missing updates
-            updates, current_token = await self.streamer.get_stream_updates(
-                stream_name, token
-            )
-
-            # Send all the missing updates
-            for update in updates:
-                token, row = update[0], update[1]
-                self.send_command(RdataCommand(stream_name, token, row))
+            limited = True
+            while limited:
+                # Get missing updates
+                (
+                    updates,
+                    current_token,
+                    limited,
+                ) = await self.streamer.get_stream_updates(stream_name, token)
+
+                # Send all the missing updates
+                for update in updates:
+                    token, row = update[0], update[1]
+                    self.send_command(RdataCommand(stream_name, token, row))
 
             # We send a POSITION command to ensure that they have an up to
             # date token (especially useful if we didn't send any updates
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 6e2ebaf614..5be31024b7 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -190,7 +190,8 @@ class ReplicationStreamer(object):
                             stream.current_token(),
                         )
                         try:
-                            updates, current_token = await stream.get_updates()
+                            updates, current_token, limited = await stream.get_updates()
+                            self.pending_updates |= limited
                         except Exception:
                             logger.info("Failed to handle stream %s", stream.NAME)
                             raise
@@ -235,7 +236,7 @@ class ReplicationStreamer(object):
         if not stream:
             raise Exception("unknown stream %s", stream_name)
 
-        return await stream.get_updates_since(token)
+        return await stream.get_updates_since(token, stream.current_token())
 
     @measure_func("repl.federation_ack")
     def federation_ack(self, token):
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index abf5c6c6a8..99cef97532 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -14,10 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import itertools
 import logging
 from collections import namedtuple
-from typing import Any, List, Optional, Tuple
+from typing import Any, List, Optional, Tuple, Union
 
 import attr
 
@@ -153,61 +152,53 @@ class Stream(object):
         """
         self.last_token = self.current_token()
 
-    async def get_updates(self):
+    async def get_updates(self) -> Tuple[List[Tuple[int, JsonDict]], int, bool]:
         """Gets all updates since the last time this function was called (or
         since the stream was constructed if it hadn't been called before).
 
         Returns:
-            Deferred[Tuple[List[Tuple[int, Any]], int]:
-                Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
-                list of ``(token, row)`` entries. ``row`` will be json-serialised and
-                sent over the replication steam.
+            Resolves to a triple `(updates, new_last_token, limited)`, where
+            `updates` is a list of `(token, row)` entries, `new_last_token` is
+            the new position in the stream, and `limited` is whether there are
+            more updates to fetch.
         """
-        updates, current_token = await self.get_updates_since(self.last_token)
+        current_token = self.current_token()
+        updates, current_token, limited = await self.get_updates_since(
+            self.last_token, current_token
+        )
         self.last_token = current_token
 
-        return updates, current_token
+        return updates, current_token, limited
 
     async def get_updates_since(
-        self, from_token: int
-    ) -> Tuple[List[Tuple[int, JsonDict]], int]:
+        self, from_token: Union[int, str], upto_token: int, limit: int = 100
+    ) -> Tuple[List[Tuple[int, JsonDict]], int, bool]:
         """Like get_updates except allows specifying from when we should
         stream updates
 
         Returns:
-            Resolves to a pair `(updates, new_last_token)`, where `updates` is
-            a list of `(token, row)` entries and `new_last_token` is the new
-            position in stream.
+            Resolves to a triple `(updates, new_last_token, limited)`, where
+            `updates` is a list of `(token, row)` entries, `new_last_token` is
+            the new position in the stream, and `limited` is whether there are
+            more updates to fetch.
         """
 
         if from_token in ("NOW", "now"):
-            return [], self.current_token()
-
-        current_token = self.current_token()
+            return [], upto_token, False
 
         from_token = int(from_token)
 
-        if from_token == current_token:
-            return [], current_token
-
-        rows = await self.update_function(
-            from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
-        )
-
-        # never turn more than MAX_EVENTS_BEHIND + 1 into updates.
-        rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
+        if from_token == upto_token:
+            return [], upto_token, False
 
+        limited = False
+        rows = await self.update_function(from_token, upto_token, limit=limit)
         updates = [(row[0], row[1:]) for row in rows]
+        if len(updates) == limit:
+            upto_token = rows[-1][0]
+            limited = True
 
-        # check we didn't get more rows than the limit.
-        # doing it like this allows the update_function to be a generator.
-        if len(updates) >= MAX_EVENTS_BEHIND:
-            raise Exception("stream %s has fallen behind" % (self.NAME))
-
-        # The update function didn't hit the limit, so we must have got all
-        # the updates to `current_token`, and can return that as our new
-        # stream position.
-        return updates, current_token
+        return updates, upto_token, limited
 
     def current_token(self):
         """Gets the current token of the underlying streams. Should be provided