diff --git a/synapse/__init__.py b/synapse/__init__.py
index 5c0f2f83aa..1810cb6fcd 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.33.0"
+__version__ = "0.33.1"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 14f5540280..b41d595059 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -70,20 +70,6 @@ class CodeMessageException(RuntimeError):
self.code = code
self.msg = msg
- def error_dict(self):
- return cs_error(self.msg)
-
-
-class MatrixCodeMessageException(CodeMessageException):
- """An error from a general matrix endpoint, eg. from a proxied Matrix API call.
-
- Attributes:
- errcode (str): Matrix error code e.g 'M_FORBIDDEN'
- """
- def __init__(self, code, msg, errcode=Codes.UNKNOWN):
- super(MatrixCodeMessageException, self).__init__(code, msg)
- self.errcode = errcode
-
class SynapseError(CodeMessageException):
"""A base exception type for matrix errors which have an errcode and error
@@ -109,38 +95,28 @@ class SynapseError(CodeMessageException):
self.errcode,
)
- @classmethod
- def from_http_response_exception(cls, err):
- """Make a SynapseError based on an HTTPResponseException
-
- This is useful when a proxied request has failed, and we need to
- decide how to map the failure onto a matrix error to send back to the
- client.
-
- An attempt is made to parse the body of the http response as a matrix
- error. If that succeeds, the errcode and error message from the body
- are used as the errcode and error message in the new synapse error.
-
- Otherwise, the errcode is set to M_UNKNOWN, and the error message is
- set to the reason code from the HTTP response.
- Args:
- err (HttpResponseException):
+class ProxiedRequestError(SynapseError):
+ """An error from a general matrix endpoint, eg. from a proxied Matrix API call.
- Returns:
- SynapseError:
- """
- # try to parse the body as json, to get better errcode/msg, but
- # default to M_UNKNOWN with the HTTP status as the error text
- try:
- j = json.loads(err.response)
- except ValueError:
- j = {}
- errcode = j.get('errcode', Codes.UNKNOWN)
- errmsg = j.get('error', err.msg)
+ Attributes:
+ errcode (str): Matrix error code e.g 'M_FORBIDDEN'
+ """
+ def __init__(self, code, msg, errcode=Codes.UNKNOWN, additional_fields=None):
+ super(ProxiedRequestError, self).__init__(
+ code, msg, errcode
+ )
+ if additional_fields is None:
+ self._additional_fields = {}
+ else:
+ self._additional_fields = dict(additional_fields)
- res = SynapseError(err.code, errmsg, errcode)
- return res
+ def error_dict(self):
+ return cs_error(
+ self.msg,
+ self.errcode,
+ **self._additional_fields
+ )
class ConsentNotGivenError(SynapseError):
@@ -309,14 +285,6 @@ class LimitExceededError(SynapseError):
)
-def cs_exception(exception):
- if isinstance(exception, CodeMessageException):
- return exception.error_dict()
- else:
- logger.error("Unknown exception type: %s", type(exception))
- return {}
-
-
def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
""" Utility method for constructing an error response for client-server
interactions.
@@ -373,7 +341,7 @@ class HttpResponseException(CodeMessageException):
Represents an HTTP-level failure of an outbound request
Attributes:
- response (str): body of response
+ response (bytes): body of response
"""
def __init__(self, code, msg, response):
"""
@@ -381,7 +349,39 @@ class HttpResponseException(CodeMessageException):
Args:
code (int): HTTP status code
msg (str): reason phrase from HTTP response status line
- response (str): body of response
+ response (bytes): body of response
"""
super(HttpResponseException, self).__init__(code, msg)
self.response = response
+
+ def to_synapse_error(self):
+ """Make a SynapseError based on an HTTPResponseException
+
+ This is useful when a proxied request has failed, and we need to
+ decide how to map the failure onto a matrix error to send back to the
+ client.
+
+ An attempt is made to parse the body of the http response as a matrix
+ error. If that succeeds, the errcode and error message from the body
+ are used as the errcode and error message in the new synapse error.
+
+ Otherwise, the errcode is set to M_UNKNOWN, and the error message is
+ set to the reason code from the HTTP response.
+
+ Returns:
+ SynapseError:
+ """
+ # try to parse the body as json, to get better errcode/msg, but
+ # default to M_UNKNOWN with the HTTP status as the error text
+ try:
+ j = json.loads(self.response)
+ except ValueError:
+ j = {}
+
+ if not isinstance(j, dict):
+ j = {}
+
+ errcode = j.pop('errcode', Codes.UNKNOWN)
+ errmsg = j.pop('error', self.msg)
+
+ return ProxiedRequestError(self.code, errmsg, errcode, j)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index a8014e9c50..6a471a0a5e 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -217,6 +217,8 @@ class ServerConfig(Config):
# different cores. See
# https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/.
#
+ # This setting requires the affinity package to be installed!
+ #
# cpu_affinity: 0xFFFFFFFF
# Whether to serve a web client from the HTTP/HTTPS root resource.
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 62d7ed13cf..7550e11b6e 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -48,6 +48,13 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t
PDU_RETRY_TIME_MS = 1 * 60 * 1000
+class InvalidResponseError(RuntimeError):
+ """Helper for _try_destination_list: indicates that the server returned a response
+ we couldn't parse
+ """
+ pass
+
+
class FederationClient(FederationBase):
def __init__(self, hs):
super(FederationClient, self).__init__(hs)
@@ -458,6 +465,61 @@ class FederationClient(FederationBase):
defer.returnValue(signed_auth)
@defer.inlineCallbacks
+ def _try_destination_list(self, description, destinations, callback):
+ """Try an operation on a series of servers, until it succeeds
+
+ Args:
+ description (unicode): description of the operation we're doing, for logging
+
+ destinations (Iterable[unicode]): list of server_names to try
+
+ callback (callable): Function to run for each server. Passed a single
+ argument: the server_name to try. May return a deferred.
+
+ If the callback raises a CodeMessageException with a 300/400 code,
+ attempts to perform the operation stop immediately and the exception is
+ reraised.
+
+ Otherwise, if the callback raises an Exception the error is logged and the
+ next server tried. Normally the stacktrace is logged but this is
+ suppressed if the exception is an InvalidResponseError.
+
+ Returns:
+ The [Deferred] result of callback, if it succeeds
+
+ Raises:
+ SynapseError if the chosen remote server returns a 300/400 code.
+
+ RuntimeError if no servers were reachable.
+ """
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+
+ try:
+ res = yield callback(destination)
+ defer.returnValue(res)
+ except InvalidResponseError as e:
+ logger.warn(
+ "Failed to %s via %s: %s",
+ description, destination, e,
+ )
+ except HttpResponseException as e:
+ if not 500 <= e.code < 600:
+ raise e.to_synapse_error()
+ else:
+ logger.warn(
+ "Failed to %s via %s: %i %s",
+ description, destination, e.code, e.message,
+ )
+ except Exception:
+ logger.warn(
+ "Failed to %s via %s",
+ description, destination, exc_info=1,
+ )
+
+ raise RuntimeError("Failed to %s via any server", description)
+
def make_membership_event(self, destinations, room_id, user_id, membership,
content={},):
"""
@@ -481,7 +543,7 @@ class FederationClient(FederationBase):
Deferred: resolves to a tuple of (origin (str), event (object))
where origin is the remote homeserver which generated the event.
- Fails with a ``CodeMessageException`` if the chosen remote server
+ Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
Fails with a ``RuntimeError`` if no servers were reachable.
@@ -492,50 +554,35 @@ class FederationClient(FederationBase):
"make_membership_event called with membership='%s', must be one of %s" %
(membership, ",".join(valid_memberships))
)
- for destination in destinations:
- if destination == self.server_name:
- continue
- try:
- ret = yield self.transport_layer.make_membership_event(
- destination, room_id, user_id, membership
- )
+ @defer.inlineCallbacks
+ def send_request(destination):
+ ret = yield self.transport_layer.make_membership_event(
+ destination, room_id, user_id, membership
+ )
- pdu_dict = ret["event"]
+ pdu_dict = ret["event"]
- logger.debug("Got response to make_%s: %s", membership, pdu_dict)
+ logger.debug("Got response to make_%s: %s", membership, pdu_dict)
- pdu_dict["content"].update(content)
+ pdu_dict["content"].update(content)
- # The protoevent received over the JSON wire may not have all
- # the required fields. Lets just gloss over that because
- # there's some we never care about
- if "prev_state" not in pdu_dict:
- pdu_dict["prev_state"] = []
+ # The protoevent received over the JSON wire may not have all
+ # the required fields. Lets just gloss over that because
+ # there's some we never care about
+ if "prev_state" not in pdu_dict:
+ pdu_dict["prev_state"] = []
- ev = builder.EventBuilder(pdu_dict)
+ ev = builder.EventBuilder(pdu_dict)
- defer.returnValue(
- (destination, ev)
- )
- break
- except CodeMessageException as e:
- if not 500 <= e.code < 600:
- raise
- else:
- logger.warn(
- "Failed to make_%s via %s: %s",
- membership, destination, e.message
- )
- except Exception as e:
- logger.warn(
- "Failed to make_%s via %s: %s",
- membership, destination, e.message
- )
+ defer.returnValue(
+ (destination, ev)
+ )
- raise RuntimeError("Failed to send to any server.")
+ return self._try_destination_list(
+ "make_" + membership, destinations, send_request,
+ )
- @defer.inlineCallbacks
def send_join(self, destinations, pdu):
"""Sends a join event to one of a list of homeservers.
@@ -552,103 +599,91 @@ class FederationClient(FederationBase):
giving the serer the event was sent to, ``state`` (?) and
``auth_chain``.
- Fails with a ``CodeMessageException`` if the chosen remote server
+ Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
Fails with a ``RuntimeError`` if no servers were reachable.
"""
- for destination in destinations:
- if destination == self.server_name:
- continue
-
- try:
- time_now = self._clock.time_msec()
- _, content = yield self.transport_layer.send_join(
- destination=destination,
- room_id=pdu.room_id,
- event_id=pdu.event_id,
- content=pdu.get_pdu_json(time_now),
- )
+ @defer.inlineCallbacks
+ def send_request(destination):
+ time_now = self._clock.time_msec()
+ _, content = yield self.transport_layer.send_join(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
- logger.debug("Got content: %s", content)
+ logger.debug("Got content: %s", content)
- state = [
- event_from_pdu_json(p, outlier=True)
- for p in content.get("state", [])
- ]
+ state = [
+ event_from_pdu_json(p, outlier=True)
+ for p in content.get("state", [])
+ ]
- auth_chain = [
- event_from_pdu_json(p, outlier=True)
- for p in content.get("auth_chain", [])
- ]
+ auth_chain = [
+ event_from_pdu_json(p, outlier=True)
+ for p in content.get("auth_chain", [])
+ ]
- pdus = {
- p.event_id: p
- for p in itertools.chain(state, auth_chain)
- }
+ pdus = {
+ p.event_id: p
+ for p in itertools.chain(state, auth_chain)
+ }
- valid_pdus = yield self._check_sigs_and_hash_and_fetch(
- destination, list(pdus.values()),
- outlier=True,
- )
+ valid_pdus = yield self._check_sigs_and_hash_and_fetch(
+ destination, list(pdus.values()),
+ outlier=True,
+ )
- valid_pdus_map = {
- p.event_id: p
- for p in valid_pdus
- }
-
- # NB: We *need* to copy to ensure that we don't have multiple
- # references being passed on, as that causes... issues.
- signed_state = [
- copy.copy(valid_pdus_map[p.event_id])
- for p in state
- if p.event_id in valid_pdus_map
- ]
+ valid_pdus_map = {
+ p.event_id: p
+ for p in valid_pdus
+ }
- signed_auth = [
- valid_pdus_map[p.event_id]
- for p in auth_chain
- if p.event_id in valid_pdus_map
- ]
+ # NB: We *need* to copy to ensure that we don't have multiple
+ # references being passed on, as that causes... issues.
+ signed_state = [
+ copy.copy(valid_pdus_map[p.event_id])
+ for p in state
+ if p.event_id in valid_pdus_map
+ ]
- # NB: We *need* to copy to ensure that we don't have multiple
- # references being passed on, as that causes... issues.
- for s in signed_state:
- s.internal_metadata = copy.deepcopy(s.internal_metadata)
+ signed_auth = [
+ valid_pdus_map[p.event_id]
+ for p in auth_chain
+ if p.event_id in valid_pdus_map
+ ]
- auth_chain.sort(key=lambda e: e.depth)
+ # NB: We *need* to copy to ensure that we don't have multiple
+ # references being passed on, as that causes... issues.
+ for s in signed_state:
+ s.internal_metadata = copy.deepcopy(s.internal_metadata)
- defer.returnValue({
- "state": signed_state,
- "auth_chain": signed_auth,
- "origin": destination,
- })
- except CodeMessageException as e:
- if not 500 <= e.code < 600:
- raise
- else:
- logger.exception(
- "Failed to send_join via %s: %s",
- destination, e.message
- )
- except Exception as e:
- logger.exception(
- "Failed to send_join via %s: %s",
- destination, e.message
- )
+ auth_chain.sort(key=lambda e: e.depth)
- raise RuntimeError("Failed to send to any server.")
+ defer.returnValue({
+ "state": signed_state,
+ "auth_chain": signed_auth,
+ "origin": destination,
+ })
+ return self._try_destination_list("send_join", destinations, send_request)
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
time_now = self._clock.time_msec()
- code, content = yield self.transport_layer.send_invite(
- destination=destination,
- room_id=room_id,
- event_id=event_id,
- content=pdu.get_pdu_json(time_now),
- )
+ try:
+ code, content = yield self.transport_layer.send_invite(
+ destination=destination,
+ room_id=room_id,
+ event_id=event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
+ except HttpResponseException as e:
+ if e.code == 403:
+ raise e.to_synapse_error()
+ raise
pdu_dict = content["event"]
@@ -663,7 +698,6 @@ class FederationClient(FederationBase):
defer.returnValue(pdu)
- @defer.inlineCallbacks
def send_leave(self, destinations, pdu):
"""Sends a leave event to one of a list of homeservers.
@@ -680,35 +714,25 @@ class FederationClient(FederationBase):
Return:
Deferred: resolves to None.
- Fails with a ``CodeMessageException`` if the chosen remote server
- returns a non-200 code.
+ Fails with a ``SynapseError`` if the chosen remote server
+ returns a 300/400 code.
Fails with a ``RuntimeError`` if no servers were reachable.
"""
- for destination in destinations:
- if destination == self.server_name:
- continue
-
- try:
- time_now = self._clock.time_msec()
- _, content = yield self.transport_layer.send_leave(
- destination=destination,
- room_id=pdu.room_id,
- event_id=pdu.event_id,
- content=pdu.get_pdu_json(time_now),
- )
+ @defer.inlineCallbacks
+ def send_request(destination):
+ time_now = self._clock.time_msec()
+ _, content = yield self.transport_layer.send_leave(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
- logger.debug("Got content: %s", content)
- defer.returnValue(None)
- except CodeMessageException:
- raise
- except Exception as e:
- logger.exception(
- "Failed to send_leave via %s: %s",
- destination, e.message
- )
+ logger.debug("Got content: %s", content)
+ defer.returnValue(None)
- raise RuntimeError("Failed to send to any server.")
+ return self._try_destination_list("send_leave", destinations, send_request)
def get_public_rooms(self, destination, limit=None, since_token=None,
search_filter=None, include_all_networks=False,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 657935d1ac..bf89d568af 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -426,6 +426,7 @@ class FederationServer(FederationBase):
ret = yield self.handler.on_query_auth(
origin,
event_id,
+ room_id,
signed_auth,
content.get("rejects", []),
content.get("missing", []),
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index c3f2d7feff..f772e62c28 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -19,10 +19,12 @@ import random
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import AuthError
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.types import UserID
from synapse.util.logutils import log_function
+from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -129,11 +131,13 @@ class EventStreamHandler(BaseHandler):
class EventHandler(BaseHandler):
@defer.inlineCallbacks
- def get_event(self, user, event_id):
+ def get_event(self, user, room_id, event_id):
"""Retrieve a single specified event.
Args:
user (synapse.types.UserID): The user requesting the event
+ room_id (str|None): The expected room id. We'll return None if the
+ event's room does not match.
event_id (str): The event ID to obtain.
Returns:
dict: An event, or None if there is no event matching this ID.
@@ -142,13 +146,26 @@ class EventHandler(BaseHandler):
AuthError if the user does not have the rights to inspect this
event.
"""
- event = yield self.store.get_event(event_id)
+ event = yield self.store.get_event(event_id, check_room_id=room_id)
if not event:
defer.returnValue(None)
return
- if hasattr(event, "room_id"):
- yield self.auth.check_joined_room(event.room_id, user.to_string())
+ users = yield self.store.get_users_in_room(event.room_id)
+ is_peeking = user.to_string() not in users
+
+ filtered = yield filter_events_for_client(
+ self.store,
+ user.to_string(),
+ [event],
+ is_peeking=is_peeking
+ )
+
+ if not filtered:
+ raise AuthError(
+ 403,
+ "You don't have permission to access that event."
+ )
defer.returnValue(event)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 91d8def08b..533b82c783 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -400,7 +400,7 @@ class FederationHandler(BaseHandler):
)
try:
- event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ yield self._persist_auth_tree(
origin, auth_chain, state, event
)
except AuthError as e:
@@ -444,7 +444,7 @@ class FederationHandler(BaseHandler):
yield self._handle_new_events(origin, event_infos)
try:
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context = yield self._handle_new_event(
origin,
event,
state=state,
@@ -469,17 +469,6 @@ class FederationHandler(BaseHandler):
except StoreError:
logger.exception("Failed to store room.")
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
-
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally
@@ -501,7 +490,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield user_joined_room(self.distributor, user, event.room_id)
+ yield self.user_joined_room(user, event.room_id)
@log_function
@defer.inlineCallbacks
@@ -942,7 +931,7 @@ class FederationHandler(BaseHandler):
self.room_queues[room_id] = []
- yield self.store.clean_room_for_join(room_id)
+ yield self._clean_room_for_join(room_id)
handled_events = set()
@@ -981,15 +970,10 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ yield self._persist_auth_tree(
origin, auth_chain, state, event
)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[joinee]
- )
-
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -1084,7 +1068,7 @@ class FederationHandler(BaseHandler):
# would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context = yield self._handle_new_event(
origin, event
)
@@ -1094,20 +1078,10 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
-
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
- yield user_joined_room(self.distributor, user, event.room_id)
+ yield self.user_joined_room(user, event.room_id)
prev_state_ids = yield context.get_prev_state_ids(self.store)
@@ -1176,17 +1150,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
-
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- )
-
- target_user = UserID.from_string(event.state_key)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ yield self._persist_events([(event, context)])
defer.returnValue(event)
@@ -1217,17 +1181,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
-
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- )
-
- target_user = UserID.from_string(event.state_key)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ yield self._persist_events([(event, context)])
defer.returnValue(event)
@@ -1318,7 +1272,7 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ yield self._handle_new_event(
origin, event
)
@@ -1328,22 +1282,17 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
-
defer.returnValue(None)
@defer.inlineCallbacks
def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
+
+ event = yield self.store.get_event(
+ event_id, allow_none=False, check_room_id=room_id,
+ )
+
state_groups = yield self.store.get_state_groups(
room_id, [event_id]
)
@@ -1354,8 +1303,7 @@ class FederationHandler(BaseHandler):
(e.type, e.state_key): e for e in state
}
- event = yield self.store.get_event(event_id)
- if event and event.is_state():
+ if event.is_state():
# Get previous state
if "replaces_state" in event.unsigned:
prev_id = event.unsigned["replaces_state"]
@@ -1374,6 +1322,10 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
+ event = yield self.store.get_event(
+ event_id, allow_none=False, check_room_id=room_id,
+ )
+
state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id]
)
@@ -1382,8 +1334,7 @@ class FederationHandler(BaseHandler):
_, state = state_groups.items().pop()
results = state
- event = yield self.store.get_event(event_id)
- if event and event.is_state():
+ if event.is_state():
# Get previous state
if "replaces_state" in event.unsigned:
prev_id = event.unsigned["replaces_state"]
@@ -1472,9 +1423,8 @@ class FederationHandler(BaseHandler):
event, context
)
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
+ yield self._persist_events(
+ [(event, context)],
backfilled=backfilled,
)
except: # noqa: E722, as we reraise the exception this is fine.
@@ -1487,15 +1437,7 @@ class FederationHandler(BaseHandler):
six.reraise(tp, value, tb)
- if not backfilled:
- # this intentionally does not yield: we don't care about the result
- # and don't need to wait for it.
- logcontext.run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id,
- )
-
- defer.returnValue((context, event_stream_id, max_stream_id))
+ defer.returnValue(context)
@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False):
@@ -1503,6 +1445,8 @@ class FederationHandler(BaseHandler):
should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
+
+ Notifies about the events where appropriate.
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
@@ -1517,7 +1461,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
- yield self.store.persist_events(
+ yield self._persist_events(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@@ -1529,7 +1473,8 @@ class FederationHandler(BaseHandler):
def _persist_auth_tree(self, origin, auth_events, state, event):
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
- Persists the event seperately.
+ Persists the event separately. Notifies about the persisted events
+ where appropriate.
Will attempt to fetch missing auth events.
@@ -1540,8 +1485,7 @@ class FederationHandler(BaseHandler):
event (Event)
Returns:
- 2-tuple of (event_stream_id, max_stream_id) from the persist_event
- call for `event`
+ Deferred
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
@@ -1605,7 +1549,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
- yield self.store.persist_events(
+ yield self._persist_events(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1616,12 +1560,10 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event, new_event_context,
+ yield self._persist_events(
+ [(event, new_event_context)],
)
- defer.returnValue((event_stream_id, max_stream_id))
-
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
"""
@@ -1678,8 +1620,19 @@ class FederationHandler(BaseHandler):
defer.returnValue(context)
@defer.inlineCallbacks
- def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
+ def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
missing):
+ in_room = yield self.auth.check_host_in_room(
+ room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+
+ event = yield self.store.get_event(
+ event_id, allow_none=False, check_room_id=room_id
+ )
+
# Just go through and process each event in `remote_auth_chain`. We
# don't want to fall into the trap of `missing` being wrong.
for e in remote_auth_chain:
@@ -1689,7 +1642,6 @@ class FederationHandler(BaseHandler):
pass
# Now get the current auth_chain for the event.
- event = yield self.store.get_event(event_id)
local_auth_chain = yield self.store.get_auth_chain(
[auth_id for auth_id, _ in event.auth_events],
include_given=True
@@ -2347,3 +2299,69 @@ class FederationHandler(BaseHandler):
)
if "valid" not in response or not response["valid"]:
raise AuthError(403, "Third party certificate was invalid")
+
+ @defer.inlineCallbacks
+ def _persist_events(self, event_and_contexts, backfilled=False):
+ """Persists events and tells the notifier/pushers about them, if
+ necessary.
+
+ Args:
+ event_and_contexts(list[tuple[FrozenEvent, EventContext]])
+ backfilled (bool): Whether these events are a result of
+ backfilling or not
+
+ Returns:
+ Deferred
+ """
+ max_stream_id = yield self.store.persist_events(
+ event_and_contexts,
+ backfilled=backfilled,
+ )
+
+ if not backfilled: # Never notify for backfilled events
+ for event, _ in event_and_contexts:
+ self._notify_persisted_event(event, max_stream_id)
+
+ def _notify_persisted_event(self, event, max_stream_id):
+ """Checks to see if notifier/pushers should be notified about the
+ event or not.
+
+ Args:
+ event (FrozenEvent)
+ max_stream_id (int): The max_stream_id returned by persist_events
+ """
+
+ extra_users = []
+ if event.type == EventTypes.Member:
+ target_user_id = event.state_key
+
+ # We notify for memberships if its an invite for one of our
+ # users
+ if event.internal_metadata.is_outlier():
+ if event.membership != Membership.INVITE:
+ if not self.is_mine_id(target_user_id):
+ return
+
+ target_user = UserID.from_string(target_user_id)
+ extra_users.append(target_user)
+ elif event.internal_metadata.is_outlier():
+ return
+
+ event_stream_id = event.internal_metadata.stream_ordering
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
+
+ logcontext.run_in_background(
+ self.pusher_pool.on_new_notifications,
+ event_stream_id, max_stream_id,
+ )
+
+ def _clean_room_for_join(self, room_id):
+ return self.store.clean_room_for_join(room_id)
+
+ def user_joined_room(self, user, room_id):
+ """Called when a new user has joined the room
+ """
+ return user_joined_room(self.distributor, user, room_id)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 8c8aedb2b8..1d36d967c3 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.errors import (
CodeMessageException,
Codes,
- MatrixCodeMessageException,
+ HttpResponseException,
SynapseError,
)
@@ -85,7 +85,6 @@ class IdentityHandler(BaseHandler):
)
defer.returnValue(None)
- data = {}
try:
data = yield self.http_client.get_json(
"https://%s%s" % (
@@ -94,11 +93,9 @@ class IdentityHandler(BaseHandler):
),
{'sid': creds['sid'], 'client_secret': client_secret}
)
- except MatrixCodeMessageException as e:
+ except HttpResponseException as e:
logger.info("getValidated3pid failed with Matrix error: %r", e)
- raise SynapseError(e.code, e.msg, e.errcode)
- except CodeMessageException as e:
- data = json.loads(e.msg)
+ raise e.to_synapse_error()
if 'medium' in data:
defer.returnValue(data)
@@ -136,7 +133,7 @@ class IdentityHandler(BaseHandler):
)
logger.debug("bound threepid %r to %s", creds, mxid)
except CodeMessageException as e:
- data = json.loads(e.msg)
+ data = json.loads(e.msg) # XXX WAT?
defer.returnValue(data)
@defer.inlineCallbacks
@@ -209,12 +206,9 @@ class IdentityHandler(BaseHandler):
params
)
defer.returnValue(data)
- except MatrixCodeMessageException as e:
- logger.info("Proxied requestToken failed with Matrix error: %r", e)
- raise SynapseError(e.code, e.msg, e.errcode)
- except CodeMessageException as e:
+ except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
- raise e
+ raise e.to_synapse_error()
@defer.inlineCallbacks
def requestMsisdnToken(
@@ -244,9 +238,6 @@ class IdentityHandler(BaseHandler):
params
)
defer.returnValue(data)
- except MatrixCodeMessageException as e:
- logger.info("Proxied requestToken failed with Matrix error: %r", e)
- raise SynapseError(e.code, e.msg, e.errcode)
- except CodeMessageException as e:
+ except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
- raise e
+ raise e.to_synapse_error()
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 25b6307884..3771e0b3f6 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -39,12 +39,7 @@ from twisted.web.client import (
from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
-from synapse.api.errors import (
- CodeMessageException,
- Codes,
- MatrixCodeMessageException,
- SynapseError,
-)
+from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async import add_timeout_to_deferred
@@ -132,6 +127,11 @@ class SimpleHttpClient(object):
Returns:
Deferred[object]: parsed json
+
+ Raises:
+ HttpResponseException: On a non-2xx HTTP response.
+
+ ValueError: if the response was not JSON
"""
# TODO: Do we ever want to log message contents?
@@ -155,7 +155,10 @@ class SimpleHttpClient(object):
body = yield make_deferred_yieldable(readBody(response))
- defer.returnValue(json.loads(body))
+ if 200 <= response.code < 300:
+ defer.returnValue(json.loads(body))
+ else:
+ raise HttpResponseException(response.code, response.phrase, body)
@defer.inlineCallbacks
def post_json_get_json(self, uri, post_json, headers=None):
@@ -169,6 +172,11 @@ class SimpleHttpClient(object):
Returns:
Deferred[object]: parsed json
+
+ Raises:
+ HttpResponseException: On a non-2xx HTTP response.
+
+ ValueError: if the response was not JSON
"""
json_str = encode_canonical_json(post_json)
@@ -193,9 +201,7 @@ class SimpleHttpClient(object):
if 200 <= response.code < 300:
defer.returnValue(json.loads(body))
else:
- raise self._exceptionFromFailedRequest(response, body)
-
- defer.returnValue(json.loads(body))
+ raise HttpResponseException(response.code, response.phrase, body)
@defer.inlineCallbacks
def get_json(self, uri, args={}, headers=None):
@@ -213,14 +219,12 @@ class SimpleHttpClient(object):
Deferred: Succeeds when we get *any* 2xx HTTP response, with the
HTTP body as JSON.
Raises:
- On a non-2xx HTTP response. The response body will be used as the
- error message.
+ HttpResponseException On a non-2xx HTTP response.
+
+ ValueError: if the response was not JSON
"""
- try:
- body = yield self.get_raw(uri, args, headers=headers)
- defer.returnValue(json.loads(body))
- except CodeMessageException as e:
- raise self._exceptionFromFailedRequest(e.code, e.msg)
+ body = yield self.get_raw(uri, args, headers=headers)
+ defer.returnValue(json.loads(body))
@defer.inlineCallbacks
def put_json(self, uri, json_body, args={}, headers=None):
@@ -239,7 +243,9 @@ class SimpleHttpClient(object):
Deferred: Succeeds when we get *any* 2xx HTTP response, with the
HTTP body as JSON.
Raises:
- On a non-2xx HTTP response.
+ HttpResponseException On a non-2xx HTTP response.
+
+ ValueError: if the response was not JSON
"""
if len(args):
query_bytes = urllib.urlencode(args, True)
@@ -266,10 +272,7 @@ class SimpleHttpClient(object):
if 200 <= response.code < 300:
defer.returnValue(json.loads(body))
else:
- # NB: This is explicitly not json.loads(body)'d because the contract
- # of CodeMessageException is a *string* message. Callers can always
- # load it into JSON if they want.
- raise CodeMessageException(response.code, body)
+ raise HttpResponseException(response.code, response.phrase, body)
@defer.inlineCallbacks
def get_raw(self, uri, args={}, headers=None):
@@ -287,8 +290,7 @@ class SimpleHttpClient(object):
Deferred: Succeeds when we get *any* 2xx HTTP response, with the
HTTP body at text.
Raises:
- On a non-2xx HTTP response. The response body will be used as the
- error message.
+ HttpResponseException on a non-2xx HTTP response.
"""
if len(args):
query_bytes = urllib.urlencode(args, True)
@@ -311,16 +313,7 @@ class SimpleHttpClient(object):
if 200 <= response.code < 300:
defer.returnValue(body)
else:
- raise CodeMessageException(response.code, body)
-
- def _exceptionFromFailedRequest(self, response, body):
- try:
- jsonBody = json.loads(body)
- errcode = jsonBody['errcode']
- error = jsonBody['error']
- return MatrixCodeMessageException(response.code, error, errcode)
- except (ValueError, KeyError):
- return CodeMessageException(response.code, body)
+ raise HttpResponseException(response.code, response.phrase, body)
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
# The two should be factored out.
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 1940c1c4f4..6dacb31037 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -36,7 +36,6 @@ from synapse.api.errors import (
Codes,
SynapseError,
UnrecognizedRequestError,
- cs_exception,
)
from synapse.http.request_metrics import requests_counter
from synapse.util.caches import intern_dict
@@ -77,16 +76,13 @@ def wrap_json_request_handler(h):
def wrapped_request_handler(self, request):
try:
yield h(self, request)
- except CodeMessageException as e:
+ except SynapseError as e:
code = e.code
- if isinstance(e, SynapseError):
- logger.info(
- "%s SynapseError: %s - %s", request, code, e.msg
- )
- else:
- logger.exception(e)
+ logger.info(
+ "%s SynapseError: %s - %s", request, code, e.msg
+ )
respond_with_json(
- request, code, cs_exception(e), send_cors=True,
+ request, code, e.error_dict(), send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 6bfc8a5b89..7a3cfb159c 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -18,7 +18,7 @@ import re
from twisted.internet import defer
-from synapse.api.errors import MatrixCodeMessageException, SynapseError
+from synapse.api.errors import HttpResponseException
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.types import Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room
@@ -56,11 +56,11 @@ def remote_join(client, host, port, requester, remote_room_hosts,
try:
result = yield client.post_json_get_json(uri, payload)
- except MatrixCodeMessageException as e:
+ except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
- raise SynapseError(e.code, e.msg, e.errcode)
+ raise e.to_synapse_error()
defer.returnValue(result)
@@ -92,11 +92,11 @@ def remote_reject_invite(client, host, port, requester, remote_room_hosts,
try:
result = yield client.post_json_get_json(uri, payload)
- except MatrixCodeMessageException as e:
+ except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
- raise SynapseError(e.code, e.msg, e.errcode)
+ raise e.to_synapse_error()
defer.returnValue(result)
@@ -131,11 +131,11 @@ def get_or_register_3pid_guest(client, host, port, requester,
try:
result = yield client.post_json_get_json(uri, payload)
- except MatrixCodeMessageException as e:
+ except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
- raise SynapseError(e.code, e.msg, e.errcode)
+ raise e.to_synapse_error()
defer.returnValue(result)
@@ -165,11 +165,11 @@ def notify_user_membership_change(client, host, port, user_id, room_id, change):
try:
result = yield client.post_json_get_json(uri, payload)
- except MatrixCodeMessageException as e:
+ except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
- raise SynapseError(e.code, e.msg, e.errcode)
+ raise e.to_synapse_error()
defer.returnValue(result)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 5227bc333d..d3509dc288 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -18,11 +18,7 @@ import re
from twisted.internet import defer
-from synapse.api.errors import (
- CodeMessageException,
- MatrixCodeMessageException,
- SynapseError,
-)
+from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
@@ -83,11 +79,11 @@ def send_event_to_master(clock, store, client, host, port, requester, event, con
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield clock.sleep(1)
- except MatrixCodeMessageException as e:
+ except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
- raise SynapseError(e.code, e.msg, e.errcode)
+ raise e.to_synapse_error()
defer.returnValue(result)
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index b70c9c2806..0f3a2e8b51 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -88,7 +88,7 @@ class EventRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, event_id):
requester = yield self.auth.get_user_by_req(request)
- event = yield self.event_handler.get_event(requester.user, event_id)
+ event = yield self.event_handler.get_event(requester.user, None, event_id)
time_now = self.clock.time_msec()
if event:
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 13c331550b..fa5989e74e 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -506,7 +506,7 @@ class RoomEventServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
requester = yield self.auth.get_user_by_req(request)
- event = yield self.event_handler.get_event(requester.user, event_id)
+ event = yield self.event_handler.get_event(requester.user, room_id, event_id)
time_now = self.clock.time_msec()
if event:
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 174ad20123..8fb413d825 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -379,7 +379,7 @@ class MediaRepository(object):
logger.warn("HTTP error fetching remote media %s/%s: %s",
server_name, media_id, e.response)
if e.code == twisted.web.http.NOT_FOUND:
- raise SynapseError.from_http_response_exception(e)
+ raise e.to_synapse_error()
raise SynapseError(502, "Failed to fetch remote media")
except SynapseError:
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 8bd35df119..24345b20a6 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -343,6 +343,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
table="events",
keyvalues={
"event_id": event_id,
+ "room_id": room_id,
},
retcol="depth",
allow_none=True,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 61223da1a5..e8e5a0fe44 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -241,12 +241,18 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
self._state_resolution_handler = hs.get_state_resolution_handler()
+ @defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
- backfilled: ?
+ backfilled (bool): Whether the results are retrieved from federation
+ via backfill or not. Used to determine if they're "new" events
+ which might update the current state etc.
+
+ Returns:
+ Deferred[int]: the stream ordering of the latest persisted event
"""
partitioned = {}
for event, ctx in events_and_contexts:
@@ -263,10 +269,14 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
for room_id in partitioned:
self._maybe_start_persisting(room_id)
- return make_deferred_yieldable(
+ yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
+ max_persisted_id = yield self._stream_id_gen.get_current_token()
+
+ defer.returnValue(max_persisted_id)
+
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index f28239a808..9b4cfeb899 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -19,7 +19,7 @@ from canonicaljson import json
from twisted.internet import defer
-from synapse.api.errors import SynapseError
+from synapse.api.errors import NotFoundError
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
@@ -77,7 +77,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
- allow_none=False):
+ allow_none=False, check_room_id=None):
"""Get an event from the database by event_id.
Args:
@@ -88,7 +88,9 @@ class EventsWorkerStore(SQLBaseStore):
include the previous states content in the unsigned field.
allow_rejected (bool): If True return rejected events.
allow_none (bool): If True, return None if no event found, if
- False throw an exception.
+ False throw a NotFoundError
+ check_room_id (str|None): if not None, check the room of the found event.
+ If there is a mismatch, behave as per allow_none.
Returns:
Deferred : A FrozenEvent.
@@ -100,10 +102,16 @@ class EventsWorkerStore(SQLBaseStore):
allow_rejected=allow_rejected,
)
- if not events and not allow_none:
- raise SynapseError(404, "Could not find event %s" % (event_id,))
+ event = events[0] if events else None
- defer.returnValue(events[0] if events else None)
+ if event is not None and check_room_id is not None:
+ if event.room_id != check_room_id:
+ event = None
+
+ if event is None and not allow_none:
+ raise NotFoundError("Could not find event %s" % (event_id,))
+
+ defer.returnValue(event)
@defer.inlineCallbacks
def get_events(self, event_ids, check_redacted=True,
|