From dbe77ec79a450f4e7b69d6de247864aad8928ebf Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 18 Dec 2014 17:47:00 +0000 Subject: Replace distributor deferred list, with a simple for loop until I understand why the former breaks and the latter doesn't --- synapse/util/distributor.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/util/distributor.py') diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 701ccdb781..6e69296d65 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -115,10 +115,10 @@ class Signal(object): failure.value, failure.getTracebackObject())) if not self.suppress_failures: - raise failure + failure.raiseException() deferreds.append(d.addErrback(eb)) - - result = yield defer.DeferredList( - deferreds, fireOnOneErrback=not self.suppress_failures - ) - defer.returnValue(result) + results = [] + for deferred in deferreds: + result = yield deferred + results.append(results) + defer.returnValue(results) -- cgit 1.4.1 From 041ac476a53f7adaa436309ccbb85f269bbb47dd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 18 Dec 2014 18:47:13 +0000 Subject: Supply auth_chain along with current state in '/state/', fetch auth events from a remote server if we are missing some of them --- synapse/federation/replication.py | 27 +++++++++++++----- synapse/handlers/federation.py | 55 +++++++++++++++++++++---------------- synapse/util/distributor.py | 2 +- tests/federation/test_federation.py | 2 ++ 4 files changed, 55 insertions(+), 31 deletions(-) (limited to 'synapse/util/distributor.py') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index ec9b6e246b..8abf67b1b5 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -256,31 +256,35 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context, event_id=None): + def get_state_for_context(self, destination, context, event_id): """Requests all of the `current` state PDUs for a given context from a remote home server. Args: destination (str): The remote homeserver to query for the state. context (str): The context we're interested in. + event_id (str): The id of the event we want the state at. Returns: Deferred: Results in a list of PDUs. """ - transaction_data = yield self.transport_layer.get_context_state( + result = yield self.transport_layer.get_context_state( destination, context, event_id=event_id, ) - transaction = Transaction(**transaction_data) pdus = [ + self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] + ] + + auth_chain = [ self.event_from_pdu_json(p, outlier=True) - for p in transaction.pdus + for p in result.get("auth_chain", []) ] - defer.returnValue(pdus) + defer.returnValue((pdus, auth_chain)) @defer.inlineCallbacks @log_function @@ -383,10 +387,16 @@ class ReplicationLayer(object): context, event_id, ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) else: raise NotImplementedError("Specify an event") - defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) + defer.returnValue((200, { + "pdus": [pdu.get_pdu_json() for pdu in pdus], + "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], + })) @defer.inlineCallbacks @log_function @@ -573,6 +583,8 @@ class ReplicationLayer(object): state = None + auth_chain = [] + # We need to make sure we have all the auth events. # for e_id, _ in pdu.auth_events: # exists = yield self._get_persisted_pdu( @@ -645,7 +657,7 @@ class ReplicationLayer(object): "_handle_new_pdu getting state for %s", pdu.room_id ) - state = yield self.get_state_for_context( + state, auth_chain = yield self.get_state_for_context( origin, pdu.room_id, pdu.event_id, ) @@ -655,6 +667,7 @@ class ReplicationLayer(object): pdu, backfilled=backfilled, state=state, + auth_chain=auth_chain, ) else: ret = None diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2f6036145c..e23c5c2195 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -95,7 +95,8 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, backfilled, state=None): + def on_receive_pdu(self, origin, pdu, backfilled, state=None, + auth_chain=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ @@ -150,35 +151,35 @@ class FederationHandler(BaseHandler): if not is_in_room and not event.internal_metadata.outlier: logger.debug("Got event for room we're not in.") - replication_layer = self.replication_layer - auth_chain = yield replication_layer.get_event_auth( - origin, - context=event.room_id, - event_id=event.event_id, - ) + replication = self.replication_layer + + if not state: + state, auth_chain = yield replication.get_state_for_context( + origin, context=event.room_id, event_id=event.event_id, + ) + + if not auth_chain: + auth_chain = yield replication.get_event_auth( + origin, + context=event.room_id, + event_id=event.event_id, + ) for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_missing=False) + yield self._handle_new_event(e, fetch_auth_from=origin) except: logger.exception( "Failed to handle auth event %s", e.event_id, ) - if not state: - state = yield replication_layer.get_state_for_context( - origin, - context=event.room_id, - event_id=event.event_id, - ) - # FIXME: Get auth chain for these state events - current_state = state if state: for e in state: + logging.info("A :) %r", e) e.internal_metadata.outlier = True try: yield self._handle_new_event(e) @@ -392,7 +393,7 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_missing=False) + yield self._handle_new_event(e) except: logger.exception( "Failed to handle auth event %s", @@ -404,8 +405,7 @@ class FederationHandler(BaseHandler): e.internal_metadata.outlier = True try: yield self._handle_new_event( - e, - fetch_missing=True + e, fetch_auth_from=target_host ) except: logger.exception( @@ -682,7 +682,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_event(self, event, state=None, backfilled=False, - current_state=None, fetch_missing=True): + current_state=None, fetch_auth_from=None): logger.debug( "_handle_new_event: Before annotate: %s, sigs: %s", @@ -703,11 +703,20 @@ class FederationHandler(BaseHandler): known_ids = set( [s.event_id for s in context.auth_events.values()] ) + for e_id, _ in event.auth_events: if e_id not in known_ids: - e = yield self.store.get_event( - e_id, allow_none=True, - ) + e = yield self.store.get_event(e_id, allow_none=True) + + if not e and fetch_auth_from is not None: + # Grab the auth_chain over federation if we are missing + # auth events. + auth_chain = yield self.replication_layer.get_event_auth( + fetch_auth_from, event.event_id, event.room_id + ) + for auth_event in auth_chain: + yield self._handle_new_event(auth_event) + e = yield self.store.get_event(e_id, allow_none=True) if not e: # TODO: Do some conflict res to make sure that we're diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 6e69296d65..6925ac96b6 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -120,5 +120,5 @@ class Signal(object): results = [] for deferred in deferreds: result = yield deferred - results.append(results) + results.append(result) defer.returnValue(results) diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index 79ac1ce10d..3e484cd303 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -52,6 +52,7 @@ class FederationTestCase(unittest.TestCase): "get_received_txn_response", "set_received_txn_response", "get_destination_retry_timings", + "get_auth_chain", ]) self.mock_persistence.get_received_txn_response.return_value = ( defer.succeed(None) @@ -59,6 +60,7 @@ class FederationTestCase(unittest.TestCase): self.mock_persistence.get_destination_retry_timings.return_value = ( defer.succeed(DestinationsTable.EntryType("", 0, 0)) ) + self.mock_persistence.get_auth_chain.return_value = [] self.mock_config = Mock() self.mock_config.signing_key = [MockKey()] self.clock = MockClock() -- cgit 1.4.1 From adb04b1e572d13b75541f4684aac3683e94d70b8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Jan 2015 13:21:39 +0000 Subject: Update copyright notices --- scripts/copyrighter.pl | 2 +- synapse/__init__.py | 2 +- synapse/api/__init__.py | 2 +- synapse/api/auth.py | 2 +- synapse/api/constants.py | 2 +- synapse/api/errors.py | 2 +- synapse/api/ratelimiting.py | 2 +- synapse/api/urls.py | 2 +- synapse/app/__init__.py | 2 +- synapse/app/homeserver.py | 2 +- synapse/app/synctl.py | 2 +- synapse/config/__init__.py | 2 +- synapse/config/_base.py | 2 +- synapse/config/captcha.py | 2 +- synapse/config/database.py | 2 +- synapse/config/email.py | 2 +- synapse/config/homeserver.py | 2 +- synapse/config/logger.py | 2 +- synapse/config/ratelimiting.py | 2 +- synapse/config/repository.py | 2 +- synapse/config/server.py | 2 +- synapse/config/tls.py | 2 +- synapse/config/voip.py | 2 +- synapse/crypto/__init__.py | 2 +- synapse/crypto/context_factory.py | 2 +- synapse/crypto/event_signing.py | 2 +- synapse/crypto/keyclient.py | 2 +- synapse/crypto/keyring.py | 2 +- synapse/events/__init__.py | 2 +- synapse/events/builder.py | 2 +- synapse/events/snapshot.py | 2 +- synapse/events/utils.py | 2 +- synapse/events/validator.py | 2 +- synapse/federation/__init__.py | 2 +- synapse/federation/persistence.py | 2 +- synapse/federation/replication.py | 2 +- synapse/federation/transport.py | 2 +- synapse/federation/units.py | 2 +- synapse/handlers/__init__.py | 2 +- synapse/handlers/_base.py | 2 +- synapse/handlers/admin.py | 2 +- synapse/handlers/directory.py | 2 +- synapse/handlers/events.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/handlers/login.py | 2 +- synapse/handlers/message.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/profile.py | 2 +- synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/typing.py | 2 +- synapse/http/__init__.py | 2 +- synapse/http/agent_name.py | 2 +- synapse/http/client.py | 2 +- synapse/http/endpoint.py | 2 +- synapse/http/matrixfederationclient.py | 2 +- synapse/http/server.py | 4 ++-- synapse/http/server_key_resource.py | 2 +- synapse/media/v0/content_repository.py | 2 +- synapse/media/v1/__init__.py | 16 +++++++++++++++- synapse/media/v1/base_resource.py | 2 +- synapse/media/v1/download_resource.py | 2 +- synapse/media/v1/filepath.py | 2 +- synapse/media/v1/media_repository.py | 2 +- synapse/media/v1/thumbnail_resource.py | 2 +- synapse/media/v1/thumbnailer.py | 2 +- synapse/media/v1/upload_resource.py | 2 +- synapse/notifier.py | 2 +- synapse/rest/__init__.py | 2 +- synapse/rest/admin.py | 2 +- synapse/rest/base.py | 2 +- synapse/rest/directory.py | 2 +- synapse/rest/events.py | 2 +- synapse/rest/initial_sync.py | 2 +- synapse/rest/login.py | 2 +- synapse/rest/presence.py | 2 +- synapse/rest/profile.py | 2 +- synapse/rest/register.py | 2 +- synapse/rest/room.py | 2 +- synapse/rest/transactions.py | 2 +- synapse/rest/voip.py | 2 +- synapse/server.py | 2 +- synapse/state.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/directory.py | 2 +- synapse/storage/event_federation.py | 2 +- synapse/storage/feedback.py | 2 +- synapse/storage/keys.py | 2 +- synapse/storage/media_repository.py | 2 +- synapse/storage/presence.py | 2 +- synapse/storage/profile.py | 2 +- synapse/storage/registration.py | 2 +- synapse/storage/room.py | 2 +- synapse/storage/roommember.py | 2 +- synapse/storage/schema/delta/v2.sql | 2 +- synapse/storage/schema/delta/v3.sql | 2 +- synapse/storage/schema/delta/v4.sql | 14 ++++++++++++++ synapse/storage/schema/delta/v5.sql | 14 ++++++++++++++ synapse/storage/schema/delta/v6.sql | 2 +- synapse/storage/schema/delta/v8.sql | 2 +- synapse/storage/schema/delta/v9.sql | 2 +- synapse/storage/schema/event_edges.sql | 14 ++++++++++++++ synapse/storage/schema/event_signatures.sql | 2 +- synapse/storage/schema/im.sql | 2 +- synapse/storage/schema/keys.sql | 2 +- synapse/storage/schema/media_repository.sql | 2 +- synapse/storage/schema/presence.sql | 2 +- synapse/storage/schema/profiles.sql | 2 +- synapse/storage/schema/redactions.sql | 14 ++++++++++++++ synapse/storage/schema/room_aliases.sql | 2 +- synapse/storage/schema/state.sql | 2 +- synapse/storage/schema/transactions.sql | 2 +- synapse/storage/schema/users.sql | 2 +- synapse/storage/signatures.py | 2 +- synapse/storage/state.py | 2 +- synapse/storage/stream.py | 2 +- synapse/storage/transactions.py | 2 +- synapse/streams/__init__.py | 2 +- synapse/streams/config.py | 2 +- synapse/streams/events.py | 2 +- synapse/types.py | 2 +- synapse/util/__init__.py | 2 +- synapse/util/async.py | 2 +- synapse/util/distributor.py | 2 +- synapse/util/emailutils.py | 2 +- synapse/util/frozenutils.py | 2 +- synapse/util/jsonobject.py | 2 +- synapse/util/lockutils.py | 2 +- synapse/util/logcontext.py | 14 ++++++++++++++ synapse/util/logutils.py | 2 +- synapse/util/stringutils.py | 2 +- 132 files changed, 212 insertions(+), 128 deletions(-) (limited to 'synapse/util/distributor.py') diff --git a/scripts/copyrighter.pl b/scripts/copyrighter.pl index 7c03ef21fc..a913d74c8d 100755 --- a/scripts/copyrighter.pl +++ b/scripts/copyrighter.pl @@ -14,7 +14,7 @@ # limitations under the License. $copyright = <