diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 000bf5793c..24ea3573d3 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -120,7 +120,16 @@ class DirectoryHandler(BaseHandler):
)
extra_servers = yield self.store.get_joined_hosts_for_room(room_id)
- servers = list(set(extra_servers) | set(servers))
+ servers = set(extra_servers) | set(servers)
+
+ # If this server is in the list of servers, return it first.
+ if self.server_name in servers:
+ servers = (
+ [self.server_name]
+ + [s for s in servers if s != self.server_name]
+ )
+ else:
+ servers = list(servers)
defer.returnValue({
"room_id": room_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8bf5a4cc11..aba266c2bc 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -30,6 +30,7 @@ from synapse.types import UserID
from twisted.internet import defer
+import itertools
import logging
@@ -123,8 +124,21 @@ class FederationHandler(BaseHandler):
logger.debug("Got event for room we're not in.")
current_state = state
+ event_ids = set()
+ if state:
+ event_ids |= {e.event_id for e in state}
+ if auth_chain:
+ event_ids |= {e.event_id for e in auth_chain}
+
+ seen_ids = (yield self.store.have_events(event_ids)).keys()
+
if state and auth_chain is not None:
- for e in state:
+ # If we have any state or auth_chain given to us by the replication
+ # layer, then we should handle them (if we haven't before.)
+ for e in itertools.chain(auth_chain, state):
+ if e.event_id in seen_ids:
+ continue
+
e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
@@ -132,7 +146,10 @@ class FederationHandler(BaseHandler):
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
- yield self._handle_new_event(origin, e, auth_events=auth)
+ yield self._handle_new_event(
+ origin, e, auth_events=auth
+ )
+ seen_ids.add(e.event_id)
except:
logger.exception(
"Failed to handle state event %s",
@@ -256,7 +273,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def do_invite_join(self, target_host, room_id, joinee, content, snapshot):
+ def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
""" Attempts to join the `joinee` to the room `room_id` via the
server `target_host`.
@@ -270,8 +287,8 @@ class FederationHandler(BaseHandler):
"""
logger.debug("Joining %s to %s", joinee, room_id)
- pdu = yield self.replication_layer.make_join(
- target_host,
+ origin, pdu = yield self.replication_layer.make_join(
+ target_hosts,
room_id,
joinee
)
@@ -313,11 +330,17 @@ class FederationHandler(BaseHandler):
new_event = builder.build()
+ # Try the host we successfully got a response to /make_join/
+ # request first.
+ target_hosts.remove(origin)
+ target_hosts.insert(0, origin)
+
ret = yield self.replication_layer.send_join(
- target_host,
+ target_hosts,
new_event
)
+ origin = ret["origin"]
state = ret["state"]
auth_chain = ret["auth_chain"]
auth_chain.sort(key=lambda e: e.depth)
@@ -354,7 +377,7 @@ class FederationHandler(BaseHandler):
if e.event_id in auth_ids
}
yield self._handle_new_event(
- target_host, e, auth_events=auth
+ origin, e, auth_events=auth
)
except:
logger.exception(
@@ -374,7 +397,7 @@ class FederationHandler(BaseHandler):
if e.event_id in auth_ids
}
yield self._handle_new_event(
- target_host, e, auth_events=auth
+ origin, e, auth_events=auth
)
except:
logger.exception(
@@ -389,7 +412,7 @@ class FederationHandler(BaseHandler):
}
yield self._handle_new_event(
- target_host,
+ origin,
new_event,
state=state,
current_state=state,
@@ -498,6 +521,8 @@ class FederationHandler(BaseHandler):
"Failed to get destination from event %s", s.event_id
)
+ destinations.remove(origin)
+
logger.debug(
"on_send_join_request: Sending event: %s, signatures: %s",
event.event_id,
@@ -618,6 +643,7 @@ class FederationHandler(BaseHandler):
event = yield self.store.get_event(
event_id,
allow_none=True,
+ allow_rejected=True,
)
if event:
@@ -701,6 +727,8 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
+ # FIXME: Don't store as rejected with AUTH_ERROR if we haven't
+ # seen all the auth events.
yield self.store.persist_event(
event,
context=context,
@@ -750,7 +778,7 @@ class FederationHandler(BaseHandler):
)
)
- logger.debug("on_query_auth reutrning: %s", ret)
+ logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
@@ -770,41 +798,45 @@ class FederationHandler(BaseHandler):
if missing_auth:
logger.debug("Missing auth: %s", missing_auth)
# If we don't have all the auth events, we need to get them.
- remote_auth_chain = yield self.replication_layer.get_event_auth(
- origin, event.room_id, event.event_id
- )
+ try:
+ remote_auth_chain = yield self.replication_layer.get_event_auth(
+ origin, event.room_id, event.event_id
+ )
- seen_remotes = yield self.store.have_events(
- [e.event_id for e in remote_auth_chain]
- )
+ seen_remotes = yield self.store.have_events(
+ [e.event_id for e in remote_auth_chain]
+ )
- for e in remote_auth_chain:
- if e.event_id in seen_remotes.keys():
- continue
+ for e in remote_auth_chain:
+ if e.event_id in seen_remotes.keys():
+ continue
- if e.event_id == event.event_id:
- continue
+ if e.event_id == event.event_id:
+ continue
- try:
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
- (e.type, e.state_key): e for e in remote_auth_chain
- if e.event_id in auth_ids
- }
- e.internal_metadata.outlier = True
+ try:
+ auth_ids = [e_id for e_id, _ in e.auth_events]
+ auth = {
+ (e.type, e.state_key): e for e in remote_auth_chain
+ if e.event_id in auth_ids
+ }
+ e.internal_metadata.outlier = True
- logger.debug(
- "do_auth %s missing_auth: %s",
- event.event_id, e.event_id
- )
- yield self._handle_new_event(
- origin, e, auth_events=auth
- )
+ logger.debug(
+ "do_auth %s missing_auth: %s",
+ event.event_id, e.event_id
+ )
+ yield self._handle_new_event(
+ origin, e, auth_events=auth
+ )
- if e.event_id in event_auth_events:
- auth_events[(e.type, e.state_key)] = e
- except AuthError:
- pass
+ if e.event_id in event_auth_events:
+ auth_events[(e.type, e.state_key)] = e
+ except AuthError:
+ pass
+ except:
+ # FIXME:
+ logger.exception("Failed to get auth chain")
# FIXME: Assumes we have and stored all the state for all the
# prev_events
@@ -816,50 +848,57 @@ class FederationHandler(BaseHandler):
logger.debug("Different auth: %s", different_auth)
# 1. Get what we think is the auth chain.
- auth_ids = self.auth.compute_auth_events(event, context)
- local_auth_chain = yield self.store.get_auth_chain(auth_ids)
-
- # 2. Get remote difference.
- result = yield self.replication_layer.query_auth(
- origin,
- event.room_id,
- event.event_id,
- local_auth_chain,
- )
-
- seen_remotes = yield self.store.have_events(
- [e.event_id for e in result["auth_chain"]]
+ auth_ids = self.auth.compute_auth_events(
+ event, context.current_state
)
+ local_auth_chain = yield self.store.get_auth_chain(auth_ids)
- # 3. Process any remote auth chain events we haven't seen.
- for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes.keys():
- continue
+ try:
+ # 2. Get remote difference.
+ result = yield self.replication_layer.query_auth(
+ origin,
+ event.room_id,
+ event.event_id,
+ local_auth_chain,
+ )
- if ev.event_id == event.event_id:
- continue
+ seen_remotes = yield self.store.have_events(
+ [e.event_id for e in result["auth_chain"]]
+ )
- try:
- auth_ids = [e_id for e_id, _ in ev.auth_events]
- auth = {
- (e.type, e.state_key): e for e in result["auth_chain"]
- if e.event_id in auth_ids
- }
- ev.internal_metadata.outlier = True
+ # 3. Process any remote auth chain events we haven't seen.
+ for ev in result["auth_chain"]:
+ if ev.event_id in seen_remotes.keys():
+ continue
+
+ if ev.event_id == event.event_id:
+ continue
+
+ try:
+ auth_ids = [e_id for e_id, _ in ev.auth_events]
+ auth = {
+ (e.type, e.state_key): e for e in result["auth_chain"]
+ if e.event_id in auth_ids
+ }
+ ev.internal_metadata.outlier = True
+
+ logger.debug(
+ "do_auth %s different_auth: %s",
+ event.event_id, e.event_id
+ )
- logger.debug(
- "do_auth %s different_auth: %s",
- event.event_id, e.event_id
- )
+ yield self._handle_new_event(
+ origin, ev, auth_events=auth
+ )
- yield self._handle_new_event(
- origin, ev, auth_events=auth
- )
+ if ev.event_id in event_auth_events:
+ auth_events[(ev.type, ev.state_key)] = ev
+ except AuthError:
+ pass
- if ev.event_id in event_auth_events:
- auth_events[(ev.type, ev.state_key)] = ev
- except AuthError:
- pass
+ except:
+ # FIXME:
+ logger.exception("Failed to query auth chain")
# 4. Look at rejects and their proofs.
# TODO.
@@ -983,7 +1022,7 @@ class FederationHandler(BaseHandler):
if reason is None:
# FIXME: ERRR?!
logger.warn("Could not find reason for %s", e.event_id)
- raise RuntimeError("")
+ raise RuntimeError("Could not find reason for %s" % e.event_id)
reason_map[e.event_id] = reason
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6fbd2af4ab..3f51f38f18 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import RoomError
+from synapse.api.errors import RoomError, SynapseError
from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
@@ -372,10 +372,17 @@ class MessageHandler(BaseHandler):
as_event=True,
)
presence.append(member_presence)
- except Exception:
- logger.exception(
- "Failed to get member presence of %r", m.user_id
- )
+ except SynapseError as e:
+ if e.code == 404:
+ # FIXME: We are doing this as a warn since this gets hit a
+ # lot and spams the logs. Why is this happening?
+ logger.warn(
+ "Failed to get member presence of %r", m.user_id
+ )
+ else:
+ logger.exception(
+ "Failed to get member presence of %r", m.user_id
+ )
time_now = self.clock.time_msec()
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 23821d321f..0369b907a5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -389,8 +389,6 @@ class RoomMemberHandler(BaseHandler):
if not hosts:
raise SynapseError(404, "No known servers")
- host = hosts[0]
-
# If event doesn't include a display name, add one.
yield self.distributor.fire(
"collect_presencelike_data", joinee, content
@@ -407,12 +405,12 @@ class RoomMemberHandler(BaseHandler):
})
event, context = yield self._create_new_client_event(builder)
- yield self._do_join(event, context, room_host=host, do_auth=True)
+ yield self._do_join(event, context, room_hosts=hosts, do_auth=True)
defer.returnValue({"room_id": room_id})
@defer.inlineCallbacks
- def _do_join(self, event, context, room_host=None, do_auth=True):
+ def _do_join(self, event, context, room_hosts=None, do_auth=True):
joinee = UserID.from_string(event.state_key)
# room_id = RoomID.from_string(event.room_id, self.hs)
room_id = event.room_id
@@ -441,7 +439,7 @@ class RoomMemberHandler(BaseHandler):
if is_host_in_room:
should_do_dance = False
- elif room_host: # TODO: Shouldn't this be remote_room_host?
+ elif room_hosts: # TODO: Shouldn't this be remote_room_host?
should_do_dance = True
else:
# TODO(markjh): get prev_state from snapshot
@@ -453,7 +451,7 @@ class RoomMemberHandler(BaseHandler):
inviter = UserID.from_string(prev_state.user_id)
should_do_dance = not self.hs.is_mine(inviter)
- room_host = inviter.domain
+ room_hosts = [inviter.domain]
else:
# return the same error as join_room_alias does
raise SynapseError(404, "No known servers")
@@ -461,7 +459,7 @@ class RoomMemberHandler(BaseHandler):
if should_do_dance:
handler = self.hs.get_handlers().federation_handler
yield handler.do_invite_join(
- room_host,
+ room_hosts,
room_id,
event.user_id,
event.get_dict()["content"], # FIXME To get a non-frozen dict
|