diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 000a3081c2..1628a56294 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -181,7 +181,7 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
- def backfill(self, dest, context, limit):
+ def backfill(self, dest, context, limit, extremities):
"""Requests some more historic PDUs for the given context from the
given destination server.
@@ -189,12 +189,12 @@ class ReplicationLayer(object):
dest (str): The remote home server to ask.
context (str): The context to backfill.
limit (int): The maximum number of PDUs to return.
+ extremities (list): List of PDU id and origins of the first pdus
+ we have seen from the context
Returns:
Deferred: Results in the received PDUs.
"""
- extremities = yield self.store.get_oldest_pdus_in_context(context)
-
logger.debug("backfill extrem=%s", extremities)
# If there are no extremeties then we've (probably) reached the start.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1daeee833b..9f457ce292 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -181,7 +181,17 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit):
- pdus = yield self.replication_layer.backfill(dest, room_id, limit)
+ extremities = yield self.store.get_oldest_events_in_room(room_id)
+
+ pdus = yield self.replication_layer.backfill(
+ dest,
+ room_id,
+ limit,
+ extremities=[
+ self.pdu_codec.decode_event_id(e)
+ for e in extremities
+ ]
+ )
events = []
@@ -390,6 +400,21 @@ class FederationHandler(BaseHandler):
else:
defer.returnValue([])
+ @defer.inlineCallbacks
+ @log_function
+ def on_backfill_request(self, context, pdu_list, limit):
+
+ events = yield self.store.get_backfill_events(
+ context,
+ [self.pdu_codec.encode_event_id(i, o) for i, o in pdu_list],
+ limit
+ )
+
+ defer.returnValue([
+ self.pdu_codec.pdu_from_event(e)
+ for e in events
+ ])
+
@log_function
def _on_user_joined(self, user, room_id):
waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 88d09d9ba8..438b42c1da 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -24,6 +24,23 @@ logger = logging.getLogger(__name__)
class EventFederationStore(SQLBaseStore):
+ def get_oldest_events_in_room(self, room_id):
+ return self.runInteraction(
+ "get_oldest_events_in_room",
+ self._get_oldest_events_in_room_txn,
+ room_id,
+ )
+
+ def _get_oldest_events_in_room_txn(self, txn, room_id):
+ return self._simple_select_onecol_txn(
+ txn,
+ table="event_backward_extremities",
+ keyvalues={
+ "room_id": room_id,
+ },
+ retcol="event_id",
+ )
+
def get_latest_events_in_room(self, room_id):
return self.runInteraction(
"get_latest_events_in_room",
@@ -159,4 +176,71 @@ class EventFederationStore(SQLBaseStore):
"AND not events.outlier "
")"
)
- txn.execute(query)
\ No newline at end of file
+ txn.execute(query)
+
+
+ def get_backfill_events(self, room_id, event_list, limit):
+ """Get a list of Events for a given topic that occured before (and
+ including) the pdus in pdu_list. Return a list of max size `limit`.
+
+ Args:
+ txn
+ room_id (str)
+ event_list (list)
+ limit (int)
+
+ Return:
+ list: A list of PduTuples
+ """
+ return self.runInteraction(
+ "get_backfill_events",
+ self._get_backfill_events, room_id, event_list, limit
+ )
+
+ def _get_backfill_events(self, txn, room_id, event_list, limit):
+ logger.debug(
+ "_get_backfill_events: %s, %s, %s",
+ room_id, repr(event_list), limit
+ )
+
+ # We seed the pdu_results with the things from the pdu_list.
+ event_results = event_list
+
+ front = event_list
+
+ query = (
+ "SELECT prev_event_id FROM event_edges "
+ "WHERE room_id = ? AND event_id = ? "
+ "LIMIT ?"
+ )
+
+ # We iterate through all event_ids in `front` to select their previous
+ # events. These are dumped in `new_front`.
+ # We continue until we reach the limit *or* new_front is empty (i.e.,
+ # we've run out of things to select
+ while front and len(event_results) < limit:
+
+ new_front = []
+ for event_id in front:
+ logger.debug(
+ "_backfill_interaction: id=%s",
+ event_id
+ )
+
+ txn.execute(
+ query,
+ (room_id, event_id, limit - len(event_results))
+ )
+
+ for row in txn.fetchall():
+ logger.debug(
+ "_backfill_interaction: got id=%s",
+ *row
+ )
+ new_front.append(row)
+
+ front = new_front
+ event_results += new_front
+
+ # We also want to update the `prev_pdus` attributes before returning.
+ return self._get_pdu_tuples(txn, event_results)
|