diff options
author | Erik Johnston <erik@matrix.org> | 2015-02-19 17:24:14 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-02-19 17:24:14 +0000 |
commit | 0ac2a79faa918280767c18e4db7ec29d7d3a3afb (patch) | |
tree | a2495f72dee1b8e08947067fe81eb7e7f91cc527 /synapse/federation/federation_server.py | |
parent | Update CHANGES.rst with missing changes (diff) | |
download | synapse-0ac2a79faa918280767c18e4db7ec29d7d3a3afb.tar.xz |
Initial stab at implementing a batched get_missing_pdus request
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r-- | synapse/federation/federation_server.py | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 22b9663831..34bc397e8a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -305,6 +305,78 @@ class FederationServer(FederationBase): (200, send_content) ) + @defer.inlineCallbacks + def get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + limit = max(limit, 50) + min_depth = max(min_depth, 0) + + missing_events = yield self.store.get_missing_events( + room_id=room_id, + earliest_events=earliest_events, + latest_events=latest_events, + limit=limit, + min_depth=min_depth, + ) + + known_ids = {e.event_id for e in missing_events} | {earliest_events} + + back_edges = { + e for e in missing_events + if {i for i, h in e.prev_events.items()} <= known_ids + } + + decoded_auth_events = set() + state = {} + auth_events = set() + auth_and_state = {} + for event in back_edges: + state_pdus = yield self.handler.get_state_for_pdu( + origin, room_id, event.event_id, + do_auth=False, + ) + + state[event.event_id] = [s.event_id for s in state_pdus] + + auth_and_state.update({ + s.event_id: s for s in state_pdus + }) + + state_ids = {pdu.event_id for pdu in state_pdus} + prev_ids = {i for i, h in event.prev_events.items()} + partial_auth_chain = yield self.store.get_auth_chain( + state_ids | prev_ids, have_ids=decoded_auth_events.keys() + ) + + for p in partial_auth_chain: + p.signatures.update( + compute_event_signature( + p, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + + auth_events.update( + a.event_id for a in partial_auth_chain + ) + + auth_and_state.update({ + a.event_id: a for a in partial_auth_chain + }) + + time_now = self._clock.time_msec() + + defer.returnValue({ + "events": [ev.get_pdu_json(time_now) for ev in missing_events], + "state_for_events": state, + "auth_events": auth_events, + "event_map": { + k: ev.get_pdu_json(time_now) + for k, ev in auth_and_state.items() + }, + }) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. |