diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py
index 580e591aca..984c1558e9 100644
--- a/synapse/federation/handler.py
+++ b/synapse/federation/handler.py
@@ -63,7 +63,7 @@ class FederationEventHandler(object):
Deferred: Resolved when it has successfully been queued for
processing.
"""
- yield self._fill_out_prev_events(event)
+ yield self.fill_out_prev_events(event)
pdu = self.pdu_codec.pdu_from_event(event)
@@ -74,10 +74,18 @@ class FederationEventHandler(object):
@log_function
@defer.inlineCallbacks
- def backfill(self, room_id, limit):
- # TODO: Work out which destinations to ask for backfill
- # self.replication_layer.backfill(dest, room_id, limit)
- pass
+ def backfill(self, dest, room_id, limit):
+ pdus = yield self.replication_layer.backfill(dest, room_id, limit)
+
+ if not pdus:
+ defer.returnValue([])
+
+ events = [
+ self.pdu_codec.event_from_pdu(pdu)
+ for pdu in pdus
+ ]
+
+ defer.returnValue(events)
@log_function
def get_state_for_room(self, destination, room_id):
@@ -87,7 +95,7 @@ class FederationEventHandler(object):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, pdu):
+ def on_receive_pdu(self, pdu, backfilled):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it throught the StateHandler.
"""
@@ -95,7 +103,7 @@ class FederationEventHandler(object):
try:
with (yield self.lock_manager.lock(pdu.context)):
- if event.is_state:
+ if event.is_state and not backfilled:
is_new_state = yield self.state_handler.handle_new_state(
pdu
)
@@ -104,7 +112,7 @@ class FederationEventHandler(object):
else:
is_new_state = False
- yield self.event_handler.on_receive(event, is_new_state)
+ yield self.event_handler.on_receive(event, is_new_state, backfilled)
except AuthError:
# TODO: Implement something in federation that allows us to
@@ -129,7 +137,7 @@ class FederationEventHandler(object):
yield self.event_handler.on_receive(new_state_event)
@defer.inlineCallbacks
- def _fill_out_prev_events(self, event):
+ def fill_out_prev_events(self, event):
if hasattr(event, "prev_events"):
return
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index c9f2e06b7b..8030d0963f 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -209,7 +209,7 @@ class ReplicationLayer(object):
pdus = [Pdu(outlier=False, **p) for p in transaction.pdus]
for pdu in pdus:
- yield self._handle_new_pdu(pdu)
+ yield self._handle_new_pdu(pdu, backfilled=True)
defer.returnValue(pdus)
@@ -416,7 +416,7 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
- def _handle_new_pdu(self, pdu):
+ def _handle_new_pdu(self, pdu, backfilled=False):
# We reprocess pdus when we have seen them only as outliers
existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
@@ -452,7 +452,10 @@ class ReplicationLayer(object):
# Persist the Pdu, but don't mark it as processed yet.
yield self.pdu_actions.persist_received(pdu)
- ret = yield self.handler.on_receive_pdu(pdu)
+ if not backfilled:
+ ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled)
+ else:
+ ret = None
yield self.pdu_actions.mark_as_processed(pdu)
|