diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 495ac4c648..0ebf0fd188 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -20,37 +20,51 @@ import itertools
import logging
import sys
+import six
+from six import iteritems, itervalues
+from six.moves import http_client, zip
+
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
-import six
-from six.moves import http_client
-from six import iteritems
-from twisted.internet import defer
from unpaddedbase64 import decode_base64
-from ._base import BaseHandler
+from twisted.internet import defer
+from synapse.api.constants import (
+ KNOWN_ROOM_VERSIONS,
+ EventTypes,
+ Membership,
+ RejectedReason,
+)
from synapse.api.errors import (
- AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+ AuthError,
+ CodeMessageException,
FederationDeniedError,
+ FederationError,
+ StoreError,
+ SynapseError,
)
-from synapse.api.constants import EventTypes, Membership, RejectedReason
-from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError, logcontext
-from synapse.util.metrics import measure_func
-from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
-from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
- compute_event_signature, add_hashes_and_signatures,
+ add_hashes_and_signatures,
+ compute_event_signature,
)
+from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+ ReplicationCleanRoomRestServlet,
+ ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
+from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
-
-from synapse.events.utils import prune_event
-
+from synapse.util import logcontext, unwrapFirstError
+from synapse.util.async_helpers import Linearizer
+from synapse.util.distributor import user_joined_room
+from synapse.util.frozenutils import unfreeze
+from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
+from synapse.visibility import filter_events_for_server
-from synapse.util.distributor import user_joined_room
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -72,7 +86,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
- self.replication_layer = hs.get_federation_client()
+ self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
@@ -82,6 +96,18 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+
+ self._send_events_to_master = (
+ ReplicationFederationSendEventsRestServlet.make_client(hs)
+ )
+ self._notify_user_membership_change = (
+ ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+ )
+ self._clean_room_for_join_client = (
+ ReplicationCleanRoomRestServlet.make_client(hs)
+ )
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -89,7 +115,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def on_receive_pdu(self, origin, pdu, get_missing=True):
+ def on_receive_pdu(
+ self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ ):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -103,8 +131,10 @@ class FederationHandler(BaseHandler):
"""
# We reprocess pdus when we have seen them only as outliers
- existing = yield self.get_persisted_pdu(
- origin, pdu.event_id, do_auth=False
+ existing = yield self.store.get_event(
+ pdu.event_id,
+ allow_none=True,
+ allow_rejected=True,
)
# FIXME: Currently we fetch an event again when we already have it
@@ -161,14 +191,11 @@ class FederationHandler(BaseHandler):
"Ignoring PDU %s for room %s from %s as we've left the room!",
pdu.event_id, pdu.room_id, origin,
)
- return
+ defer.returnValue(None)
state = None
-
auth_chain = []
- fetch_state = False
-
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
@@ -223,26 +250,61 @@ class FederationHandler(BaseHandler):
list(prevs - seen)[:5],
)
- if prevs - seen:
- logger.info(
- "Still missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ if sent_to_us_directly and prevs - seen:
+ # If they have sent it to us directly, and the server
+ # isn't telling us about the auth events that it's
+ # made a message referencing, we explode
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
)
- fetch_state = True
+ elif prevs - seen:
+ # Calculate the state of the previous events, and
+ # de-conflict them to find the current state.
+ state_groups = []
+ auth_chains = set()
+ try:
+ # Get the state of the events we know about
+ ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ state_groups.append(ours)
+
+ # Ask the remote server for the states we don't
+ # know about
+ for p in prevs - seen:
+ state, got_auth_chain = (
+ yield self.federation_client.get_state_for_room(
+ origin, pdu.room_id, p
+ )
+ )
+ auth_chains.update(got_auth_chain)
+ state_group = {(x.type, x.state_key): x.event_id for x in state}
+ state_groups.append(state_group)
+
+ # Resolve any conflicting state
+ def fetch(ev_ids):
+ return self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False
+ )
- if fetch_state:
- # We need to get the state at this event, since we haven't
- # processed all the prev events.
- logger.debug(
- "_handle_new_pdu getting state for %s",
- pdu.room_id
- )
- try:
- state, auth_chain = yield self.replication_layer.get_state_for_room(
- origin, pdu.room_id, pdu.event_id,
- )
- except Exception:
- logger.exception("Failed to get state for event: %s", pdu.event_id)
+ room_version = yield self.store.get_room_version(pdu.room_id)
+ state_map = yield resolve_events_with_factory(
+ room_version, state_groups, {pdu.event_id: pdu}, fetch
+ )
+
+ state = (yield self.store.get_events(state_map.values())).values()
+ auth_chain = list(auth_chains)
+ except Exception:
+ raise FederationError(
+ "ERROR",
+ 403,
+ "We can't get valid state history.",
+ affected=pdu.event_id,
+ )
yield self._process_received_pdu(
origin,
@@ -299,7 +361,7 @@ class FederationHandler(BaseHandler):
#
# see https://github.com/matrix-org/synapse/pull/1744
- missing_events = yield self.replication_layer.get_missing_events(
+ missing_events = yield self.federation_client.get_missing_events(
origin,
pdu.room_id,
earliest_events_ids=list(latest),
@@ -320,11 +382,17 @@ class FederationHandler(BaseHandler):
for e in missing_events:
logger.info("Handling found event %s", e.event_id)
- yield self.on_receive_pdu(
- origin,
- e,
- get_missing=False
- )
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn("Event %s failed history check.")
+ else:
+ raise
@log_function
@defer.inlineCallbacks
@@ -355,7 +423,7 @@ class FederationHandler(BaseHandler):
)
try:
- event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ yield self._persist_auth_tree(
origin, auth_chain, state, event
)
except AuthError as e:
@@ -399,7 +467,7 @@ class FederationHandler(BaseHandler):
yield self._handle_new_events(origin, event_infos)
try:
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context = yield self._handle_new_event(
origin,
event,
state=state,
@@ -424,24 +492,16 @@ class FederationHandler(BaseHandler):
except StoreError:
logger.exception("Failed to store room.")
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
-
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True
- prev_state_id = context.prev_state_ids.get(
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_state_id = prev_state_ids.get(
(event.type, event.state_key)
)
if prev_state_id:
@@ -453,84 +513,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield user_joined_room(self.distributor, user, event.room_id)
-
- @measure_func("_filter_events_for_server")
- @defer.inlineCallbacks
- def _filter_events_for_server(self, server_name, room_id, events):
- event_to_state_ids = yield self.store.get_state_ids_for_events(
- frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, None),
- )
- )
-
- # We only want to pull out member events that correspond to the
- # server's domain.
-
- def check_match(id):
- try:
- return server_name == get_domain_from_id(id)
- except Exception:
- return False
-
- # Parses mapping `event_id -> (type, state_key) -> state event_id`
- # to get all state ids that we're interested in.
- event_map = yield self.store.get_events([
- e_id
- for key_to_eid in list(event_to_state_ids.values())
- for key, e_id in key_to_eid.items()
- if key[0] != EventTypes.Member or check_match(key[1])
- ])
-
- event_to_state = {
- e_id: {
- key: event_map[inner_e_id]
- for key, inner_e_id in key_to_eid.iteritems()
- if inner_e_id in event_map
- }
- for e_id, key_to_eid in event_to_state_ids.iteritems()
- }
-
- def redact_disallowed(event, state):
- if not state:
- return event
-
- history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
- if history:
- visibility = history.content.get("history_visibility", "shared")
- if visibility in ["invited", "joined"]:
- # We now loop through all state events looking for
- # membership states for the requesting server to determine
- # if the server is either in the room or has been invited
- # into the room.
- for ev in state.itervalues():
- if ev.type != EventTypes.Member:
- continue
- try:
- domain = get_domain_from_id(ev.state_key)
- except Exception:
- continue
-
- if domain != server_name:
- continue
-
- memtype = ev.membership
- if memtype == Membership.JOIN:
- return event
- elif memtype == Membership.INVITE:
- if visibility == "invited":
- return event
- else:
- return prune_event(event)
-
- return event
-
- defer.returnValue([
- redact_disallowed(e, event_to_state[e.event_id])
- for e in events
- ])
+ yield self.user_joined_room(user, event.room_id)
@log_function
@defer.inlineCallbacks
@@ -551,7 +534,7 @@ class FederationHandler(BaseHandler):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
- events = yield self.replication_layer.backfill(
+ events = yield self.federation_client.backfill(
dest,
room_id,
limit=limit,
@@ -599,7 +582,7 @@ class FederationHandler(BaseHandler):
state_events = {}
events_to_state = {}
for e_id in edges:
- state, auth = yield self.replication_layer.get_state_for_room(
+ state, auth = yield self.federation_client.get_state_for_room(
destination=dest,
room_id=room_id,
event_id=e_id
@@ -641,7 +624,7 @@ class FederationHandler(BaseHandler):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(
- self.replication_layer.get_pdu,
+ self.federation_client.get_pdu,
[dest],
event_id,
outlier=True,
@@ -763,7 +746,7 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in state.iteritems()
+ for (e_type, state_key), event in iteritems(state)
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@@ -780,7 +763,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
- return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+ return sorted(joined_domains.items(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@@ -843,7 +826,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
- event_ids = list(extremities.iterkeys())
+ event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@@ -859,15 +842,15 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
- [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+ [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in state_dict.iteritems()
+ for k, e_id in iteritems(state_dict)
if e_id in state_map
- } for key, state_dict in states.iteritems()
+ } for key, state_dict in iteritems(states)
}
for e_id, _ in sorted_extremeties_tuple:
@@ -922,7 +905,7 @@ class FederationHandler(BaseHandler):
Invites must be signed by the invitee's server before distribution.
"""
- pdu = yield self.replication_layer.send_invite(
+ pdu = yield self.federation_client.send_invite(
destination=target_host,
room_id=event.room_id,
event_id=event.event_id,
@@ -938,16 +921,6 @@ class FederationHandler(BaseHandler):
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)
-
- for event in auth:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue([e for e in auth])
@log_function
@@ -972,6 +945,9 @@ class FederationHandler(BaseHandler):
joinee,
"join",
content,
+ params={
+ "ver": KNOWN_ROOM_VERSIONS,
+ },
)
# This shouldn't happen, because the RoomMemberHandler has a
@@ -981,7 +957,7 @@ class FederationHandler(BaseHandler):
self.room_queues[room_id] = []
- yield self.store.clean_room_for_join(room_id)
+ yield self._clean_room_for_join(room_id)
handled_events = set()
@@ -994,7 +970,7 @@ class FederationHandler(BaseHandler):
target_hosts.insert(0, origin)
except ValueError:
pass
- ret = yield self.replication_layer.send_join(target_hosts, event)
+ ret = yield self.federation_client.send_join(target_hosts, event)
origin = ret["origin"]
state = ret["state"]
@@ -1020,15 +996,10 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ yield self._persist_auth_tree(
origin, auth_chain, state, event
)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[joinee]
- )
-
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -1123,7 +1094,7 @@ class FederationHandler(BaseHandler):
# would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context = yield self._handle_new_event(
origin, event
)
@@ -1133,25 +1104,17 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
-
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
- yield user_joined_room(self.distributor, user, event.room_id)
+ yield self.user_joined_room(user, event.room_id)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
- state_ids = list(context.prev_state_ids.values())
+ state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
- state = yield self.store.get_events(list(context.prev_state_ids.values()))
+ state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({
"state": list(state.values()),
@@ -1213,17 +1176,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
-
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- )
-
- target_user = UserID.from_string(event.state_key)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1248,35 +1201,26 @@ class FederationHandler(BaseHandler):
except ValueError:
pass
- yield self.replication_layer.send_leave(
+ yield self.federation_client.send_leave(
target_hosts,
event
)
context = yield self.state_handler.compute_event_context(event)
-
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- )
-
- target_user = UserID.from_string(event.state_key)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
- content={},):
- origin, pdu = yield self.replication_layer.make_membership_event(
+ content={}, params=None):
+ origin, pdu = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
membership,
content,
+ params=params,
)
logger.debug("Got response to make_%s: %s", membership, pdu)
@@ -1316,7 +1260,7 @@ class FederationHandler(BaseHandler):
@log_function
def on_make_leave_request(self, room_id, user_id):
""" We've received a /make_leave/ request, so we create a partial
- join event for the room and return that. We do *not* persist or
+ leave event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
"""
builder = self.event_builder_factory.new({
@@ -1355,7 +1299,7 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ yield self._handle_new_event(
origin, event
)
@@ -1365,23 +1309,16 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
-
defer.returnValue(None)
@defer.inlineCallbacks
def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
+
+ event = yield self.store.get_event(
+ event_id, allow_none=False, check_room_id=room_id,
+ )
state_groups = yield self.store.get_state_groups(
room_id, [event_id]
@@ -1393,8 +1330,7 @@ class FederationHandler(BaseHandler):
(e.type, e.state_key): e for e in state
}
- event = yield self.store.get_event(event_id)
- if event and event.is_state():
+ if event.is_state():
# Get previous state
if "replaces_state" in event.unsigned:
prev_id = event.unsigned["replaces_state"]
@@ -1405,18 +1341,6 @@ class FederationHandler(BaseHandler):
del results[(event.type, event.state_key)]
res = list(results.values())
- for event in res:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue(res)
else:
defer.returnValue([])
@@ -1425,7 +1349,9 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
+ event = yield self.store.get_event(
+ event_id, allow_none=False, check_room_id=room_id,
+ )
state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id]
@@ -1435,8 +1361,7 @@ class FederationHandler(BaseHandler):
_, state = state_groups.items().pop()
results = state
- event = yield self.store.get_event(event_id)
- if event and event.is_state():
+ if event.is_state():
# Get previous state
if "replaces_state" in event.unsigned:
prev_id = event.unsigned["replaces_state"]
@@ -1462,17 +1387,26 @@ class FederationHandler(BaseHandler):
limit
)
- events = yield self._filter_events_for_server(origin, room_id, events)
+ events = yield filter_events_for_server(self.store, origin, events)
defer.returnValue(events)
@defer.inlineCallbacks
@log_function
- def get_persisted_pdu(self, origin, event_id, do_auth=True):
- """ Get a PDU from the database with given origin and id.
+ def get_persisted_pdu(self, origin, event_id):
+ """Get an event from the database for the given server.
+
+ Args:
+ origin [str]: hostname of server which is requesting the event; we
+ will check that the server is allowed to see it.
+ event_id [str]: id of the event being requested
Returns:
- Deferred: Results in a `Pdu`.
+ Deferred[EventBase|None]: None if we know nothing about the event;
+ otherwise the (possibly-redacted) event.
+
+ Raises:
+ AuthError if the server is not currently in the room
"""
event = yield self.store.get_event(
event_id,
@@ -1481,32 +1415,17 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.is_mine_id(event.event_id):
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
- if do_auth:
- in_room = yield self.auth.check_host_in_room(
- event.room_id,
- origin
- )
- if not in_room:
- raise AuthError(403, "Host not in room.")
-
- events = yield self._filter_events_for_server(
- origin, event.room_id, [event]
- )
-
- event = events[0]
+ in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+ events = yield filter_events_for_server(
+ self.store, origin, [event],
+ )
+ event = events[0]
defer.returnValue(event)
else:
defer.returnValue(None)
@@ -1531,9 +1450,8 @@ class FederationHandler(BaseHandler):
event, context
)
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
+ yield self.persist_events_and_notify(
+ [(event, context)],
backfilled=backfilled,
)
except: # noqa: E722, as we reraise the exception this is fine.
@@ -1546,15 +1464,7 @@ class FederationHandler(BaseHandler):
six.reraise(tp, value, tb)
- if not backfilled:
- # this intentionally does not yield: we don't care about the result
- # and don't need to wait for it.
- logcontext.run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id,
- )
-
- defer.returnValue((context, event_stream_id, max_stream_id))
+ defer.returnValue(context)
@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False):
@@ -1562,6 +1472,8 @@ class FederationHandler(BaseHandler):
should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
+
+ Notifies about the events where appropriate.
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
@@ -1576,10 +1488,10 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
- yield self.store.persist_events(
+ yield self.persist_events_and_notify(
[
(ev_info["event"], context)
- for ev_info, context in itertools.izip(event_infos, contexts)
+ for ev_info, context in zip(event_infos, contexts)
],
backfilled=backfilled,
)
@@ -1588,7 +1500,8 @@ class FederationHandler(BaseHandler):
def _persist_auth_tree(self, origin, auth_events, state, event):
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
- Persists the event seperately.
+ Persists the event separately. Notifies about the persisted events
+ where appropriate.
Will attempt to fetch missing auth events.
@@ -1599,8 +1512,7 @@ class FederationHandler(BaseHandler):
event (Event)
Returns:
- 2-tuple of (event_stream_id, max_stream_id) from the persist_event
- call for `event`
+ Deferred
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
@@ -1626,7 +1538,7 @@ class FederationHandler(BaseHandler):
missing_auth_events.add(e_id)
for e_id in missing_auth_events:
- m_ev = yield self.replication_layer.get_pdu(
+ m_ev = yield self.federation_client.get_pdu(
[origin],
e_id,
outlier=True,
@@ -1664,7 +1576,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
- yield self.store.persist_events(
+ yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1675,12 +1587,10 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event, new_event_context,
+ yield self.persist_events_and_notify(
+ [(event, new_event_context)],
)
- defer.returnValue((event_stream_id, max_stream_id))
-
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
"""
@@ -1699,8 +1609,9 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -1736,8 +1647,19 @@ class FederationHandler(BaseHandler):
defer.returnValue(context)
@defer.inlineCallbacks
- def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
+ def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
missing):
+ in_room = yield self.auth.check_host_in_room(
+ room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+
+ event = yield self.store.get_event(
+ event_id, allow_none=False, check_room_id=room_id
+ )
+
# Just go through and process each event in `remote_auth_chain`. We
# don't want to fall into the trap of `missing` being wrong.
for e in remote_auth_chain:
@@ -1747,7 +1669,6 @@ class FederationHandler(BaseHandler):
pass
# Now get the current auth_chain for the event.
- event = yield self.store.get_event(event_id)
local_auth_chain = yield self.store.get_auth_chain(
[auth_id for auth_id, _ in event.auth_events],
include_given=True
@@ -1760,15 +1681,6 @@ class FederationHandler(BaseHandler):
local_auth_chain, remote_auth_chain
)
- for event in ret["auth_chain"]:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
@@ -1794,8 +1706,8 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
- missing_events = yield self._filter_events_for_server(
- origin, room_id, missing_events,
+ missing_events = yield filter_events_for_server(
+ self.store, origin, missing_events,
)
defer.returnValue(missing_events)
@@ -1844,7 +1756,7 @@ class FederationHandler(BaseHandler):
logger.info("Missing auth: %s", missing_auth)
# If we don't have all the auth events, we need to get them.
try:
- remote_auth_chain = yield self.replication_layer.get_event_auth(
+ remote_auth_chain = yield self.federation_client.get_event_auth(
origin, event.room_id, event.event_id
)
@@ -1917,7 +1829,10 @@ class FederationHandler(BaseHandler):
(d.type, d.state_key): d for d in different_events if d
})
+ room_version = yield self.store.get_room_version(event.room_id)
+
new_state = self.state_handler.resolve_events(
+ room_version,
[list(local_view.values()), list(remote_view.values())],
event
)
@@ -1949,9 +1864,10 @@ class FederationHandler(BaseHandler):
break
if do_resolution:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids
+ event, prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
@@ -1959,7 +1875,7 @@ class FederationHandler(BaseHandler):
try:
# 2. Get remote difference.
- result = yield self.replication_layer.query_auth(
+ result = yield self.federation_client.query_auth(
origin,
event.room_id,
event.event_id,
@@ -2041,21 +1957,34 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update(state_updates)
- if context.delta_ids is not None:
- context.delta_ids = dict(context.delta_ids)
- context.delta_ids.update(state_updates)
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
+ current_state_ids = yield context.get_current_state_ids(self.store)
+ current_state_ids = dict(current_state_ids)
+
+ current_state_ids.update(state_updates)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_state_ids = dict(prev_state_ids)
+
+ prev_state_ids.update({
k: a.event_id for k, a in iteritems(auth_events)
})
- context.state_group = yield self.store.store_state_group(
+
+ # create a new state group as a delta from the existing one.
+ prev_group = context.state_group
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
- prev_group=context.prev_group,
- delta_ids=context.delta_ids,
- current_state_ids=context.current_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
+ current_state_ids=current_state_ids,
+ )
+
+ yield context.update_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
)
@defer.inlineCallbacks
@@ -2245,7 +2174,7 @@ class FederationHandler(BaseHandler):
yield member_handler.send_membership_event(None, event, context)
else:
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
- yield self.replication_layer.forward_third_party_invite(
+ yield self.federation_client.forward_third_party_invite(
destinations,
room_id,
event_dict,
@@ -2295,7 +2224,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"]
)
original_invite = None
- original_invite_id = context.prev_state_ids.get(key)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
original_invite_id, allow_none=True
@@ -2337,7 +2267,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
- invite_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,)
)
@@ -2387,7 +2318,7 @@ class FederationHandler(BaseHandler):
for revocation.
"""
try:
- response = yield self.hs.get_simple_http_client().get_json(
+ response = yield self.http_client.get_json(
url,
{"public_key": public_key}
)
@@ -2398,3 +2329,91 @@ class FederationHandler(BaseHandler):
)
if "valid" not in response or not response["valid"]:
raise AuthError(403, "Third party certificate was invalid")
+
+ @defer.inlineCallbacks
+ def persist_events_and_notify(self, event_and_contexts, backfilled=False):
+ """Persists events and tells the notifier/pushers about them, if
+ necessary.
+
+ Args:
+ event_and_contexts(list[tuple[FrozenEvent, EventContext]])
+ backfilled (bool): Whether these events are a result of
+ backfilling or not
+
+ Returns:
+ Deferred
+ """
+ if self.config.worker_app:
+ yield self._send_events_to_master(
+ store=self.store,
+ event_and_contexts=event_and_contexts,
+ backfilled=backfilled
+ )
+ else:
+ max_stream_id = yield self.store.persist_events(
+ event_and_contexts,
+ backfilled=backfilled,
+ )
+
+ if not backfilled: # Never notify for backfilled events
+ for event, _ in event_and_contexts:
+ self._notify_persisted_event(event, max_stream_id)
+
+ def _notify_persisted_event(self, event, max_stream_id):
+ """Checks to see if notifier/pushers should be notified about the
+ event or not.
+
+ Args:
+ event (FrozenEvent)
+ max_stream_id (int): The max_stream_id returned by persist_events
+ """
+
+ extra_users = []
+ if event.type == EventTypes.Member:
+ target_user_id = event.state_key
+
+ # We notify for memberships if its an invite for one of our
+ # users
+ if event.internal_metadata.is_outlier():
+ if event.membership != Membership.INVITE:
+ if not self.is_mine_id(target_user_id):
+ return
+
+ target_user = UserID.from_string(target_user_id)
+ extra_users.append(target_user)
+ elif event.internal_metadata.is_outlier():
+ return
+
+ event_stream_id = event.internal_metadata.stream_ordering
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
+
+ self.pusher_pool.on_new_notifications(
+ event_stream_id, max_stream_id,
+ )
+
+ def _clean_room_for_join(self, room_id):
+ """Called to clean up any data in DB for a given room, ready for the
+ server to join the room.
+
+ Args:
+ room_id (str)
+ """
+ if self.config.worker_app:
+ return self._clean_room_for_join_client(room_id)
+ else:
+ return self.store.clean_room_for_join(room_id)
+
+ def user_joined_room(self, user, room_id):
+ """Called when a new user has joined the room
+ """
+ if self.config.worker_app:
+ return self._notify_user_membership_change(
+ room_id=room_id,
+ user_id=user.to_string(),
+ change="joined",
+ )
+ else:
+ return user_joined_room(self.distributor, user, room_id)
|