author    Richard van der Hoff <1389908+richvdh@users.noreply.github.com>  2020-04-24 14:00:29 +0100
committer GitHub <noreply@github.com>  2020-04-24 14:00:29 +0100
commit    69a1ac00b2a00608368aa0d71fb0490a0501bd16
tree      7f995f7e96f93e09309b9eb5da65046c024f4df3 /synapse
parent    Merge branch 'master' into develop
parent    changelog
Merge pull request #7337 from matrix-org/rav/fix_update_limit_assertion
Fix assertions being thrown by the EventsStream update function
Diffstat (limited to 'synapse')
-rw-r--r--  synapse/replication/http/streams.py                  12
-rw-r--r--  synapse/replication/tcp/streams/_base.py             19
-rw-r--r--  synapse/replication/tcp/streams/events.py           113
-rw-r--r--  synapse/storage/data_stores/main/events_worker.py    46
4 files changed, 146 insertions(+), 44 deletions(-)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index ffd4c61993..f35cebc710 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -28,7 +28,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
 
     The API looks like:
 
-        GET /_synapse/replication/get_repl_stream_updates/events?from_token=0&to_token=10&limit=100
+        GET /_synapse/replication/get_repl_stream_updates/<stream name>?from_token=0&upto_token=10
 
         200 OK
 
@@ -38,6 +38,9 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
             limited: False,
         }
 
+    If there are more rows than can sensibly be returned in one lump, `limited` will be
+    set to true, and the caller should call again with a new `from_token`.
+
     """
 
     NAME = "get_repl_stream_updates"
@@ -52,8 +55,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         self.streams = hs.get_replication_streamer().get_streams()
 
     @staticmethod
-    def _serialize_payload(stream_name, from_token, upto_token, limit):
-        return {"from_token": from_token, "upto_token": upto_token, "limit": limit}
+    def _serialize_payload(stream_name, from_token, upto_token):
+        return {"from_token": from_token, "upto_token": upto_token}
 
     async def _handle_request(self, request, stream_name):
         stream = self.streams.get(stream_name)
@@ -62,10 +65,9 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
 
         from_token = parse_integer(request, "from_token", required=True)
         upto_token = parse_integer(request, "upto_token", required=True)
-        limit = parse_integer(request, "limit", required=True)
 
         updates, upto_token, limited = await stream.get_updates_since(
-            from_token, upto_token, limit
+            from_token, upto_token
         )
 
         return (
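To make the paging contract concrete, here is a minimal sketch (not part of this commit) of how a caller might drive the new `limited` flag. The three-tuple return shape and the `get_updates_since` signature come from the diff above; the helper itself and its name are illustrative:

    async def collect_updates(stream, from_token, upto_token):
        """Page through a stream until it reports it is no longer limited."""
        all_updates = []
        while True:
            updates, new_token, limited = await stream.get_updates_since(
                from_token, upto_token
            )
            all_updates.extend(updates)
            if not limited:
                return all_updates, new_token
            # more rows remain beyond new_token: resume from there
            from_token = new_token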
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index a860072ccf..4ae3cffb1e 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -24,8 +24,8 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates
 
 logger = logging.getLogger(__name__)
 
-
-MAX_EVENTS_BEHIND = 500000
+# the number of rows to request from an update_function.
+_STREAM_UPDATE_TARGET_ROW_COUNT = 100
 
 
 # Some type aliases to make things a bit easier.
@@ -56,7 +56,11 @@ StreamUpdateResult = Tuple[List[Tuple[Token, StreamRow]], Token, bool]
 #  * from_token: the previous stream token: the starting point for fetching the
 #    updates
 #  * to_token: the new stream token: the point to get updates up to
-#  * limit: the maximum number of rows to return
+#  * target_row_count: a target for the number of rows to be returned.
+#
+# The update_function is expected to return up to _approximately_ target_row_count rows.
+# If there are more updates available, it should set `limited` in the result, and
+# it will be called again to get the next batch.
 #
 UpdateFunction = Callable[[Token, Token, int], Awaitable[StreamUpdateResult]]
 
@@ -138,7 +142,7 @@ class Stream(object):
         return updates, current_token, limited
 
     async def get_updates_since(
-        self, from_token: Token, upto_token: Token, limit: int = 100
+        self, from_token: Token, upto_token: Token
     ) -> StreamUpdateResult:
         """Like get_updates except allows specifying from when we should
         stream updates
@@ -156,7 +160,7 @@ class Stream(object):
             return [], upto_token, False
 
         updates, upto_token, limited = await self.update_function(
-            from_token, upto_token, limit,
+            from_token, upto_token, _STREAM_UPDATE_TARGET_ROW_COUNT,
         )
         return updates, upto_token, limited
 
@@ -193,10 +197,7 @@ def make_http_update_function(hs, stream_name: str) -> UpdateFunction:
         from_token: int, upto_token: int, limit: int
     ) -> StreamUpdateResult:
         result = await client(
-            stream_name=stream_name,
-            from_token=from_token,
-            upto_token=upto_token,
-            limit=limit,
+            stream_name=stream_name, from_token=from_token, upto_token=upto_token,
         )
         return result["updates"], result["upto_token"], result["limited"]
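As a sketch of the contract spelled out in the comment above, an update_function that honours the target row count might look like the following. Only the signature and the (updates, upto_token, limited) return shape are taken from the diff; `fetch_rows` is a hypothetical stand-in:

    async def example_update_function(from_token, upto_token, target_row_count):
        # fetch_rows is a stand-in for a real database query that strictly
        # honours its row limit and returns (stream_id, ...) tuples in
        # ascending stream_id order
        rows = await fetch_rows(from_token, upto_token, target_row_count)
        if len(rows) == target_row_count:
            # possibly truncated: report only as far as the last row we have,
            # and set `limited` so the caller comes back for the rest
            return rows, rows[-1][0], True
        return rows, upto_token, False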
 
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 051114596b..aa50492569 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -15,11 +15,11 @@
 # limitations under the License.
 
 import heapq
-from typing import Iterable, Tuple, Type
+from typing import Iterable, List, Tuple, Type
 
 import attr
 
-from ._base import Stream, Token, db_query_to_update_function
+from ._base import Stream, StreamUpdateResult, Token
 
 
 """Handling of the 'events' replication stream
@@ -117,30 +117,106 @@ class EventsStream(Stream):
     def __init__(self, hs):
         self._store = hs.get_datastore()
         super().__init__(
-            self._store.get_current_events_token,
-            db_query_to_update_function(self._update_function),
+            self._store.get_current_events_token, self._update_function,
         )
 
     async def _update_function(
-        self, from_token: Token, current_token: Token, limit: int
-    ) -> Iterable[tuple]:
+        self, from_token: Token, current_token: Token, target_row_count: int
+    ) -> StreamUpdateResult:
+
+        # the events stream merges together three separate sources:
+        #  * new events
+        #  * current_state changes
+        #  * events which were previously outliers, but have now been de-outliered.
+        #
+        # The merge operation is complicated by the fact that we only have a single
+        # "stream token" which is supposed to indicate how far we have got through
+        # all three streams. It's therefore no good to return rows 1-1000 from the
+        # "new events" table if the state_deltas are limited to rows 1-100 by the
+        # target_row_count.
+        #
+        # In other words: we must pick a new upper limit, and must return *all* rows
+        # up to that point for each of the three sources.
+        #
+        # Start by trying to split the target_row_count up. We expect to have a
+        # negligible number of ex-outliers, and a rough approximation based on recent
+        # traffic on sw1v.org shows that there are approximately the same number of
+        # event rows between a given pair of stream ids as there are state
+        # updates, so let's split our target_row_count among those two types. The target
+        # is only an approximation - it doesn't matter if we end up going a bit over it.
+
+        target_row_count //= 2
+
+        # now we fetch up to that many rows from the events table
+
         event_rows = await self._store.get_all_new_forward_event_rows(
-            from_token, current_token, limit
-        )
-        event_updates = (
-            (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
-        )
+            from_token, current_token, target_row_count
+        )  # type: List[Tuple]
+
+        # we rely on get_all_new_forward_event_rows strictly honouring the limit, so
+        # that we know it is safe to just take upper_limit = event_rows[-1][0].
+        assert (
+            len(event_rows) <= target_row_count
+        ), "get_all_new_forward_event_rows did not honour row limit"
+
+        # if we hit the limit on event_updates, there's no point in going beyond the
+        # last stream_id in the batch for the other sources.
+
+        if len(event_rows) == target_row_count:
+            limited = True
+            upper_limit = event_rows[-1][0]  # type: int
+        else:
+            limited = False
+            upper_limit = current_token
+
+        # next up is the state delta table
 
         state_rows = await self._store.get_all_updated_current_state_deltas(
-            from_token, current_token, limit
-        )
-        state_updates = (
-            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
-        )
+            from_token, upper_limit, target_row_count
+        )  # type: List[Tuple]
+
+        # again, if we've hit the limit there, we'll need to limit the other sources
+        assert len(state_rows) <= target_row_count
+        if len(state_rows) == target_row_count:
+            assert state_rows[-1][0] <= upper_limit
+            upper_limit = state_rows[-1][0]
+            limited = True
+
+            # FIXME: is it a given that there is only one row per stream_id in the
+            # state_deltas table (so that we can be sure that we have got all of the
+            # rows for upper_limit)?
+
+        # finally, fetch the ex-outliers rows. We assume there are few enough of these
+        # not to bother with the limit.
 
-        all_updates = heapq.merge(event_updates, state_updates)
+        ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
+            from_token, upper_limit
+        )  # type: List[Tuple]
 
-        return all_updates
+        # we now need to turn the raw database rows returned into tuples suitable
+        # for the replication protocol (basically, we add an identifier to
+        # distinguish the row type). At the same time, we can limit the event_rows
+        # to the max stream_id from state_rows.
+
+        event_updates = (
+            (stream_id, (EventsStreamEventRow.TypeId, rest))
+            for (stream_id, *rest) in event_rows
+            if stream_id <= upper_limit
+        )  # type: Iterable[Tuple[int, Tuple]]
+
+        state_updates = (
+            (stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
+            for (stream_id, *rest) in state_rows
+        )  # type: Iterable[Tuple[int, Tuple]]
+
+        ex_outliers_updates = (
+            (stream_id, (EventsStreamEventRow.TypeId, rest))
+            for (stream_id, *rest) in ex_outliers_rows
+        )  # type: Iterable[Tuple[int, Tuple]]
+
+        # we need to return a sorted list, so merge them together.
+        updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))
+        return updates, upper_limit, limited
 
     @classmethod
     def parse_row(cls, row):
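To see the merge strategy in isolation, here is a toy illustration (not part of the commit) of the pattern `_update_function` uses: every source is truncated to one common upper limit, and heapq.merge interleaves the already-sorted (stream_id, row) pairs into a single sorted stream:

    import heapq

    events = [(1, "ev1"), (3, "ev3"), (5, "ev5")]
    state = [(2, "state2"), (4, "state4")]
    ex_outliers = [(3, "outlier3")]

    # pretend the events source hit its row-count target at stream id 4, so
    # every source must be cut off there to keep the stream token consistent
    upper_limit = 4
    merged = list(
        heapq.merge(
            ((sid, row) for sid, row in events if sid <= upper_limit),
            ((sid, row) for sid, row in state if sid <= upper_limit),
            ((sid, row) for sid, row in ex_outliers if sid <= upper_limit),
        )
    )
    # merged == [(1, 'ev1'), (2, 'state2'), (3, 'ev3'), (3, 'outlier3'), (4, 'state4')]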
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index accde349a7..ce8be72bfe 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -973,8 +973,18 @@ class EventsWorkerStore(SQLBaseStore):
         return self._stream_id_gen.get_current_token()
 
     def get_all_new_forward_event_rows(self, last_id, current_id, limit):
-        if last_id == current_id:
-            return defer.succeed([])
+        """Returns new events, for the Events replication stream
+
+        Args:
+            last_id: the last stream_id from the previous batch.
+            current_id: the maximum stream_id to return up to
+            limit: the maximum number of rows to return
+
+        Returns: Deferred[List[Tuple]]
+            a list of events stream rows. Each tuple consists of a stream id as
+            the first element, followed by fields suitable for casting into an
+            EventsStreamRow.
+        """
 
         def get_all_new_forward_event_rows(txn):
             sql = (
@@ -989,13 +999,26 @@ class EventsWorkerStore(SQLBaseStore):
                 " LIMIT ?"
             )
             txn.execute(sql, (last_id, current_id, limit))
-            new_event_updates = txn.fetchall()
+            return txn.fetchall()
 
-            if len(new_event_updates) == limit:
-                upper_bound = new_event_updates[-1][0]
-            else:
-                upper_bound = current_id
+        return self.db.runInteraction(
+            "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+        )
+
+    def get_ex_outlier_stream_rows(self, last_id, current_id):
+        """Returns de-outliered events, for the Events replication stream
 
+        Args:
+            last_id: the last stream_id from the previous batch.
+            current_id: the maximum stream_id to return up to
+
+        Returns: Deferred[List[Tuple]]
+            a list of events stream rows. Each tuple consists of a stream id as
+            the first element, followed by fields suitable for casting into an
+            EventsStreamRow.
+        """
+
+        def get_ex_outlier_stream_rows_txn(txn):
             sql = (
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " state_key, redacts, relates_to_id"
@@ -1006,15 +1029,14 @@ class EventsWorkerStore(SQLBaseStore):
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
-                " ORDER BY event_stream_ordering DESC"
+                " ORDER BY event_stream_ordering ASC"
             )
-            txn.execute(sql, (last_id, upper_bound))
-            new_event_updates.extend(txn)
 
-            return new_event_updates
+            txn.execute(sql, (last_id, current_id))
+            return txn.fetchall()
 
         return self.db.runInteraction(
-            "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+            "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
         )
 
     def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
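Finally, a short sketch (with assumed table and column names, not the real synapse schema) of the invariant `get_all_new_forward_event_rows` provides. Because the query orders by stream id ascending and applies a strict LIMIT, a full batch is guaranteed to contain every row up to and including its last stream id, which is what lets the caller take `event_rows[-1][0]` as a safe upper bound:

    def get_rows_up_to_limit(txn, last_id, current_id, limit):
        # ORDER BY ... ASC plus LIMIT means that if exactly `limit` rows come
        # back, rows[-1][0] is a stream id below which no row has been skipped
        txn.execute(
            "SELECT stream_ordering, event_id FROM events"
            " WHERE ? < stream_ordering AND stream_ordering <= ?"
            " ORDER BY stream_ordering ASC"
            " LIMIT ?",
            (last_id, current_id, limit),
        )
        return txn.fetchall()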