diff --git a/changelog.d/11114.bugfix b/changelog.d/11114.bugfix
new file mode 100644
index 0000000000..c6e65df97f
--- /dev/null
+++ b/changelog.d/11114.bugfix
@@ -0,0 +1 @@
+Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a37ae0ca09..c0f642005f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -166,9 +166,14 @@ class FederationHandler:
oldest_events_with_depth = (
await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
)
- insertion_events_to_be_backfilled = (
- await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
- )
+
+ insertion_events_to_be_backfilled: Dict[str, int] = {}
+ if self.hs.config.experimental.msc2716_enabled:
+ insertion_events_to_be_backfilled = (
+ await self.store.get_insertion_event_backward_extremities_in_room(
+ room_id
+ )
+ )
logger.debug(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
@@ -271,11 +276,12 @@ class FederationHandler:
]
logger.debug(
- "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
+ "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
limit,
max_depth,
+ len(sorted_extremeties_tuple),
sorted_extremeties_tuple,
filtered_sorted_extremeties_tuple,
)
@@ -1047,6 +1053,19 @@ class FederationHandler:
limit = min(limit, 100)
events = await self.store.get_backfill_events(room_id, pdu_list, limit)
+ logger.debug(
+ "on_backfill_request: backfill events=%s",
+ [
+ "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+ % (
+ event.event_id,
+ event.depth,
+ event.content.get("body", event.type),
+ event.prev_event_ids(),
+ )
+ for event in events
+ ],
+ )
events = await filter_events_for_server(self.storage, origin, events)
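The per-event formatting in the debug log above is repeated verbatim in `_process_pulled_events` further down in this diff. If one wanted to avoid the duplication, a small helper along these lines would produce the same output (the helper name is hypothetical and not part of this change):

```python
from typing import Iterable, List

from synapse.events import EventBase


def format_events_for_debug(events: Iterable[EventBase]) -> List[str]:
    """Hypothetical helper: render events compactly for debug logging,
    mirroring the inline list comprehensions used in this diff."""
    return [
        "event_id=%s,depth=%d,body=%s,prevs=%s\n"
        % (
            event.event_id,
            event.depth,
            event.content.get("body", event.type),
            event.prev_event_ids(),
        )
        for event in events
    ]
```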
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 3905f60b3a..9edc7369d6 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -508,7 +508,11 @@ class FederationEventHandler:
f"room {ev.room_id}, when we were backfilling in {room_id}"
)
- await self._process_pulled_events(dest, events, backfilled=True)
+ await self._process_pulled_events(
+ dest,
+ events,
+ backfilled=True,
+ )
async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
@@ -626,11 +630,24 @@ class FederationEventHandler:
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
+ logger.debug(
+ "processing pulled backfilled=%s events=%s",
+ backfilled,
+ [
+ "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+ % (
+ event.event_id,
+ event.depth,
+ event.content.get("body", event.type),
+ event.prev_event_ids(),
+ )
+ for event in events
+ ],
+ )
# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)
-
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@@ -992,6 +1009,8 @@ class FederationEventHandler:
await self._run_push_actions_and_persist_event(event, context, backfilled)
+ await self._handle_marker_event(origin, event)
+
if backfilled or context.rejected:
return
@@ -1071,8 +1090,6 @@ class FederationEventHandler:
event.sender,
)
- await self._handle_marker_event(origin, event)
-
async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.
@@ -1323,7 +1340,14 @@ class FederationEventHandler:
return event, context
events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
- await self.persist_events_and_notify(room_id, tuple(events_to_persist))
+ await self.persist_events_and_notify(
+ room_id,
+ tuple(events_to_persist),
+ # Mark these events as backfilled since they're historic events that will
+ # eventually be backfilled. For example, missing events we fetch
+ # during backfill should be marked as backfilled as well.
+ backfilled=True,
+ )
async def _check_event_auth(
self,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b37250aa38..9267e586a8 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -490,12 +490,12 @@ class EventCreationHandler:
requester: Requester,
event_dict: dict,
txn_id: Optional[str] = None,
+ allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
- allow_no_prev_events: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
@@ -510,6 +510,10 @@ class EventCreationHandler:
requester
event_dict: An entire event
txn_id
+ allow_no_prev_events: Whether to allow this event to be created with an
+ empty list of prev_events. Normally this is prohibited because most
+ events should have a prev_event, and we should only use this in special
+ cases like MSC2716.
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
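A rough usage sketch of the reordered `create_event` signature, assuming an `EventCreationHandler` instance and a requester are available from the surrounding code (the wrapper function below is illustrative, not part of Synapse):

```python
from typing import List, Tuple

from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.types import Requester


async def create_floating_member_event(
    event_creation_handler,  # assumed EventCreationHandler instance
    requester: Requester,
    room_id: str,
    user_id: str,
    auth_event_ids: List[str],
) -> Tuple[EventBase, EventContext]:
    """Illustrative only: create the first event of a historical batch.

    The event floats on its own, so we pass an empty prev_events list and
    explicitly opt in with allow_no_prev_events=True.
    """
    return await event_creation_handler.create_event(
        requester,
        {
            "type": "m.room.member",
            "room_id": room_id,
            "sender": user_id,
            "state_key": user_id,
            "content": {"membership": "join"},
        },
        allow_no_prev_events=True,
        prev_event_ids=[],
        auth_event_ids=auth_event_ids,
        outlier=True,
        historical=True,
    )
```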
@@ -604,10 +608,10 @@ class EventCreationHandler:
event, context = await self.create_new_client_event(
builder=builder,
requester=requester,
+ allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
- allow_no_prev_events=allow_no_prev_events,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -764,6 +768,7 @@ class EventCreationHandler:
self,
requester: Requester,
event_dict: dict,
+ allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
ratelimit: bool = True,
@@ -781,6 +786,10 @@ class EventCreationHandler:
Args:
requester: The requester sending the event.
event_dict: An entire event.
+ allow_no_prev_events: Whether to allow this event to be created with an
+ empty list of prev_events. Normally this is prohibited because most
+ events should have a prev_event, and we should only use this in special
+ cases like MSC2716.
prev_event_ids:
The event IDs to use as the prev events.
Should normally be left as None to automatically request them
@@ -880,16 +889,20 @@ class EventCreationHandler:
self,
builder: EventBuilder,
requester: Optional[Requester] = None,
+ allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
- allow_no_prev_events: bool = False,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
Args:
builder:
requester:
+ allow_no_prev_events: Whether to allow this event to be created with an
+ empty list of prev_events. Normally this is prohibited because most
+ events should have a prev_event, and we should only use this in special
+ cases like MSC2716.
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@@ -908,7 +921,6 @@ class EventCreationHandler:
Returns:
Tuple of created event, context
"""
-
# Strip down the auth_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
full_state_ids_at_event = None
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index f880aa93d2..f8137ec04c 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -13,10 +13,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-def generate_fake_event_id() -> str:
- return "$fake_" + random_string(43)
-
-
class RoomBatchHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
@@ -182,11 +178,12 @@ class RoomBatchHandler:
state_event_ids_at_start = []
auth_event_ids = initial_auth_event_ids.copy()
- # Make the state events float off on their own so we don't have a
- # bunch of `@mxid joined the room` noise between each batch
- prev_event_id_for_state_chain = generate_fake_event_id()
+ # Make the state events float off on their own by specifying no
+ # prev_events for the first one in the chain so we don't have a bunch of
+ # `@mxid joined the room` noise between each batch.
+ prev_event_ids_for_state_chain: List[str] = []
- for state_event in state_events_at_start:
+ for index, state_event in enumerate(state_events_at_start):
assert_params_in_dict(
state_event, ["type", "origin_server_ts", "content", "sender"]
)
@@ -222,7 +219,10 @@ class RoomBatchHandler:
content=event_dict["content"],
outlier=True,
historical=True,
- prev_event_ids=[prev_event_id_for_state_chain],
+ # Only the first event in the chain should be floating.
+ # The rest should hang off each other in a chain.
+ allow_no_prev_events=index == 0,
+ prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
@@ -242,7 +242,10 @@ class RoomBatchHandler:
event_dict,
outlier=True,
historical=True,
- prev_event_ids=[prev_event_id_for_state_chain],
+ # Only the first event in the chain should be floating.
+ # The rest should hang off each other in a chain.
+ allow_no_prev_events=index == 0,
+ prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
@@ -253,7 +256,7 @@ class RoomBatchHandler:
state_event_ids_at_start.append(event_id)
auth_event_ids.append(event_id)
# Connect all the state in a floating chain
- prev_event_id_for_state_chain = event_id
+ prev_event_ids_for_state_chain = [event_id]
return state_event_ids_at_start
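Stripped of the membership-specific details, the chaining pattern used for the state events above looks like this. This is a minimal sketch with a stand-in `create_event` callable, not the real handler API:

```python
from typing import Awaitable, Callable, List

# Stand-in signature: takes an event dict plus the chaining kwargs and
# returns the new event_id once the event is persisted.
CreateEvent = Callable[..., Awaitable[str]]


async def chain_floating_events(
    event_dicts: List[dict], create_event: CreateEvent
) -> List[str]:
    """Illustrative only: the first event floats with no prev_events and
    every later event hangs off the one created just before it."""
    created_event_ids: List[str] = []
    prev_event_ids_for_chain: List[str] = []
    for index, event_dict in enumerate(event_dicts):
        event_id = await create_event(
            event_dict,
            # Only the first event in the chain is allowed to float.
            allow_no_prev_events=index == 0,
            prev_event_ids=prev_event_ids_for_chain,
        )
        created_event_ids.append(event_id)
        # Connect the next event to the one we just created.
        prev_event_ids_for_chain = [event_id]
    return created_event_ids
```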
@@ -261,7 +264,6 @@ class RoomBatchHandler:
self,
events_to_create: List[JsonDict],
room_id: str,
- initial_prev_event_ids: List[str],
inherited_depth: int,
auth_event_ids: List[str],
app_service_requester: Requester,
@@ -277,9 +279,6 @@ class RoomBatchHandler:
events_to_create: List of historical events to create in JSON
dictionary format.
room_id: Room where you want the events persisted in.
- initial_prev_event_ids: These will be the prev_events for the first
- event created. Each event created afterwards will point to the
- previous event created.
inherited_depth: The depth to create the events at (you will
probably get this by calling inherit_depth_from_prev_ids(...)).
auth_event_ids: Define which events allow you to create the given
@@ -291,11 +290,14 @@ class RoomBatchHandler:
"""
assert app_service_requester.app_service
- prev_event_ids = initial_prev_event_ids.copy()
+ # Make the historical event chain float off on its own by specifying no
+ # prev_events for the first event in the chain, which causes the HS to
+ # ask for the state at the start of the batch later.
+ prev_event_ids: List[str] = []
event_ids = []
events_to_persist = []
- for ev in events_to_create:
+ for index, ev in enumerate(events_to_create):
assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
@@ -319,6 +321,9 @@ class RoomBatchHandler:
ev["sender"], app_service_requester.app_service
),
event_dict,
+ # Only the first event in the chain should be floating.
+ # The rest should hang off each other in a chain.
+ allow_no_prev_events=index == 0,
prev_event_ids=event_dict.get("prev_events"),
auth_event_ids=auth_event_ids,
historical=True,
@@ -370,7 +375,6 @@ class RoomBatchHandler:
events_to_create: List[JsonDict],
room_id: str,
batch_id_to_connect_to: str,
- initial_prev_event_ids: List[str],
inherited_depth: int,
auth_event_ids: List[str],
app_service_requester: Requester,
@@ -385,9 +389,6 @@ class RoomBatchHandler:
room_id: Room where you want the events created in.
batch_id_to_connect_to: The batch_id from the insertion event you
want this batch to connect to.
- initial_prev_event_ids: These will be the prev_events for the first
- event created. Each event created afterwards will point to the
- previous event created.
inherited_depth: The depth to create the events at (you will
probably get this by calling inherit_depth_from_prev_ids(...)).
auth_event_ids: Define which events allow you to create the given
@@ -436,7 +437,6 @@ class RoomBatchHandler:
event_ids = await self.persist_historical_events(
events_to_create=events_to_create,
room_id=room_id,
- initial_prev_event_ids=initial_prev_event_ids,
inherited_depth=inherited_depth,
auth_event_ids=auth_event_ids,
app_service_requester=app_service_requester,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index efe6b4c9aa..bf1a47efb0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -268,7 +268,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
target: UserID,
room_id: str,
membership: str,
- prev_event_ids: List[str],
+ allow_no_prev_events: bool = False,
+ prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
txn_id: Optional[str] = None,
ratelimit: bool = True,
@@ -286,8 +287,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
target:
room_id:
membership:
- prev_event_ids: The event IDs to use as the prev events
+ allow_no_prev_events: Whether to allow this event to be created with an
+ empty list of prev_events. Normally this is prohibited because most
+ events should have a prev_event, and we should only use this in special
+ cases like MSC2716.
+ prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
@@ -344,6 +349,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
"membership": membership,
},
txn_id=txn_id,
+ allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
require_consent=require_consent,
@@ -446,6 +452,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
+ allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
@@ -470,6 +477,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
+ allow_no_prev_events: Whether to allow this event to be created with an
+ empty list of prev_events. Normally this is prohibited because most
+ events should have a prev_event, and we should only use this in special
+ cases like MSC2716.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
@@ -504,6 +515,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
require_consent=require_consent,
outlier=outlier,
historical=historical,
+ allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
)
@@ -525,6 +537,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
+ allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
@@ -551,6 +564,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
+ allow_no_prev_events: Whether to allow this event to be created with an
+ empty list of prev_events. Normally this is prohibited because most
+ events should have a prev_event, and we should only use this in special
+ cases like MSC2716.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
@@ -680,6 +697,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
+ allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
content=content,
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index e4c9451ae0..4b6be38327 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -131,6 +131,14 @@ class RoomBatchSendEventRestServlet(RestServlet):
prev_event_ids_from_query
)
+ if not auth_event_ids:
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "No auth events found for given prev_event query parameter. The prev_event=%s probably does not exist."
+ % prev_event_ids_from_query,
+ errcode=Codes.INVALID_PARAM,
+ )
+
state_event_ids_at_start = []
# Create and persist all of the state events that float off on their own
# before the batch. These will most likely be all of the invite/member
@@ -197,21 +205,12 @@ class RoomBatchSendEventRestServlet(RestServlet):
EventContentFields.MSC2716_NEXT_BATCH_ID
]
- # Also connect the historical event chain to the end of the floating
- # state chain, which causes the HS to ask for the state at the start of
- # the batch later. If there is no state chain to connect to, just make
- # the insertion event float itself.
- prev_event_ids = []
- if len(state_event_ids_at_start):
- prev_event_ids = [state_event_ids_at_start[-1]]
-
# Create and persist all of the historical events as well as insertion
# and batch meta events to make the batch navigable in the DAG.
event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
events_to_create=events_to_create,
room_id=room_id,
batch_id_to_connect_to=batch_id_to_connect_to,
- initial_prev_event_ids=prev_event_ids,
inherited_depth=inherited_depth,
auth_event_ids=auth_event_ids,
app_service_requester=requester,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ca71f073fc..22f6474127 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,9 +16,10 @@ import logging
from queue import Empty, PriorityQueue
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
+import attr
from prometheus_client import Counter, Gauge
-from synapse.api.constants import MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
@@ -60,6 +61,15 @@ pdus_pruned_from_federation_queue = Counter(
logger = logging.getLogger(__name__)
+# All the info we need while iterating the DAG during backfill
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class BackfillQueueNavigationItem:
+ depth: int
+ stream_ordering: int
+ event_id: str
+ type: str
+
+
class _NoChainCoverIndex(Exception):
def __init__(self, room_id: str):
super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
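The `BackfillQueueNavigationItem` fields map directly onto the `(-depth, -stream_ordering, event_id, type)` tuples that `_get_backfill_events` (further down in this diff) puts on its `PriorityQueue`. A self-contained sketch of that ordering, with made-up event data:

```python
from queue import PriorityQueue

# Hypothetical events: (depth, stream_ordering, event_id, type)
events = [
    (10, 100, "$older", "m.room.message"),
    (12, 204, "$newer_a", "m.room.message"),
    (12, 205, "$newer_b", "m.room.message"),
]

queue: PriorityQueue = PriorityQueue()
for depth, stream_ordering, event_id, event_type in events:
    # Negate depth and stream_ordering so the lowest tuple (popped first)
    # corresponds to the newest-in-time event.
    queue.put((-depth, -stream_ordering, event_id, event_type))

popped = []
while not queue.empty():
    _, _, event_id, _ = queue.get_nowait()
    popped.append(event_id)

# Newest first; equal depths are tie-broken by stream_ordering.
assert popped == ["$newer_b", "$newer_a", "$older"]
```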
@@ -74,6 +84,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
):
super().__init__(database, db_conn, hs)
+ self.hs = hs
+
if hs.config.worker.run_background_tasks:
hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
@@ -737,7 +749,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
room_id,
)
- async def get_insertion_event_backwards_extremities_in_room(
+ async def get_insertion_event_backward_extremities_in_room(
self, room_id
) -> Dict[str, int]:
"""Get the insertion events we know about that we haven't backfilled yet.
@@ -754,7 +766,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
Map from event_id to depth
"""
- def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+ def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
sql = """
SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
/* We only want insertion events that are also marked as backwards extremities */
@@ -770,8 +782,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return dict(txn)
return await self.db_pool.runInteraction(
- "get_insertion_event_backwards_extremities_in_room",
- get_insertion_event_backwards_extremities_in_room_txn,
+ "get_insertion_event_backward_extremities_in_room",
+ get_insertion_event_backward_extremities_in_room_txn,
room_id,
)
@@ -997,143 +1009,242 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
)
- async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
- """Get a list of Events for a given topic that occurred before (and
- including) the events in event_list. Return a list of max size `limit`
+ def _get_connected_batch_event_backfill_results_txn(
+ self, txn: LoggingTransaction, insertion_event_id: str, limit: int
+ ) -> List[BackfillQueueNavigationItem]:
+ """
+ Find any batch connections of a given insertion event.
+ A batch event points at an insertion event via:
+ batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
Args:
- room_id
- event_list
- limit
+ txn: The database transaction to use
+ insertion_event_id: The event ID to navigate from. We will find
+ batch events that point back at this insertion event.
+ limit: Max number of event IDs to query for and return
+
+ Returns:
+ List of batch events that the backfill queue can process
+ """
+ batch_connection_query = """
+ SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
+ /* Find the batch that connects to the given insertion event */
+ INNER JOIN batch_events AS c
+ ON i.next_batch_id = c.batch_id
+ /* Get the depth of the batch start event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ /* Find an insertion event which matches the given event_id */
+ WHERE i.event_id = ?
+ LIMIT ?
"""
- event_ids = await self.db_pool.runInteraction(
- "get_backfill_events",
- self._get_backfill_events,
- room_id,
- event_list,
- limit,
- )
- events = await self.get_events_as_list(event_ids)
- return sorted(events, key=lambda e: -e.depth)
- def _get_backfill_events(self, txn, room_id, event_list, limit):
- logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+ # Find any batch connections for the given insertion event
+ txn.execute(
+ batch_connection_query,
+ (insertion_event_id, limit),
+ )
+ return [
+ BackfillQueueNavigationItem(
+ depth=row[0],
+ stream_ordering=row[1],
+ event_id=row[2],
+ type=row[3],
+ )
+ for row in txn
+ ]
- event_results = set()
+ def _get_connected_prev_event_backfill_results_txn(
+ self, txn: LoggingTransaction, event_id: str, limit: int
+ ) -> List[BackfillQueueNavigationItem]:
+ """
+ Find any events connected by prev_event to the specified event_id.
- # We want to make sure that we do a breadth-first, "depth" ordered
- # search.
+ Args:
+ txn: The database transaction to use
+ event_id: The event ID to navigate from
+ limit: Max number of event IDs to query for and return
+ Returns:
+ List of prev events that the backfill queue can process
+ """
# Look for the prev_event_id connected to the given event_id
- query = """
- SELECT depth, prev_event_id FROM event_edges
- /* Get the depth of the prev_event_id from the events table */
+ connected_prev_event_query = """
+ SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
+ /* Get the depth and stream_ordering of the prev_event_id from the events table */
INNER JOIN events
ON prev_event_id = events.event_id
- /* Find an event which matches the given event_id */
+ /* Look for an edge which matches the given event_id */
WHERE event_edges.event_id = ?
AND event_edges.is_state = ?
+ /* Because we can have many events at the same depth,
+ * we want to also tie-break and sort on stream_ordering */
+ ORDER BY depth DESC, stream_ordering DESC
LIMIT ?
"""
- # Look for the "insertion" events connected to the given event_id
- connected_insertion_event_query = """
- SELECT e.depth, i.event_id FROM insertion_event_edges AS i
- /* Get the depth of the insertion event from the events table */
- INNER JOIN events AS e USING (event_id)
- /* Find an insertion event which points via prev_events to the given event_id */
- WHERE i.insertion_prev_event_id = ?
- LIMIT ?
+ txn.execute(
+ connected_prev_event_query,
+ (event_id, False, limit),
+ )
+ return [
+ BackfillQueueNavigationItem(
+ depth=row[0],
+ stream_ordering=row[1],
+ event_id=row[2],
+ type=row[3],
+ )
+ for row in txn
+ ]
+
+ async def get_backfill_events(
+ self, room_id: str, seed_event_id_list: list, limit: int
+ ):
+ """Get a list of Events for a given room that occurred before (and
+ including) the events in seed_event_id_list. Return a list of max size `limit`
+
+ Args:
+ room_id
+ seed_event_id_list
+ limit
"""
+ event_ids = await self.db_pool.runInteraction(
+ "get_backfill_events",
+ self._get_backfill_events,
+ room_id,
+ seed_event_id_list,
+ limit,
+ )
+ events = await self.get_events_as_list(event_ids)
+ return sorted(
+ events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
+ )
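The effect of the extended sort key is easiest to see with same-depth events, which is exactly what MSC2716 historical batches produce. A short worked example using plain stand-ins rather than real events (the real code reads `internal_metadata.stream_ordering`):

```python
from collections import namedtuple

# Stand-in for a persisted event; the real sort key uses
# event.internal_metadata.stream_ordering instead of a plain attribute.
FakeEvent = namedtuple("FakeEvent", ["event_id", "depth", "stream_ordering"])

events = [
    FakeEvent("$hist_1", 5, -3),  # historical events often share a depth
    FakeEvent("$hist_2", 5, -2),
    FakeEvent("$live", 6, 40),
]

# Newest depth first, then newest stream_ordering first within a depth.
ordered = sorted(events, key=lambda e: (-e.depth, -e.stream_ordering))
assert [e.event_id for e in ordered] == ["$live", "$hist_2", "$hist_1"]
```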
- # Find any batch connections of a given insertion event
- batch_connection_query = """
- SELECT e.depth, c.event_id FROM insertion_events AS i
- /* Find the batch that connects to the given insertion event */
- INNER JOIN batch_events AS c
- ON i.next_batch_id = c.batch_id
- /* Get the depth of the batch start event from the events table */
- INNER JOIN events AS e USING (event_id)
- /* Find an insertion event which matches the given event_id */
- WHERE i.event_id = ?
- LIMIT ?
+ def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+ """
+ We want to make sure that we do a breadth-first, "depth" ordered search.
+ We also handle navigating historical branches of the DAG connected by
+ insertion and batch events.
"""
+ logger.debug(
+ "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
+ room_id,
+ seed_event_id_list,
+ limit,
+ )
+
+ event_id_results = set()
# In a PriorityQueue, the lowest valued entries are retrieved first.
- # We're using depth as the priority in the queue.
- # Depth is lowest at the oldest-in-time message and highest and
- # newest-in-time message. We add events to the queue with a negative depth so that
- # we process the newest-in-time messages first going backwards in time.
+ # We're using depth as the priority in the queue and tie-break based on
+ # stream_ordering. Depth is lowest at the oldest-in-time message and
+ # highest at the newest-in-time message. We add events to the queue with a
+ # negative depth so that we process the newest-in-time messages first
+ # going backwards in time. stream_ordering follows the same pattern.
queue = PriorityQueue()
- for event_id in event_list:
- depth = self.db_pool.simple_select_one_onecol_txn(
+ for seed_event_id in seed_event_id_list:
+ event_lookup_result = self.db_pool.simple_select_one_txn(
txn,
table="events",
- keyvalues={"event_id": event_id, "room_id": room_id},
- retcol="depth",
+ keyvalues={"event_id": seed_event_id, "room_id": room_id},
+ retcols=(
+ "type",
+ "depth",
+ "stream_ordering",
+ ),
allow_none=True,
)
- if depth:
- queue.put((-depth, event_id))
+ if event_lookup_result is not None:
+ logger.debug(
+ "_get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
+ room_id,
+ seed_event_id,
+ event_lookup_result["depth"],
+ event_lookup_result["stream_ordering"],
+ event_lookup_result["type"],
+ )
- while not queue.empty() and len(event_results) < limit:
+ if event_lookup_result["depth"]:
+ queue.put(
+ (
+ -event_lookup_result["depth"],
+ -event_lookup_result["stream_ordering"],
+ seed_event_id,
+ event_lookup_result["type"],
+ )
+ )
+
+ while not queue.empty() and len(event_id_results) < limit:
try:
- _, event_id = queue.get_nowait()
+ _, _, event_id, event_type = queue.get_nowait()
except Empty:
break
- if event_id in event_results:
+ if event_id in event_id_results:
continue
- event_results.add(event_id)
+ event_id_results.add(event_id)
# Try and find any potential historical batches of message history.
- #
- # First we look for an insertion event connected to the current
- # event (by prev_event). If we find any, we need to go and try to
- # find any batch events connected to the insertion event (by
- # batch_id). If we find any, we'll add them to the queue and
- # navigate up the DAG like normal in the next iteration of the loop.
- txn.execute(
- connected_insertion_event_query, (event_id, limit - len(event_results))
- )
- connected_insertion_event_id_results = txn.fetchall()
- logger.debug(
- "_get_backfill_events: connected_insertion_event_query %s",
- connected_insertion_event_id_results,
- )
- for row in connected_insertion_event_id_results:
- connected_insertion_event_depth = row[0]
- connected_insertion_event = row[1]
- queue.put((-connected_insertion_event_depth, connected_insertion_event))
+ if self.hs.config.experimental.msc2716_enabled:
+ # We need to go and try to find any batch events connected
+ # to a given insertion event (by batch_id). If we find any, we'll
+ # add them to the queue and navigate up the DAG like normal in the
+ # next iteration of the loop.
+ if event_type == EventTypes.MSC2716_INSERTION:
+ # Find any batch connections for the given insertion event
+ connected_batch_event_backfill_results = (
+ self._get_connected_batch_event_backfill_results_txn(
+ txn, event_id, limit - len(event_id_results)
+ )
+ )
+ logger.debug(
+ "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
+ room_id,
+ connected_batch_event_backfill_results,
+ )
+ for (
+ connected_batch_event_backfill_item
+ ) in connected_batch_event_backfill_results:
+ if (
+ connected_batch_event_backfill_item.event_id
+ not in event_id_results
+ ):
+ queue.put(
+ (
+ -connected_batch_event_backfill_item.depth,
+ -connected_batch_event_backfill_item.stream_ordering,
+ connected_batch_event_backfill_item.event_id,
+ connected_batch_event_backfill_item.type,
+ )
+ )
- # Find any batch connections for the given insertion event
- txn.execute(
- batch_connection_query,
- (connected_insertion_event, limit - len(event_results)),
- )
- batch_start_event_id_results = txn.fetchall()
- logger.debug(
- "_get_backfill_events: batch_start_event_id_results %s",
- batch_start_event_id_results,
+ # Now we just look up the DAG by prev_events as normal
+ connected_prev_event_backfill_results = (
+ self._get_connected_prev_event_backfill_results_txn(
+ txn, event_id, limit - len(event_id_results)
)
- for row in batch_start_event_id_results:
- if row[1] not in event_results:
- queue.put((-row[0], row[1]))
-
- txn.execute(query, (event_id, False, limit - len(event_results)))
- prev_event_id_results = txn.fetchall()
+ )
logger.debug(
- "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+ "_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s",
+ room_id,
+ connected_prev_event_backfill_results,
)
+ for (
+ connected_prev_event_backfill_item
+ ) in connected_prev_event_backfill_results:
+ if connected_prev_event_backfill_item.event_id not in event_id_results:
+ queue.put(
+ (
+ -connected_prev_event_backfill_item.depth,
+ -connected_prev_event_backfill_item.stream_ordering,
+ connected_prev_event_backfill_item.event_id,
+ connected_prev_event_backfill_item.type,
+ )
+ )
- for row in prev_event_id_results:
- if row[1] not in event_results:
- queue.put((-row[0], row[1]))
-
- return event_results
+ return event_id_results
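Stripped of the SQL and the MSC2716 branch, the loop above is a best-first search over the room DAG: pop the newest event from the priority queue, record it, push the events connected to it, and stop once `limit` events have been collected. A simplified, self-contained sketch of the same control flow over an in-memory graph:

```python
from queue import Empty, PriorityQueue
from typing import Dict, List, Set, Tuple

# event_id -> (depth, stream_ordering, prev_event_ids); purely illustrative.
Graph = Dict[str, Tuple[int, int, List[str]]]


def backfill_traversal(graph: Graph, seed_event_ids: List[str], limit: int) -> Set[str]:
    """Best-first walk backwards through prev_events, newest events first."""
    event_id_results: Set[str] = set()
    queue: PriorityQueue = PriorityQueue()

    for seed_event_id in seed_event_ids:
        depth, stream_ordering, _ = graph[seed_event_id]
        queue.put((-depth, -stream_ordering, seed_event_id))

    while not queue.empty() and len(event_id_results) < limit:
        try:
            _, _, event_id = queue.get_nowait()
        except Empty:
            break
        if event_id in event_id_results:
            continue
        event_id_results.add(event_id)

        # Walk up the DAG by prev_events (the insertion/batch hops that the
        # real code adds when MSC2716 is enabled are omitted here).
        for prev_event_id in graph[event_id][2]:
            if prev_event_id in graph and prev_event_id not in event_id_results:
                depth, stream_ordering, _ = graph[prev_event_id]
                queue.put((-depth, -stream_ordering, prev_event_id))

    return event_id_results
```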
async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
ids = await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b7554154ac..b804185c40 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2215,9 +2215,14 @@ class PersistEventsStore:
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
+ # 1. Don't add an event as an extremity again if we already persisted it
+ # as a non-outlier.
+ # 2. Don't add an outlier as an extremity if it has no prev_events
" AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
- " AND outlier = ?"
+ " SELECT 1 FROM events"
+ " LEFT JOIN event_edges edge"
+ " ON edge.event_id = events.event_id"
+ " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)"
" )"
)
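Expressed as plain Python, the new NOT EXISTS clause skips adding a prev_event as a backward extremity when we already have it as a non-outlier, or when we only have it as an event with no prev_events of its own (so there is nothing behind it to backfill). A hedged restatement of that predicate, assuming the outlier placeholder in the query is bound to False as the comments describe; the record type below is hypothetical:

```python
from typing import List, Optional

import attr


@attr.s(frozen=True, slots=True, auto_attribs=True)
class StoredEvent:
    """Hypothetical stand-in for a row in the events table plus its edges."""

    event_id: str
    outlier: bool
    prev_event_ids: List[str]


def should_add_backward_extremity(stored: Optional[StoredEvent]) -> bool:
    """Mirror of the SQL predicate for a candidate prev_event."""
    if stored is None:
        # We don't have the event at all, so it is a hole worth backfilling.
        return True
    if not stored.outlier:
        # Already persisted as a non-outlier: not a backward extremity.
        return False
    # An outlier with no prev_events has nothing behind it to backfill.
    return len(stored.prev_event_ids) > 0
```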
@@ -2243,6 +2248,10 @@ class PersistEventsStore:
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
+ # If we encountered an event with no prev_events, then we might
+ # as well remove it now because it won't ever have anything else
+ # to backfill from.
+ or len(ev.prev_event_ids()) == 0
],
)
|