diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 051114596b..aa50492569 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -15,11 +15,12 @@
# limitations under the License.
import heapq
-from typing import Iterable, Tuple, Type
+from typing import Iterable, List, Tuple, Type
import attr
-from ._base import Stream, Token, db_query_to_update_function
+from ._base import Stream, StreamUpdateResult, Token
"""Handling of the 'events' replication stream
@@ -117,30 +118,106 @@ class EventsStream(Stream):
def __init__(self, hs):
self._store = hs.get_datastore()
super().__init__(
- self._store.get_current_events_token,
- db_query_to_update_function(self._update_function),
+ self._store.get_current_events_token, self._update_function,
)
async def _update_function(
- self, from_token: Token, current_token: Token, limit: int
- ) -> Iterable[tuple]:
+ self, from_token: Token, current_token: Token, target_row_count: int
+ ) -> StreamUpdateResult:
+
+ # the events stream merges together three separate sources:
+ # * new events
+ # * current_state changes
+ # * events which were previously outliers, but have now been de-outliered.
+ #
+ # The merge operation is complicated by the fact that we only have a single
+ # "stream token" which is supposed to indicate how far we have got through
+ # all three streams. It's therefore no good to return rows 1-1000 from the
+ # "new events" table if the state_deltas are limited to rows 1-100 by the
+ # target_row_count.
+ #
+ # In other words: we must pick a new upper limit, and must return *all* rows
+ # up to that point for each of the three sources.
+ #
+ # Start by trying to split the target_row_count up. We expect to have a
+ # negligible number of ex-outliers, and a rough analysis of recent traffic on
+ # sw1v.org suggests that there are roughly the same number of event rows
+ # between a given pair of stream ids as there are state updates, so let's
+ # split the target_row_count evenly between those two types. The target is
+ # only an approximation - it doesn't matter if we end up going a bit over it.
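+ #
+ # As a purely illustrative example (the numbers are arbitrary): with a
+ # target_row_count of 100 we would ask for up to 50 event rows and up to 50
+ # state-delta rows. If the event query returns exactly 50 rows, the stream_id
+ # of the last of those rows becomes the new upper limit and the batch is
+ # flagged as limited; the state and ex-outlier queries are then capped at
+ # that same upper limit, so each source is complete up to the token we return.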
+
+ target_row_count //= 2
+
+ # now we fetch up to that many rows from the events table
+
event_rows = await self._store.get_all_new_forward_event_rows(
- from_token, current_token, limit
- )
- event_updates = (
- (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
- )
+ from_token, current_token, target_row_count
+ ) # type: List[Tuple]
+
+ # we rely on get_all_new_forward_event_rows strictly honouring the limit, so
+ # that we know it is safe to just take upper_limit = event_rows[-1][0].
+ assert (
+ len(event_rows) <= target_row_count
+ ), "get_all_new_forward_event_rows did not honour row limit"
+
+ # if we hit the limit on event_rows, there's no point in going beyond the
+ # last stream_id in that batch for the other sources.
+
+ if len(event_rows) == target_row_count:
+ limited = True
+ upper_limit = event_rows[-1][0] # type: int
+ else:
+ limited = False
+ upper_limit = current_token
+
+ # next up is the state delta table
state_rows = await self._store.get_all_updated_current_state_deltas(
- from_token, current_token, limit
- )
- state_updates = (
- (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
- )
+ from_token, upper_limit, target_row_count
+ ) # type: List[Tuple]
+
+ # again, if we've hit the limit there, we'll need to limit the other sources
+ assert len(state_rows) <= target_row_count
+ if len(state_rows) == target_row_count:
+ assert state_rows[-1][0] <= upper_limit
+ upper_limit = state_rows[-1][0]
+ limited = True
+
+ # FIXME: is it a given that there is only one row per stream_id in the
+ # state_deltas table (so that we can be sure that we have got all of the
+ # rows for upper_limit)?
+
+ # finally, fetch the ex-outlier rows. We assume there are few enough of these
+ # not to bother with the limit.
- all_updates = heapq.merge(event_updates, state_updates)
+ ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
+ from_token, upper_limit
+ ) # type: List[Tuple]
- return all_updates
+ # we now need to turn the raw database rows returned into tuples suitable
+ # for the replication protocol (basically, we add an identifier to
+ # distinguish the row type). At the same time, we can limit the event_rows
+ # to the upper_limit we settled on above.
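+ #
+ # For illustration, a raw event row of the form
+ #   (stream_id, event_id, room_id, type, state_key, redacts, relates_to_id)
+ # becomes
+ #   (stream_id, (EventsStreamEventRow.TypeId,
+ #                [event_id, room_id, type, state_key, redacts, relates_to_id]))
+ # (the trailing fields end up in a list because of the starred unpacking below).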
+
+ event_updates = (
+ (stream_id, (EventsStreamEventRow.TypeId, rest))
+ for (stream_id, *rest) in event_rows
+ if stream_id <= upper_limit
+ ) # type: Iterable[Tuple[int, Tuple]]
+
+ state_updates = (
+ (stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
+ for (stream_id, *rest) in state_rows
+ ) # type: Iterable[Tuple[int, Tuple]]
+
+ ex_outliers_updates = (
+ (stream_id, (EventsStreamEventRow.TypeId, rest))
+ for (stream_id, *rest) in ex_outliers_rows
+ ) # type: Iterable[Tuple[int, Tuple]]
+
+ # we need to return a sorted list, so merge them together.
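+ # The three-tuple below matches StreamUpdateResult: the merged updates, the
+ # token we have reached (upper_limit), and the "limited" flag telling the
+ # caller that there may be more rows to fetch beyond upper_limit.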
+ updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))
+ return updates, upper_limit, limited
@classmethod
def parse_row(cls, row):
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index accde349a7..ce8be72bfe 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -973,8 +973,18 @@ class EventsWorkerStore(SQLBaseStore):
return self._stream_id_gen.get_current_token()
def get_all_new_forward_event_rows(self, last_id, current_id, limit):
- if last_id == current_id:
- return defer.succeed([])
+ """Returns new events, for the Events replication stream
+
+ Args:
+ last_id: the last stream_id from the previous batch.
+ current_id: the highest stream_id to return (inclusive)
+ limit: the maximum number of rows to return
+
+ Returns: Deferred[List[Tuple]]
+ a list of events stream rows. Each tuple consists of a stream id as
+ the first element, followed by fields suitable for casting into an
+ EventsStreamRow.
+ """
def get_all_new_forward_event_rows(txn):
sql = (
@@ -989,13 +999,26 @@ class EventsWorkerStore(SQLBaseStore):
" LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
- new_event_updates = txn.fetchall()
+ return txn.fetchall()
- if len(new_event_updates) == limit:
- upper_bound = new_event_updates[-1][0]
- else:
- upper_bound = current_id
+ return self.db.runInteraction(
+ "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+ )
+
+ def get_ex_outlier_stream_rows(self, last_id, current_id):
+ """Returns de-outliered events, for the Events replication stream
+ Args:
+ last_id: the last stream_id from the previous batch.
+ current_id: the maximum stream_id to return up to
+
+ Returns: Deferred[List[Tuple]]
+ a list of events stream rows. Each tuple consists of a stream id as
+ the first element, followed by fields suitable for casting into an
+ EventsStreamRow.
+ """
+
+ def get_ex_outlier_stream_rows_txn(txn):
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
@@ -1006,15 +1029,14 @@ class EventsWorkerStore(SQLBaseStore):
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
- " ORDER BY event_stream_ordering DESC"
+ " ORDER BY event_stream_ordering ASC"
)
- txn.execute(sql, (last_id, upper_bound))
- new_event_updates.extend(txn)
- return new_event_updates
+ txn.execute(sql, (last_id, current_id))
+ return txn.fetchall()
return self.db.runInteraction(
- "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+ "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
)
def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
|