diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac70730885..080aca3d71 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,6 +23,7 @@ from ._base import BaseHandler
from synapse.api.errors import (
AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+ FederationDeniedError,
)
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
@@ -66,7 +68,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
- self.replication_layer = hs.get_replication_layer()
+ self.replication_layer = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
@@ -74,8 +76,7 @@ class FederationHandler(BaseHandler):
self.is_mine_id = hs.is_mine_id
self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
-
- self.replication_layer.set_handler(self)
+ self.event_creation_handler = hs.get_event_creation_handler()
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -782,6 +783,9 @@ class FederationHandler(BaseHandler):
except NotRetryingDestination as e:
logger.info(e.message)
continue
+ except FederationDeniedError as e:
+ logger.info(e)
+ continue
except Exception as e:
logger.exception(
"Failed to backfill from %s because %s",
@@ -804,13 +808,12 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
+ resolve = logcontext.preserve_fn(
+ self.state_handler.resolve_state_groups_for_events
+ )
states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
- room_id, [e]
- )
- for e in event_ids
- ], consumeErrors=True,
+ [resolve(room_id, [e]) for e in event_ids],
+ consumeErrors=True,
))
states = dict(zip(event_ids, [s.state for s in states]))
@@ -1004,8 +1007,7 @@ class FederationHandler(BaseHandler):
})
try:
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
except AuthError as e:
@@ -1245,8 +1247,7 @@ class FederationHandler(BaseHandler):
"state_key": user_id,
})
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
@@ -1444,16 +1445,24 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- if not event.internal_metadata.is_outlier() and not backfilled:
- yield self.action_generator.handle_push_actions_for_event(
- event, context
- )
+ try:
+ if not event.internal_metadata.is_outlier() and not backfilled:
+ yield self.action_generator.handle_push_actions_for_event(
+ event, context
+ )
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- backfilled=backfilled,
- )
+ event_stream_id, max_stream_id = yield self.store.persist_event(
+ event,
+ context=context,
+ backfilled=backfilled,
+ )
+ except: # noqa: E722, as we reraise the exception this is fine.
+ # Ensure that we actually remove the entries in the push actions
+ # staging area
+ logcontext.preserve_fn(
+ self.store.remove_push_actions_from_staging
+ )(event.event_id)
+ raise
if not backfilled:
# this intentionally does not yield: we don't care about the result
@@ -1828,8 +1837,8 @@ class FederationHandler(BaseHandler):
current_state = set(e.event_id for e in auth_events.values())
different_auth = event_auth_events - current_state
- self._update_context_for_auth_events(
- context, auth_events, event_key,
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
)
if different_auth and not event.internal_metadata.is_outlier():
@@ -1910,8 +1919,8 @@ class FederationHandler(BaseHandler):
# 4. Look at rejects and their proofs.
# TODO.
- self._update_context_for_auth_events(
- context, auth_events, event_key,
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
)
try:
@@ -1920,11 +1929,15 @@ class FederationHandler(BaseHandler):
logger.warn("Failed auth resolution for %r because %s", event, e)
raise e
- def _update_context_for_auth_events(self, context, auth_events,
+ @defer.inlineCallbacks
+ def _update_context_for_auth_events(self, event, context, auth_events,
event_key):
- """Update the state_ids in an event context after auth event resolution
+ """Update the state_ids in an event context after auth event resolution,
+ storing the changes as a new state group.
Args:
+ event (Event): The event we're handling the context for
+
context (synapse.events.snapshot.EventContext): event context
to be updated
@@ -1947,7 +1960,13 @@ class FederationHandler(BaseHandler):
context.prev_state_ids.update({
k: a.event_id for k, a in auth_events.iteritems()
})
- context.state_group = self.store.get_next_state_group()
+ context.state_group = yield self.store.store_state_group(
+ event.event_id,
+ event.room_id,
+ prev_group=context.prev_group,
+ delta_ids=context.delta_ids,
+ current_state_ids=context.current_state_ids,
+ )
@defer.inlineCallbacks
def construct_auth_difference(self, local_auth, remote_auth):
@@ -2117,8 +2136,7 @@ class FederationHandler(BaseHandler):
if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder
)
@@ -2133,7 +2151,7 @@ class FederationHandler(BaseHandler):
raise e
yield self._check_signature(event, context)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
else:
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2156,8 +2174,7 @@ class FederationHandler(BaseHandler):
"""
builder = self.event_builder_factory.new(event_dict)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
@@ -2178,7 +2195,7 @@ class FederationHandler(BaseHandler):
# TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
@@ -2207,8 +2224,9 @@ class FederationHandler(BaseHandler):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(builder=builder)
+ event, context = yield self.event_creation_handler.create_new_client_event(
+ builder=builder,
+ )
defer.returnValue((event, context))
@defer.inlineCallbacks
|