diff options
author | David Baker <dave@matrix.org> | 2015-01-13 13:15:51 +0000 |
---|---|---|
committer | David Baker <dave@matrix.org> | 2015-01-13 13:15:51 +0000 |
commit | c06a9063e1d838f776edfd79cfc8ab29c748d794 (patch) | |
tree | bcfa472e65d4dacbab666d5787eff9293e5ccc41 /synapse/federation/replication.py | |
parent | Split out function to decide whether to notify or a given event (diff) | |
parent | Merge branch 'hotfixes-v0.6.1b' of github.com:matrix-org/synapse into develop (diff) | |
download | synapse-c06a9063e1d838f776edfd79cfc8ab29c748d794.tar.xz |
Merge branch 'develop' into pushers
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r-- | synapse/federation/replication.py | 41 |
1 files changed, 30 insertions, 11 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 9f8aadccca..a4c29b484b 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd +# Copyright 2014, 2015 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -256,31 +256,35 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context, event_id=None): + def get_state_for_context(self, destination, context, event_id): """Requests all of the `current` state PDUs for a given context from a remote home server. Args: destination (str): The remote homeserver to query for the state. context (str): The context we're interested in. + event_id (str): The id of the event we want the state at. Returns: Deferred: Results in a list of PDUs. """ - transaction_data = yield self.transport_layer.get_context_state( + result = yield self.transport_layer.get_context_state( destination, context, event_id=event_id, ) - transaction = Transaction(**transaction_data) pdus = [ + self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] + ] + + auth_chain = [ self.event_from_pdu_json(p, outlier=True) - for p in transaction.pdus + for p in result.get("auth_chain", []) ] - defer.returnValue(pdus) + defer.returnValue((pdus, auth_chain)) @defer.inlineCallbacks @log_function @@ -383,10 +387,16 @@ class ReplicationLayer(object): context, event_id, ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) else: raise NotImplementedError("Specify an event") - defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) + defer.returnValue((200, { + "pdus": [pdu.get_pdu_json() for pdu in pdus], + "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], + })) @defer.inlineCallbacks @log_function @@ -562,8 +572,8 @@ class ReplicationLayer(object): already_seen = ( existing and ( - not existing.internal_metadata.outlier - or pdu.internal_metadata.outlier + not existing.internal_metadata.is_outlier() + or pdu.internal_metadata.is_outlier() ) ) if already_seen: @@ -573,6 +583,8 @@ class ReplicationLayer(object): state = None + auth_chain = [] + # We need to make sure we have all the auth events. # for e_id, _ in pdu.auth_events: # exists = yield self._get_persisted_pdu( @@ -604,7 +616,7 @@ class ReplicationLayer(object): # ) # Get missing pdus if necessary. - if not pdu.internal_metadata.outlier: + if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. min_depth = yield self.handler.get_min_depth_for_context( pdu.room_id @@ -645,7 +657,7 @@ class ReplicationLayer(object): "_handle_new_pdu getting state for %s", pdu.room_id ) - state = yield self.get_state_for_context( + state, auth_chain = yield self.get_state_for_context( origin, pdu.room_id, pdu.event_id, ) @@ -655,6 +667,7 @@ class ReplicationLayer(object): pdu, backfilled=backfilled, state=state, + auth_chain=auth_chain, ) else: ret = None @@ -717,6 +730,7 @@ class _TransactionQueue(object): destinations = set(destinations) destinations.discard(self.server_name) + destinations.discard("localhost") logger.debug("Sending to: %s", str(destinations)) @@ -801,6 +815,8 @@ class _TransactionQueue(object): else: logger.info("TX [%s] is ready for retry", destination) + logger.info("TX [%s] _attempt_new_transaction", destination) + if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -813,6 +829,9 @@ class _TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + if pending_pdus: + logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) + if not pending_pdus and not pending_edus and not pending_failures: return |