summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-10-31 09:59:02 +0000
committerErik Johnston <erik@matrix.org>2014-10-31 09:59:02 +0000
commitf2de2d644af80557baebf43f64f3968b8ab46d0b (patch)
treea48a9d95a86ca36da5025d9b36c464cf412e91d1 /synapse
parentMerge branch 'develop' of github.com:matrix-org/synapse into federation_autho... (diff)
downloadsynapse-f2de2d644af80557baebf43f64f3968b8ab46d0b.tar.xz
Move the impl of backfill to use events.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/replication.py6
-rw-r--r--synapse/handlers/federation.py27
-rw-r--r--synapse/storage/event_federation.py86
3 files changed, 114 insertions, 5 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 000a3081c2..1628a56294 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -181,7 +181,7 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def backfill(self, dest, context, limit):
+    def backfill(self, dest, context, limit, extremities):
         """Requests some more historic PDUs for the given context from the
         given destination server.
 
@@ -189,12 +189,12 @@ class ReplicationLayer(object):
             dest (str): The remote home server to ask.
             context (str): The context to backfill.
             limit (int): The maximum number of PDUs to return.
+            extremities (list): List of PDU id and origins of the first pdus
+                we have seen from the context
 
         Returns:
             Deferred: Results in the received PDUs.
         """
-        extremities = yield self.store.get_oldest_pdus_in_context(context)
-
         logger.debug("backfill extrem=%s", extremities)
 
         # If there are no extremeties then we've (probably) reached the start.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1daeee833b..9f457ce292 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -181,7 +181,17 @@ class FederationHandler(BaseHandler):
     @log_function
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit):
-        pdus = yield self.replication_layer.backfill(dest, room_id, limit)
+        extremities = yield self.store.get_oldest_events_in_room(room_id)
+
+        pdus = yield self.replication_layer.backfill(
+            dest,
+            room_id,
+            limit,
+            extremities=[
+                self.pdu_codec.decode_event_id(e)
+                for e in extremities
+            ]
+        )
 
         events = []
 
@@ -390,6 +400,21 @@ class FederationHandler(BaseHandler):
         else:
             defer.returnValue([])
 
+    @defer.inlineCallbacks
+    @log_function
+    def on_backfill_request(self, context, pdu_list, limit):
+
+        events = yield self.store.get_backfill_events(
+            context,
+            [self.pdu_codec.encode_event_id(i, o) for i, o in pdu_list],
+            limit
+        )
+
+        defer.returnValue([
+            self.pdu_codec.pdu_from_event(e)
+            for e in events
+        ])
+
     @log_function
     def _on_user_joined(self, user, room_id):
         waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 88d09d9ba8..438b42c1da 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -24,6 +24,23 @@ logger = logging.getLogger(__name__)
 
 class EventFederationStore(SQLBaseStore):
 
+    def get_oldest_events_in_room(self, room_id):
+        return self.runInteraction(
+            "get_oldest_events_in_room",
+            self._get_oldest_events_in_room_txn,
+            room_id,
+        )
+
+    def _get_oldest_events_in_room_txn(self, txn, room_id):
+        return self._simple_select_onecol_txn(
+            txn,
+            table="event_backward_extremities",
+            keyvalues={
+                "room_id": room_id,
+            },
+            retcol="event_id",
+        )
+
     def get_latest_events_in_room(self, room_id):
         return self.runInteraction(
             "get_latest_events_in_room",
@@ -159,4 +176,71 @@ class EventFederationStore(SQLBaseStore):
                 "AND not events.outlier "
                 ")"
             )
-            txn.execute(query)
\ No newline at end of file
+            txn.execute(query)
+
+
+    def get_backfill_events(self, room_id, event_list, limit):
+        """Get a list of Events for a given topic that occured before (and
+        including) the pdus in pdu_list. Return a list of max size `limit`.
+
+        Args:
+            txn
+            room_id (str)
+            event_list (list)
+            limit (int)
+
+        Return:
+            list: A list of PduTuples
+        """
+        return self.runInteraction(
+            "get_backfill_events",
+            self._get_backfill_events, room_id, event_list, limit
+        )
+
+    def _get_backfill_events(self, txn, room_id, event_list, limit):
+        logger.debug(
+            "_get_backfill_events: %s, %s, %s",
+            room_id, repr(event_list), limit
+        )
+
+        # We seed the pdu_results with the things from the pdu_list.
+        event_results = event_list
+
+        front = event_list
+
+        query = (
+            "SELECT prev_event_id FROM event_edges "
+            "WHERE room_id = ? AND event_id = ? "
+            "LIMIT ?"
+        )
+
+        # We iterate through all event_ids in `front` to select their previous
+        # events. These are dumped in `new_front`.
+        # We continue until we reach the limit *or* new_front is empty (i.e.,
+        # we've run out of things to select
+        while front and len(event_results) < limit:
+
+            new_front = []
+            for event_id in front:
+                logger.debug(
+                    "_backfill_interaction: id=%s",
+                    event_id
+                )
+
+                txn.execute(
+                    query,
+                    (room_id, event_id, limit - len(event_results))
+                )
+
+                for row in txn.fetchall():
+                    logger.debug(
+                        "_backfill_interaction: got id=%s",
+                        *row
+                    )
+                    new_front.append(row)
+
+            front = new_front
+            event_results += new_front
+
+        # We also want to update the `prev_pdus` attributes before returning.
+        return self._get_pdu_tuples(txn, event_results)