summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/handler.py26
-rw-r--r--synapse/federation/replication.py9
2 files changed, 23 insertions, 12 deletions
diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py
index 580e591aca..984c1558e9 100644
--- a/synapse/federation/handler.py
+++ b/synapse/federation/handler.py
@@ -63,7 +63,7 @@ class FederationEventHandler(object):
             Deferred: Resolved when it has successfully been queued for
             processing.
         """
-        yield self._fill_out_prev_events(event)
+        yield self.fill_out_prev_events(event)
 
         pdu = self.pdu_codec.pdu_from_event(event)
 
@@ -74,10 +74,18 @@ class FederationEventHandler(object):
 
     @log_function
     @defer.inlineCallbacks
-    def backfill(self, room_id, limit):
-        # TODO: Work out which destinations to ask for backfill
-        # self.replication_layer.backfill(dest, room_id, limit)
-        pass
+    def backfill(self, dest, room_id, limit):
+        pdus = yield self.replication_layer.backfill(dest, room_id, limit)
+
+        if not pdus:
+            defer.returnValue([])
+
+        events = [
+            self.pdu_codec.event_from_pdu(pdu)
+            for pdu in pdus
+        ]
+
+        defer.returnValue(events)
 
     @log_function
     def get_state_for_room(self, destination, room_id):
@@ -87,7 +95,7 @@ class FederationEventHandler(object):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, pdu):
+    def on_receive_pdu(self, pdu, backfilled):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
         do auth checks and put it throught the StateHandler.
         """
@@ -95,7 +103,7 @@ class FederationEventHandler(object):
 
         try:
             with (yield self.lock_manager.lock(pdu.context)):
-                if event.is_state:
+                if event.is_state and not backfilled:
                     is_new_state = yield self.state_handler.handle_new_state(
                         pdu
                     )
@@ -104,7 +112,7 @@ class FederationEventHandler(object):
                 else:
                     is_new_state = False
 
-            yield self.event_handler.on_receive(event, is_new_state)
+            yield self.event_handler.on_receive(event, is_new_state, backfilled)
 
         except AuthError:
             # TODO: Implement something in federation that allows us to
@@ -129,7 +137,7 @@ class FederationEventHandler(object):
         yield self.event_handler.on_receive(new_state_event)
 
     @defer.inlineCallbacks
-    def _fill_out_prev_events(self, event):
+    def fill_out_prev_events(self, event):
         if hasattr(event, "prev_events"):
             return
 
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index c9f2e06b7b..8030d0963f 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -209,7 +209,7 @@ class ReplicationLayer(object):
 
         pdus = [Pdu(outlier=False, **p) for p in transaction.pdus]
         for pdu in pdus:
-            yield self._handle_new_pdu(pdu)
+            yield self._handle_new_pdu(pdu, backfilled=True)
 
         defer.returnValue(pdus)
 
@@ -416,7 +416,7 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_pdu(self, pdu):
+    def _handle_new_pdu(self, pdu, backfilled=False):
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
 
@@ -452,7 +452,10 @@ class ReplicationLayer(object):
         # Persist the Pdu, but don't mark it as processed yet.
         yield self.pdu_actions.persist_received(pdu)
 
-        ret = yield self.handler.on_receive_pdu(pdu)
+        if not backfilled:
+            ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled)
+        else:
+            ret = None
 
         yield self.pdu_actions.mark_as_processed(pdu)