diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/handler.py | 26 | ||||
-rw-r--r-- | synapse/federation/replication.py | 9 |
2 files changed, 23 insertions, 12 deletions
diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py index 580e591aca..984c1558e9 100644 --- a/synapse/federation/handler.py +++ b/synapse/federation/handler.py @@ -63,7 +63,7 @@ class FederationEventHandler(object): Deferred: Resolved when it has successfully been queued for processing. """ - yield self._fill_out_prev_events(event) + yield self.fill_out_prev_events(event) pdu = self.pdu_codec.pdu_from_event(event) @@ -74,10 +74,18 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def backfill(self, room_id, limit): - # TODO: Work out which destinations to ask for backfill - # self.replication_layer.backfill(dest, room_id, limit) - pass + def backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) @log_function def get_state_for_room(self, destination, room_id): @@ -87,7 +95,7 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu): + def on_receive_pdu(self, pdu, backfilled): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it throught the StateHandler. """ @@ -95,7 +103,7 @@ class FederationEventHandler(object): try: with (yield self.lock_manager.lock(pdu.context)): - if event.is_state: + if event.is_state and not backfilled: is_new_state = yield self.state_handler.handle_new_state( pdu ) @@ -104,7 +112,7 @@ class FederationEventHandler(object): else: is_new_state = False - yield self.event_handler.on_receive(event, is_new_state) + yield self.event_handler.on_receive(event, is_new_state, backfilled) except AuthError: # TODO: Implement something in federation that allows us to @@ -129,7 +137,7 @@ class FederationEventHandler(object): yield self.event_handler.on_receive(new_state_event) @defer.inlineCallbacks - def _fill_out_prev_events(self, event): + def fill_out_prev_events(self, event): if hasattr(event, "prev_events"): return diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index c9f2e06b7b..8030d0963f 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -209,7 +209,7 @@ class ReplicationLayer(object): pdus = [Pdu(outlier=False, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(pdu, backfilled=True) defer.returnValue(pdus) @@ -416,7 +416,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, pdu): + def _handle_new_pdu(self, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) @@ -452,7 +452,10 @@ class ReplicationLayer(object): # Persist the Pdu, but don't mark it as processed yet. yield self.pdu_actions.persist_received(pdu) - ret = yield self.handler.on_receive_pdu(pdu) + if not backfilled: + ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) + else: + ret = None yield self.pdu_actions.mark_as_processed(pdu) |