diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 37e31d2b6f..7105ee21dc 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -102,8 +102,6 @@ class Auth(object):
def check_host_in_room(self, room_id, host):
curr_state = yield self.state.get_current_state(room_id)
- logger.debug("Got curr_state %s", curr_state)
-
for event in curr_state:
if event.type == EventTypes.Member:
try:
@@ -360,7 +358,7 @@ class Auth(object):
def add_auth_events(self, builder, context):
yield run_on_reactor()
- auth_ids = self.compute_auth_events(builder, context)
+ auth_ids = self.compute_auth_events(builder, context.current_state)
auth_events_entries = yield self.store.add_event_hashes(
auth_ids
@@ -374,26 +372,26 @@ class Auth(object):
if v.event_id in auth_ids
}
- def compute_auth_events(self, event, context):
+ def compute_auth_events(self, event, current_state):
if event.type == EventTypes.Create:
return []
auth_ids = []
key = (EventTypes.PowerLevels, "", )
- power_level_event = context.current_state.get(key)
+ power_level_event = current_state.get(key)
if power_level_event:
auth_ids.append(power_level_event.event_id)
key = (EventTypes.JoinRules, "", )
- join_rule_event = context.current_state.get(key)
+ join_rule_event = current_state.get(key)
key = (EventTypes.Member, event.user_id, )
- member_event = context.current_state.get(key)
+ member_event = current_state.get(key)
key = (EventTypes.Create, "", )
- create_event = context.current_state.get(key)
+ create_event = current_state.get(key)
if create_event:
auth_ids.append(create_event.event_id)
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index ad478aa6b7..5041828f18 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -39,7 +39,7 @@ class Codes(object):
TOO_LARGE = "M_TOO_LARGE"
-class CodeMessageException(Exception):
+class CodeMessageException(RuntimeError):
"""An exception with integer code and message string attributes."""
def __init__(self, code, msg):
@@ -227,3 +227,9 @@ class FederationError(RuntimeError):
"affected": self.affected,
"source": self.source if self.source else self.affected,
}
+
+
+class HttpResponseException(CodeMessageException):
+ def __init__(self, code, msg, response):
+ self.response = response
+ super(HttpResponseException, self).__init__(code, msg)
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index bf07951027..8f0c6e959f 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -77,7 +77,7 @@ class EventBase(object):
return self.content["membership"]
def is_state(self):
- return hasattr(self, "state_key")
+ return hasattr(self, "state_key") and self.state_key is not None
def get_dict(self):
d = dict(self._event_dict)
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
new file mode 100644
index 0000000000..a990aec4fd
--- /dev/null
+++ b/synapse/federation/federation_base.py
@@ -0,0 +1,118 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.internet import defer
+
+from synapse.events.utils import prune_event
+
+from syutil.jsonutil import encode_canonical_json
+
+from synapse.crypto.event_signing import check_event_content_hash
+
+from synapse.api.errors import SynapseError
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class FederationBase(object):
+ @defer.inlineCallbacks
+ def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False):
+ """Takes a list of PDUs and checks the signatures and hashs of each
+ one. If a PDU fails its signature check then we check if we have it in
+ the database and if not then request if from the originating server of
+ that PDU.
+
+ If a PDU fails its content hash check then it is redacted.
+
+ The given list of PDUs are not modified, instead the function returns
+ a new list.
+
+ Args:
+ pdu (list)
+ outlier (bool)
+
+ Returns:
+ Deferred : A list of PDUs that have valid signatures and hashes.
+ """
+ signed_pdus = []
+ for pdu in pdus:
+ try:
+ new_pdu = yield self._check_sigs_and_hash(pdu)
+ signed_pdus.append(new_pdu)
+ except SynapseError:
+ # FIXME: We should handle signature failures more gracefully.
+
+ # Check local db.
+ new_pdu = yield self.store.get_event(
+ pdu.event_id,
+ allow_rejected=True
+ )
+ if new_pdu:
+ signed_pdus.append(new_pdu)
+ continue
+
+ # Check pdu.origin
+ if pdu.origin != origin:
+ new_pdu = yield self.get_pdu(
+ destinations=[pdu.origin],
+ event_id=pdu.event_id,
+ outlier=outlier,
+ )
+
+ if new_pdu:
+ signed_pdus.append(new_pdu)
+ continue
+
+ logger.warn("Failed to find copy of %s with valid signature")
+
+ defer.returnValue(signed_pdus)
+
+ @defer.inlineCallbacks
+ def _check_sigs_and_hash(self, pdu):
+ """Throws a SynapseError if the PDU does not have the correct
+ signatures.
+
+ Returns:
+ FrozenEvent: Either the given event or it redacted if it failed the
+ content hash check.
+ """
+ # Check signatures are correct.
+ redacted_event = prune_event(pdu)
+ redacted_pdu_json = redacted_event.get_pdu_json()
+
+ try:
+ yield self.keyring.verify_json_for_server(
+ pdu.origin, redacted_pdu_json
+ )
+ except SynapseError:
+ logger.warn(
+ "Signature check failed for %s redacted to %s",
+ encode_canonical_json(pdu.get_pdu_json()),
+ encode_canonical_json(redacted_pdu_json),
+ )
+ raise
+
+ if not check_event_content_hash(pdu):
+ logger.warn(
+ "Event content has been tampered, redacting %s, %s",
+ pdu.event_id, encode_canonical_json(pdu.get_dict())
+ )
+ defer.returnValue(redacted_event)
+
+ defer.returnValue(pdu)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index e1539bd0e0..70c9a6f46b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -16,17 +16,12 @@
from twisted.internet import defer
+from .federation_base import FederationBase
from .units import Edu
+from synapse.api.errors import CodeMessageException
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
-from synapse.events.utils import prune_event
-
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import check_event_content_hash
-
-from synapse.api.errors import SynapseError
import logging
@@ -34,7 +29,7 @@ import logging
logger = logging.getLogger(__name__)
-class FederationClient(object):
+class FederationClient(FederationBase):
@log_function
def send_pdu(self, pdu, destinations):
"""Informs the replication layer about a new PDU generated within the
@@ -186,7 +181,8 @@ class FederationClient(object):
pdu = yield self._check_sigs_and_hash(pdu)
break
-
+ except CodeMessageException:
+ raise
except Exception as e:
logger.info(
"Failed to get PDU %s from %s because %s",
@@ -224,17 +220,17 @@ class FederationClient(object):
for p in result.get("auth_chain", [])
]
- for i, pdu in enumerate(pdus):
- pdus[i] = yield self._check_sigs_and_hash(pdu)
-
- # FIXME: We should handle signature failures more gracefully.
+ signed_pdus = yield self._check_sigs_and_hash_and_fetch(
+ destination, pdus, outlier=True
+ )
- for i, pdu in enumerate(auth_chain):
- auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
- # FIXME: We should handle signature failures more gracefully.
+ signed_auth.sort(key=lambda e: e.depth)
- defer.returnValue((pdus, auth_chain))
+ defer.returnValue((signed_pdus, signed_auth))
@defer.inlineCallbacks
@log_function
@@ -248,65 +244,88 @@ class FederationClient(object):
for p in res["auth_chain"]
]
- for i, pdu in enumerate(auth_chain):
- auth_chain[i] = yield self._check_sigs_and_hash(pdu)
-
- # FIXME: We should handle signature failures more gracefully.
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
- auth_chain.sort(key=lambda e: e.depth)
+ signed_auth.sort(key=lambda e: e.depth)
- defer.returnValue(auth_chain)
+ defer.returnValue(signed_auth)
@defer.inlineCallbacks
- def make_join(self, destination, room_id, user_id):
- ret = yield self.transport_layer.make_join(
- destination, room_id, user_id
- )
+ def make_join(self, destinations, room_id, user_id):
+ for destination in destinations:
+ try:
+ ret = yield self.transport_layer.make_join(
+ destination, room_id, user_id
+ )
- pdu_dict = ret["event"]
+ pdu_dict = ret["event"]
- logger.debug("Got response to make_join: %s", pdu_dict)
+ logger.debug("Got response to make_join: %s", pdu_dict)
- defer.returnValue(self.event_from_pdu_json(pdu_dict))
+ defer.returnValue(
+ (destination, self.event_from_pdu_json(pdu_dict))
+ )
+ break
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.warn(
+ "Failed to make_join via %s: %s",
+ destination, e.message
+ )
- @defer.inlineCallbacks
- def send_join(self, destination, pdu):
- time_now = self._clock.time_msec()
- _, content = yield self.transport_layer.send_join(
- destination=destination,
- room_id=pdu.room_id,
- event_id=pdu.event_id,
- content=pdu.get_pdu_json(time_now),
- )
+ raise RuntimeError("Failed to send to any server.")
- logger.debug("Got content: %s", content)
+ @defer.inlineCallbacks
+ def send_join(self, destinations, pdu):
+ for destination in destinations:
+ try:
+ time_now = self._clock.time_msec()
+ _, content = yield self.transport_layer.send_join(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
- state = [
- self.event_from_pdu_json(p, outlier=True)
- for p in content.get("state", [])
- ]
+ logger.debug("Got content: %s", content)
- auth_chain = [
- self.event_from_pdu_json(p, outlier=True)
- for p in content.get("auth_chain", [])
- ]
+ state = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("state", [])
+ ]
- for i, pdu in enumerate(state):
- state[i] = yield self._check_sigs_and_hash(pdu)
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("auth_chain", [])
+ ]
- # FIXME: We should handle signature failures more gracefully.
+ signed_state = yield self._check_sigs_and_hash_and_fetch(
+ destination, state, outlier=True
+ )
- for i, pdu in enumerate(auth_chain):
- auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
- # FIXME: We should handle signature failures more gracefully.
+ auth_chain.sort(key=lambda e: e.depth)
- auth_chain.sort(key=lambda e: e.depth)
+ defer.returnValue({
+ "state": signed_state,
+ "auth_chain": signed_auth,
+ "origin": destination,
+ })
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.warn(
+ "Failed to send_join via %s: %s",
+ destination, e.message
+ )
- defer.returnValue({
- "state": state,
- "auth_chain": auth_chain,
- })
+ raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
@@ -353,12 +372,18 @@ class FederationClient(object):
)
auth_chain = [
- (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+ self.event_from_pdu_json(e)
for e in content["auth_chain"]
]
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
+
+ signed_auth.sort(key=lambda e: e.depth)
+
ret = {
- "auth_chain": auth_chain,
+ "auth_chain": signed_auth,
"rejects": content.get("rejects", []),
"missing": content.get("missing", []),
}
@@ -373,37 +398,3 @@ class FederationClient(object):
event.internal_metadata.outlier = outlier
return event
-
- @defer.inlineCallbacks
- def _check_sigs_and_hash(self, pdu):
- """Throws a SynapseError if the PDU does not have the correct
- signatures.
-
- Returns:
- FrozenEvent: Either the given event or it redacted if it failed the
- content hash check.
- """
- # Check signatures are correct.
- redacted_event = prune_event(pdu)
- redacted_pdu_json = redacted_event.get_pdu_json()
-
- try:
- yield self.keyring.verify_json_for_server(
- pdu.origin, redacted_pdu_json
- )
- except SynapseError:
- logger.warn(
- "Signature check failed for %s redacted to %s",
- encode_canonical_json(pdu.get_pdu_json()),
- encode_canonical_json(redacted_pdu_json),
- )
- raise
-
- if not check_event_content_hash(pdu):
- logger.warn(
- "Event content has been tampered, redacting %s, %s",
- pdu.event_id, encode_canonical_json(pdu.get_dict())
- )
- defer.returnValue(redacted_event)
-
- defer.returnValue(pdu)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5fbd8b19de..4742ca9390 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -16,16 +16,12 @@
from twisted.internet import defer
+from .federation_base import FederationBase
from .units import Transaction, Edu
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.events import FrozenEvent
-from synapse.events.utils import prune_event
-
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import check_event_content_hash
from synapse.api.errors import FederationError, SynapseError
@@ -35,7 +31,7 @@ import logging
logger = logging.getLogger(__name__)
-class FederationServer(object):
+class FederationServer(FederationBase):
def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
receipt of new PDUs from other home servers. The required methods are
@@ -251,17 +247,20 @@ class FederationServer(object):
Deferred: Results in `dict` with the same format as `content`
"""
auth_chain = [
- (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+ self.event_from_pdu_json(e)
for e in content["auth_chain"]
]
- missing = [
- (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
- for e in content.get("missing", [])
- ]
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ origin, auth_chain, outlier=True
+ )
ret = yield self.handler.on_query_auth(
- origin, event_id, auth_chain, content.get("rejects", []), missing
+ origin,
+ event_id,
+ signed_auth,
+ content.get("rejects", []),
+ content.get("missing", []),
)
time_now = self._clock.time_msec()
@@ -426,37 +425,3 @@ class FederationServer(object):
event.internal_metadata.outlier = outlier
return event
-
- @defer.inlineCallbacks
- def _check_sigs_and_hash(self, pdu):
- """Throws a SynapseError if the PDU does not have the correct
- signatures.
-
- Returns:
- FrozenEvent: Either the given event or it redacted if it failed the
- content hash check.
- """
- # Check signatures are correct.
- redacted_event = prune_event(pdu)
- redacted_pdu_json = redacted_event.get_pdu_json()
-
- try:
- yield self.keyring.verify_json_for_server(
- pdu.origin, redacted_pdu_json
- )
- except SynapseError:
- logger.warn(
- "Signature check failed for %s redacted to %s",
- encode_canonical_json(pdu.get_pdu_json()),
- encode_canonical_json(redacted_pdu_json),
- )
- raise
-
- if not check_event_content_hash(pdu):
- logger.warn(
- "Event content has been tampered, redacting %s, %s",
- pdu.event_id, encode_canonical_json(pdu.get_dict())
- )
- defer.returnValue(redacted_event)
-
- defer.returnValue(pdu)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9d4f2c09a2..f38aeba7cc 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
from .persistence import TransactionActions
from .units import Transaction
+from synapse.api.errors import HttpResponseException
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
@@ -238,9 +239,14 @@ class TransactionQueue(object):
del p["age_ts"]
return data
- code, response = yield self.transport_layer.send_transaction(
- transaction, json_data_cb
- )
+ try:
+ response = yield self.transport_layer.send_transaction(
+ transaction, json_data_cb
+ )
+ code = 200
+ except HttpResponseException as e:
+ code = e.code
+ response = e.response
logger.info("TX [%s] got %d response", destination, code)
@@ -274,8 +280,7 @@ class TransactionQueue(object):
pass
logger.debug("TX [%s] Yielded to callbacks", destination)
-
- except Exception as e:
+ except RuntimeError as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
@@ -283,6 +288,14 @@ class TransactionQueue(object):
destination,
e,
)
+ except Exception as e:
+ # We capture this here as there as nothing actually listens
+ # for this finishing functions deferred.
+ logger.exception(
+ "TX [%s] Problem in _attempt_transaction: %s",
+ destination,
+ e,
+ )
self.set_retrying(destination, retry_interval)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 4cb1dea2de..8b137e7128 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.util.logutils import log_function
import logging
-import json
logger = logging.getLogger(__name__)
@@ -129,7 +128,7 @@ class TransportLayerClient(object):
# generated by the json_data_callback.
json_data = transaction.get_dict()
- code, response = yield self.client.put_json(
+ response = yield self.client.put_json(
transaction.destination,
path=PREFIX + "/send/%s/" % transaction.transaction_id,
data=json_data,
@@ -137,95 +136,86 @@ class TransportLayerClient(object):
)
logger.debug(
- "send_data dest=%s, txid=%s, got response: %d",
- transaction.destination, transaction.transaction_id, code
+ "send_data dest=%s, txid=%s, got response: 200",
+ transaction.destination, transaction.transaction_id,
)
- defer.returnValue((code, response))
+ defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def make_query(self, destination, query_type, args, retry_on_dns_fail):
path = PREFIX + "/query/%s" % query_type
- response = yield self.client.get_json(
+ content = yield self.client.get_json(
destination=destination,
path=path,
args=args,
retry_on_dns_fail=retry_on_dns_fail,
)
- defer.returnValue(response)
+ defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True):
path = PREFIX + "/make_join/%s/%s" % (room_id, user_id)
- response = yield self.client.get_json(
+ content = yield self.client.get_json(
destination=destination,
path=path,
retry_on_dns_fail=retry_on_dns_fail,
)
- defer.returnValue(response)
+ defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
- code, content = yield self.client.put_json(
+ response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
- if not 200 <= code < 300:
- raise RuntimeError("Got %d from send_join", code)
-
- defer.returnValue(json.loads(content))
+ defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def send_invite(self, destination, room_id, event_id, content):
path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
- code, content = yield self.client.put_json(
+ response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
- if not 200 <= code < 300:
- raise RuntimeError("Got %d from send_invite", code)
-
- defer.returnValue(json.loads(content))
+ defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
- response = yield self.client.get_json(
+ content = yield self.client.get_json(
destination=destination,
path=path,
)
- defer.returnValue(response)
+ defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def send_query_auth(self, destination, room_id, event_id, content):
path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
- code, content = yield self.client.post_json(
+ content = yield self.client.post_json(
destination=destination,
path=path,
data=content,
)
- if not 200 <= code < 300:
- raise RuntimeError("Got %d from send_invite", code)
-
- defer.returnValue(json.loads(content))
+ defer.returnValue(content)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 58e9a91562..7b60921040 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -113,7 +113,16 @@ class DirectoryHandler(BaseHandler):
)
extra_servers = yield self.store.get_joined_hosts_for_room(room_id)
- servers = list(set(extra_servers) | set(servers))
+ servers = set(extra_servers) | set(servers)
+
+ # If this server is in the list of servers, return it first.
+ if self.server_name in servers:
+ servers = (
+ [self.server_name]
+ + [s for s in servers if s != self.server_name]
+ )
+ else:
+ servers = list(servers)
defer.returnValue({
"room_id": room_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8bf5a4cc11..aba266c2bc 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -30,6 +30,7 @@ from synapse.types import UserID
from twisted.internet import defer
+import itertools
import logging
@@ -123,8 +124,21 @@ class FederationHandler(BaseHandler):
logger.debug("Got event for room we're not in.")
current_state = state
+ event_ids = set()
+ if state:
+ event_ids |= {e.event_id for e in state}
+ if auth_chain:
+ event_ids |= {e.event_id for e in auth_chain}
+
+ seen_ids = (yield self.store.have_events(event_ids)).keys()
+
if state and auth_chain is not None:
- for e in state:
+ # If we have any state or auth_chain given to us by the replication
+ # layer, then we should handle them (if we haven't before.)
+ for e in itertools.chain(auth_chain, state):
+ if e.event_id in seen_ids:
+ continue
+
e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
@@ -132,7 +146,10 @@ class FederationHandler(BaseHandler):
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
- yield self._handle_new_event(origin, e, auth_events=auth)
+ yield self._handle_new_event(
+ origin, e, auth_events=auth
+ )
+ seen_ids.add(e.event_id)
except:
logger.exception(
"Failed to handle state event %s",
@@ -256,7 +273,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def do_invite_join(self, target_host, room_id, joinee, content, snapshot):
+ def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
""" Attempts to join the `joinee` to the room `room_id` via the
server `target_host`.
@@ -270,8 +287,8 @@ class FederationHandler(BaseHandler):
"""
logger.debug("Joining %s to %s", joinee, room_id)
- pdu = yield self.replication_layer.make_join(
- target_host,
+ origin, pdu = yield self.replication_layer.make_join(
+ target_hosts,
room_id,
joinee
)
@@ -313,11 +330,17 @@ class FederationHandler(BaseHandler):
new_event = builder.build()
+ # Try the host we successfully got a response to /make_join/
+ # request first.
+ target_hosts.remove(origin)
+ target_hosts.insert(0, origin)
+
ret = yield self.replication_layer.send_join(
- target_host,
+ target_hosts,
new_event
)
+ origin = ret["origin"]
state = ret["state"]
auth_chain = ret["auth_chain"]
auth_chain.sort(key=lambda e: e.depth)
@@ -354,7 +377,7 @@ class FederationHandler(BaseHandler):
if e.event_id in auth_ids
}
yield self._handle_new_event(
- target_host, e, auth_events=auth
+ origin, e, auth_events=auth
)
except:
logger.exception(
@@ -374,7 +397,7 @@ class FederationHandler(BaseHandler):
if e.event_id in auth_ids
}
yield self._handle_new_event(
- target_host, e, auth_events=auth
+ origin, e, auth_events=auth
)
except:
logger.exception(
@@ -389,7 +412,7 @@ class FederationHandler(BaseHandler):
}
yield self._handle_new_event(
- target_host,
+ origin,
new_event,
state=state,
current_state=state,
@@ -498,6 +521,8 @@ class FederationHandler(BaseHandler):
"Failed to get destination from event %s", s.event_id
)
+ destinations.remove(origin)
+
logger.debug(
"on_send_join_request: Sending event: %s, signatures: %s",
event.event_id,
@@ -618,6 +643,7 @@ class FederationHandler(BaseHandler):
event = yield self.store.get_event(
event_id,
allow_none=True,
+ allow_rejected=True,
)
if event:
@@ -701,6 +727,8 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
+ # FIXME: Don't store as rejected with AUTH_ERROR if we haven't
+ # seen all the auth events.
yield self.store.persist_event(
event,
context=context,
@@ -750,7 +778,7 @@ class FederationHandler(BaseHandler):
)
)
- logger.debug("on_query_auth reutrning: %s", ret)
+ logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
@@ -770,41 +798,45 @@ class FederationHandler(BaseHandler):
if missing_auth:
logger.debug("Missing auth: %s", missing_auth)
# If we don't have all the auth events, we need to get them.
- remote_auth_chain = yield self.replication_layer.get_event_auth(
- origin, event.room_id, event.event_id
- )
+ try:
+ remote_auth_chain = yield self.replication_layer.get_event_auth(
+ origin, event.room_id, event.event_id
+ )
- seen_remotes = yield self.store.have_events(
- [e.event_id for e in remote_auth_chain]
- )
+ seen_remotes = yield self.store.have_events(
+ [e.event_id for e in remote_auth_chain]
+ )
- for e in remote_auth_chain:
- if e.event_id in seen_remotes.keys():
- continue
+ for e in remote_auth_chain:
+ if e.event_id in seen_remotes.keys():
+ continue
- if e.event_id == event.event_id:
- continue
+ if e.event_id == event.event_id:
+ continue
- try:
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
- (e.type, e.state_key): e for e in remote_auth_chain
- if e.event_id in auth_ids
- }
- e.internal_metadata.outlier = True
+ try:
+ auth_ids = [e_id for e_id, _ in e.auth_events]
+ auth = {
+ (e.type, e.state_key): e for e in remote_auth_chain
+ if e.event_id in auth_ids
+ }
+ e.internal_metadata.outlier = True
- logger.debug(
- "do_auth %s missing_auth: %s",
- event.event_id, e.event_id
- )
- yield self._handle_new_event(
- origin, e, auth_events=auth
- )
+ logger.debug(
+ "do_auth %s missing_auth: %s",
+ event.event_id, e.event_id
+ )
+ yield self._handle_new_event(
+ origin, e, auth_events=auth
+ )
- if e.event_id in event_auth_events:
- auth_events[(e.type, e.state_key)] = e
- except AuthError:
- pass
+ if e.event_id in event_auth_events:
+ auth_events[(e.type, e.state_key)] = e
+ except AuthError:
+ pass
+ except:
+ # FIXME:
+ logger.exception("Failed to get auth chain")
# FIXME: Assumes we have and stored all the state for all the
# prev_events
@@ -816,50 +848,57 @@ class FederationHandler(BaseHandler):
logger.debug("Different auth: %s", different_auth)
# 1. Get what we think is the auth chain.
- auth_ids = self.auth.compute_auth_events(event, context)
- local_auth_chain = yield self.store.get_auth_chain(auth_ids)
-
- # 2. Get remote difference.
- result = yield self.replication_layer.query_auth(
- origin,
- event.room_id,
- event.event_id,
- local_auth_chain,
- )
-
- seen_remotes = yield self.store.have_events(
- [e.event_id for e in result["auth_chain"]]
+ auth_ids = self.auth.compute_auth_events(
+ event, context.current_state
)
+ local_auth_chain = yield self.store.get_auth_chain(auth_ids)
- # 3. Process any remote auth chain events we haven't seen.
- for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes.keys():
- continue
+ try:
+ # 2. Get remote difference.
+ result = yield self.replication_layer.query_auth(
+ origin,
+ event.room_id,
+ event.event_id,
+ local_auth_chain,
+ )
- if ev.event_id == event.event_id:
- continue
+ seen_remotes = yield self.store.have_events(
+ [e.event_id for e in result["auth_chain"]]
+ )
- try:
- auth_ids = [e_id for e_id, _ in ev.auth_events]
- auth = {
- (e.type, e.state_key): e for e in result["auth_chain"]
- if e.event_id in auth_ids
- }
- ev.internal_metadata.outlier = True
+ # 3. Process any remote auth chain events we haven't seen.
+ for ev in result["auth_chain"]:
+ if ev.event_id in seen_remotes.keys():
+ continue
+
+ if ev.event_id == event.event_id:
+ continue
+
+ try:
+ auth_ids = [e_id for e_id, _ in ev.auth_events]
+ auth = {
+ (e.type, e.state_key): e for e in result["auth_chain"]
+ if e.event_id in auth_ids
+ }
+ ev.internal_metadata.outlier = True
+
+ logger.debug(
+ "do_auth %s different_auth: %s",
+ event.event_id, e.event_id
+ )
- logger.debug(
- "do_auth %s different_auth: %s",
- event.event_id, e.event_id
- )
+ yield self._handle_new_event(
+ origin, ev, auth_events=auth
+ )
- yield self._handle_new_event(
- origin, ev, auth_events=auth
- )
+ if ev.event_id in event_auth_events:
+ auth_events[(ev.type, ev.state_key)] = ev
+ except AuthError:
+ pass
- if ev.event_id in event_auth_events:
- auth_events[(ev.type, ev.state_key)] = ev
- except AuthError:
- pass
+ except:
+ # FIXME:
+ logger.exception("Failed to query auth chain")
# 4. Look at rejects and their proofs.
# TODO.
@@ -983,7 +1022,7 @@ class FederationHandler(BaseHandler):
if reason is None:
# FIXME: ERRR?!
logger.warn("Could not find reason for %s", e.event_id)
- raise RuntimeError("")
+ raise RuntimeError("Could not find reason for %s" % e.event_id)
reason_map[e.event_id] = reason
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 23821d321f..0369b907a5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -389,8 +389,6 @@ class RoomMemberHandler(BaseHandler):
if not hosts:
raise SynapseError(404, "No known servers")
- host = hosts[0]
-
# If event doesn't include a display name, add one.
yield self.distributor.fire(
"collect_presencelike_data", joinee, content
@@ -407,12 +405,12 @@ class RoomMemberHandler(BaseHandler):
})
event, context = yield self._create_new_client_event(builder)
- yield self._do_join(event, context, room_host=host, do_auth=True)
+ yield self._do_join(event, context, room_hosts=hosts, do_auth=True)
defer.returnValue({"room_id": room_id})
@defer.inlineCallbacks
- def _do_join(self, event, context, room_host=None, do_auth=True):
+ def _do_join(self, event, context, room_hosts=None, do_auth=True):
joinee = UserID.from_string(event.state_key)
# room_id = RoomID.from_string(event.room_id, self.hs)
room_id = event.room_id
@@ -441,7 +439,7 @@ class RoomMemberHandler(BaseHandler):
if is_host_in_room:
should_do_dance = False
- elif room_host: # TODO: Shouldn't this be remote_room_host?
+ elif room_hosts: # TODO: Shouldn't this be remote_room_host?
should_do_dance = True
else:
# TODO(markjh): get prev_state from snapshot
@@ -453,7 +451,7 @@ class RoomMemberHandler(BaseHandler):
inviter = UserID.from_string(prev_state.user_id)
should_do_dance = not self.hs.is_mine(inviter)
- room_host = inviter.domain
+ room_hosts = [inviter.domain]
else:
# return the same error as join_room_alias does
raise SynapseError(404, "No known servers")
@@ -461,7 +459,7 @@ class RoomMemberHandler(BaseHandler):
if should_do_dance:
handler = self.hs.get_handlers().federation_handler
yield handler.do_invite_join(
- room_host,
+ room_hosts,
room_id,
event.user_id,
event.get_dict()["content"], # FIXME To get a non-frozen dict
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c7bf1b47b8..8559d06b7f 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -27,7 +27,9 @@ from synapse.util.logcontext import PreserveLoggingContext
from syutil.jsonutil import encode_canonical_json
-from synapse.api.errors import CodeMessageException, SynapseError, Codes
+from synapse.api.errors import (
+ SynapseError, Codes, HttpResponseException,
+)
from syutil.crypto.jsonsign import sign_json
@@ -163,13 +165,12 @@ class MatrixFederationHttpClient(object):
)
if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
pass
else:
# :'(
# Update transactions table?
- raise CodeMessageException(
- response.code, response.phrase
+ raise HttpResponseException(
+ response.code, response.phrase, response
)
defer.returnValue(response)
@@ -238,11 +239,20 @@ class MatrixFederationHttpClient(object):
headers_dict={"Content-Type": ["application/json"]},
)
+ if 200 <= response.code < 300:
+ # We need to update the transactions table to say it was sent?
+ c_type = response.headers.getRawHeaders("Content-Type")
+
+ if "application/json" not in c_type:
+ raise RuntimeError(
+ "Content-Type not application/json"
+ )
+
logger.debug("Getting resp body")
body = yield readBody(response)
logger.debug("Got resp body")
- defer.returnValue((response.code, body))
+ defer.returnValue(json.loads(body))
@defer.inlineCallbacks
def post_json(self, destination, path, data={}):
@@ -275,11 +285,20 @@ class MatrixFederationHttpClient(object):
headers_dict={"Content-Type": ["application/json"]},
)
+ if 200 <= response.code < 300:
+ # We need to update the transactions table to say it was sent?
+ c_type = response.headers.getRawHeaders("Content-Type")
+
+ if "application/json" not in c_type:
+ raise RuntimeError(
+ "Content-Type not application/json"
+ )
+
logger.debug("Getting resp body")
body = yield readBody(response)
logger.debug("Got resp body")
- defer.returnValue((response.code, body))
+ defer.returnValue(json.loads(body))
@defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
@@ -321,7 +340,18 @@ class MatrixFederationHttpClient(object):
retry_on_dns_fail=retry_on_dns_fail
)
+ if 200 <= response.code < 300:
+ # We need to update the transactions table to say it was sent?
+ c_type = response.headers.getRawHeaders("Content-Type")
+
+ if "application/json" not in c_type:
+ raise RuntimeError(
+ "Content-Type not application/json"
+ )
+
+ logger.debug("Getting resp body")
body = yield readBody(response)
+ logger.debug("Got resp body")
defer.returnValue(json.loads(body))
diff --git a/synapse/state.py b/synapse/state.py
index 8a056ee955..695a5e7ac4 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -37,7 +37,10 @@ def _get_state_key_from_event(event):
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
-AuthEventTypes = (EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels,)
+AuthEventTypes = (
+ EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels,
+ EventTypes.JoinRules,
+)
class StateHandler(object):
@@ -100,7 +103,9 @@ class StateHandler(object):
context.state_group = None
if hasattr(event, "auth_events") and event.auth_events:
- auth_ids = zip(*event.auth_events)[0]
+ auth_ids = self.hs.get_auth().compute_auth_events(
+ event, context.current_state
+ )
context.auth_events = {
k: v
for k, v in context.current_state.items()
@@ -146,7 +151,9 @@ class StateHandler(object):
event.unsigned["replaces_state"] = replaces.event_id
if hasattr(event, "auth_events") and event.auth_events:
- auth_ids = zip(*event.auth_events)[0]
+ auth_ids = self.hs.get_auth().compute_auth_events(
+ event, context.current_state
+ )
context.auth_events = {
k: v
for k, v in context.current_state.items()
@@ -259,6 +266,15 @@ class StateHandler(object):
auth_events.update(resolved_state)
for key, events in conflicted_state.items():
+ if key[0] == EventTypes.JoinRules:
+ resolved_state[key] = self._resolve_auth_events(
+ events,
+ auth_events
+ )
+
+ auth_events.update(resolved_state)
+
+ for key, events in conflicted_state.items():
if key[0] == EventTypes.Member:
resolved_state[key] = self._resolve_auth_events(
events,
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 7c54b1b9d3..a63c59a8a2 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -128,21 +128,144 @@ class DataStore(RoomMemberStore, RoomStore,
pass
@defer.inlineCallbacks
- def get_event(self, event_id, allow_none=False):
- events = yield self._get_events([event_id])
+ def get_event(self, event_id, check_redacted=True,
+ get_prev_content=False, allow_rejected=False,
+ allow_none=False):
+ """Get an event from the database by event_id.
+
+ Args:
+ event_id (str): The event_id of the event to fetch
+ check_redacted (bool): If True, check if event has been redacted
+ and redact it.
+ get_prev_content (bool): If True and event is a state event,
+ include the previous states content in the unsigned field.
+ allow_rejected (bool): If True return rejected events.
+ allow_none (bool): If True, return None if no event found, if
+ False throw an exception.
- if not events:
- if allow_none:
- defer.returnValue(None)
- else:
- raise RuntimeError("Could not find event %s" % (event_id,))
+ Returns:
+ Deferred : A FrozenEvent.
+ """
+ event = yield self.runInteraction(
+ "get_event", self._get_event_txn,
+ event_id,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
- defer.returnValue(events[0])
+ if not event and not allow_none:
+ raise RuntimeError("Could not find event %s" % (event_id,))
+
+ defer.returnValue(event)
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
+
+ # We purposefully do this first since if we include a `current_state`
+ # key, we *want* to update the `current_state_events` table
+ if current_state:
+ txn.execute(
+ "DELETE FROM current_state_events WHERE room_id = ?",
+ (event.room_id,)
+ )
+
+ for s in current_state:
+ self._simple_insert_txn(
+ txn,
+ "current_state_events",
+ {
+ "event_id": s.event_id,
+ "room_id": s.room_id,
+ "type": s.type,
+ "state_key": s.state_key,
+ },
+ or_replace=True,
+ )
+
+ if event.is_state() and is_new_state:
+ if not backfilled and not context.rejected:
+ self._simple_insert_txn(
+ txn,
+ table="state_forward_extremities",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ or_replace=True,
+ )
+
+ for prev_state_id, _ in event.prev_state:
+ self._simple_delete_txn(
+ txn,
+ table="state_forward_extremities",
+ keyvalues={
+ "event_id": prev_state_id,
+ }
+ )
+
+ outlier = event.internal_metadata.is_outlier()
+
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
+
+ self._update_min_depth_for_room_txn(
+ txn,
+ event.room_id,
+ event.depth
+ )
+
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
+ have_persisted = self._simple_select_one_onecol_txn(
+ txn,
+ table="event_json",
+ keyvalues={"event_id": event.event_id},
+ retcol="event_id",
+ allow_none=True,
+ )
+
+ metadata_json = encode_canonical_json(
+ event.internal_metadata.get_dict()
+ )
+
+ # If we have already persisted this event, we don't need to do any
+ # more processing.
+ # The processing above must be done on every call to persist event,
+ # since they might not have happened on previous calls. For example,
+ # if we are persisting an event that we had persisted as an outlier,
+ # but is no longer one.
+ if have_persisted:
+ if not outlier:
+ sql = (
+ "UPDATE event_json SET internal_metadata = ?"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (metadata_json.decode("UTF-8"), event.event_id,)
+ )
+
+ sql = (
+ "UPDATE events SET outlier = 0"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (event.event_id,)
+ )
+ return
+
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Feedback:
@@ -154,8 +277,6 @@ class DataStore(RoomMemberStore, RoomStore,
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
- outlier = event.internal_metadata.is_outlier()
-
event_dict = {
k: v
for k, v in event.get_dict().items()
@@ -165,10 +286,6 @@ class DataStore(RoomMemberStore, RoomStore,
]
}
- metadata_json = encode_canonical_json(
- event.internal_metadata.get_dict()
- )
-
self._simple_insert_txn(
txn,
table="event_json",
@@ -224,41 +341,10 @@ class DataStore(RoomMemberStore, RoomStore,
)
raise _RollbackButIsFineException("_persist_event")
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
- )
-
- if not outlier:
- self._store_state_groups_txn(txn, event, context)
-
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
- if current_state:
- txn.execute(
- "DELETE FROM current_state_events WHERE room_id = ?",
- (event.room_id,)
- )
-
- for s in current_state:
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": s.event_id,
- "room_id": s.room_id,
- "type": s.type,
- "state_key": s.state_key,
- },
- or_replace=True,
- )
-
- is_state = hasattr(event, "state_key") and event.state_key is not None
- if is_state:
+ if event.is_state():
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -266,6 +352,7 @@ class DataStore(RoomMemberStore, RoomStore,
"state_key": event.state_key,
}
+ # TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
@@ -302,28 +389,6 @@ class DataStore(RoomMemberStore, RoomStore,
or_ignore=True,
)
- if not backfilled and not context.rejected:
- self._simple_insert_txn(
- txn,
- table="state_forward_extremities",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- or_replace=True,
- )
-
- for prev_state_id, _ in event.prev_state:
- self._simple_delete_txn(
- txn,
- table="state_forward_extremities",
- keyvalues={
- "event_id": prev_state_id,
- }
- )
-
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
@@ -354,13 +419,6 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, ref_alg, ref_hash_bytes
)
- if not outlier:
- self._update_min_depth_for_room_txn(
- txn,
- event.room_id,
- event.depth
- )
-
def _store_redaction(self, txn, event):
txn.execute(
"INSERT OR IGNORE INTO redactions "
@@ -477,6 +535,9 @@ class DataStore(RoomMemberStore, RoomStore,
the rejected reason string if we rejected the event, else maps to
None.
"""
+ if not event_ids:
+ return defer.succeed({})
+
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "
|