diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1cb81b6cf8..e484061cc0 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,269 +14,50 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import simplejson
import sys
-from canonicaljson import encode_canonical_json
import six
-from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from six import iteritems, itervalues, string_types
+
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
-from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import (
- AuthError, Codes, SynapseError,
+ AuthError,
+ Codes,
ConsentNotGivenError,
+ NotFoundError,
+ SynapseError,
)
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
-from synapse.types import (
- UserID, RoomAlias, RoomStreamToken,
-)
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.types import RoomAlias, UserID
+from synapse.util.async_helpers import Linearizer
+from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
-from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
-from synapse.replication.http.send_event import send_event_to_master
from ._base import BaseHandler
logger = logging.getLogger(__name__)
-class PurgeStatus(object):
- """Object tracking the status of a purge request
-
- This class contains information on the progress of a purge request, for
- return by get_purge_status.
-
- Attributes:
- status (int): Tracks whether this request has completed. One of
- STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+ """Contains some read only APIs to get state about a room
"""
- STATUS_ACTIVE = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
-
- STATUS_TEXT = {
- STATUS_ACTIVE: "active",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- }
-
- def __init__(self):
- self.status = PurgeStatus.STATUS_ACTIVE
-
- def asdict(self):
- return {
- "status": PurgeStatus.STATUS_TEXT[self.status]
- }
-
-
-class MessageHandler(BaseHandler):
-
def __init__(self, hs):
- super(MessageHandler, self).__init__(hs)
- self.hs = hs
- self.state = hs.get_state_handler()
+ self.auth = hs.get_auth()
self.clock = hs.get_clock()
-
- self.pagination_lock = ReadWriteLock()
- self._purges_in_progress_by_room = set()
- # map from purge id to PurgeStatus
- self._purges_by_id = {}
-
- def start_purge_history(self, room_id, token,
- delete_local_events=False):
- """Start off a history purge on a room.
-
- Args:
- room_id (str): The room to purge from
-
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- str: unique ID for this purge transaction.
- """
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400,
- "History purge already in progress for %s" % (room_id, ),
- )
-
- purge_id = random_string(16)
-
- # we log the purge_id here so that it can be tied back to the
- # request id in the log lines.
- logger.info("[purge] starting purge_id %s", purge_id)
-
- self._purges_by_id[purge_id] = PurgeStatus()
- run_in_background(
- self._purge_history,
- purge_id, room_id, token, delete_local_events,
- )
- return purge_id
-
- @defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, token,
- delete_local_events):
- """Carry out a history purge on a room.
-
- Args:
- purge_id (str): The id for this purge
- room_id (str): The room to purge from
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- Deferred
- """
- self._purges_in_progress_by_room.add(room_id)
- try:
- with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(
- room_id, token, delete_local_events,
- )
- logger.info("[purge] complete")
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
- except Exception:
- logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the purge from the list 24 hours after it completes
- def clear_purge():
- del self._purges_by_id[purge_id]
- reactor.callLater(24 * 3600, clear_purge)
-
- def get_purge_status(self, purge_id):
- """Get the current status of an active purge
-
- Args:
- purge_id (str): purge_id returned by start_purge_history
-
- Returns:
- PurgeStatus|None
- """
- return self._purges_by_id.get(purge_id)
-
- @defer.inlineCallbacks
- def get_messages(self, requester, room_id=None, pagin_config=None,
- as_client_event=True, event_filter=None):
- """Get messages in a room.
-
- Args:
- requester (Requester): The user requesting messages.
- room_id (str): The room they want messages from.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- as_client_event (bool): True to get events in client-server format.
- event_filter (Filter): Filter to apply to results or None
- Returns:
- dict: Pagination API results
- """
- user_id = requester.user.to_string()
-
- if pagin_config.from_token:
- room_token = pagin_config.from_token.room_key
- else:
- pagin_config.from_token = (
- yield self.hs.get_event_sources().get_current_token_for_room(
- room_id=room_id
- )
- )
- room_token = pagin_config.from_token.room_key
-
- room_token = RoomStreamToken.parse(room_token)
-
- pagin_config.from_token = pagin_config.from_token.copy_and_replace(
- "room_key", str(room_token)
- )
-
- source_config = pagin_config.get_source_config("room")
-
- with (yield self.pagination_lock.read(room_id)):
- membership, member_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
-
- if source_config.direction == 'b':
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- max_topo = room_token.topological
- else:
- max_topo = yield self.store.get_max_topological_token(
- room_id, room_token.stream
- )
-
- if membership == Membership.LEAVE:
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
- leave_token = yield self.store.get_topological_token_for_event(
- member_event_id
- )
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
- source_config.from_key = str(leave_token)
-
- yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
- )
-
- events, next_key = yield self.store.paginate_room_events(
- room_id=room_id,
- from_key=source_config.from_key,
- to_key=source_config.to_key,
- direction=source_config.direction,
- limit=source_config.limit,
- event_filter=event_filter,
- )
-
- next_token = pagin_config.from_token.copy_and_replace(
- "room_key", next_key
- )
-
- if not events:
- defer.returnValue({
- "chunk": [],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- })
-
- if event_filter:
- events = event_filter.filter(events)
-
- events = yield filter_events_for_client(
- self.store,
- user_id,
- events,
- is_peeking=(member_event_id is None),
- )
-
- time_now = self.clock.time_msec()
-
- chunk = {
- "chunk": [
- serialize_event(e, time_now, as_client_event)
- for e in events
- ],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- }
-
- defer.returnValue(chunk)
+ self.state = hs.get_state_handler()
+ self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
@@ -290,12 +71,12 @@ class MessageHandler(BaseHandler):
Raises:
SynapseError if something went wrong.
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
- data = yield self.state_handler.get_current_state(
+ data = yield self.state.get_current_state(
room_id, event_type, state_key
)
elif membership == Membership.LEAVE:
@@ -308,53 +89,85 @@ class MessageHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def _check_in_room_or_world_readable(self, room_id, user_id):
- try:
- # check_user_was_in_room will return the most recent membership
- # event for the user if:
- # * The user is a non-guest user, and was ever in the room
- # * The user is a guest user, and has joined the room
- # else it will throw.
- member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
- defer.returnValue((member_event.membership, member_event.event_id))
- return
- except AuthError:
- visibility = yield self.state_handler.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility, ""
- )
- if (
- visibility and
- visibility.content["history_visibility"] == "world_readable"
- ):
- defer.returnValue((Membership.JOIN, None))
- return
- raise AuthError(
- 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
- )
-
- @defer.inlineCallbacks
- def get_state_events(self, user_id, room_id, is_guest=False):
+ def get_state_events(
+ self, user_id, room_id, types=None, filtered_types=None,
+ at_token=None, is_guest=False,
+ ):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
- left the room return the state events from when they left.
+ left the room return the state events from when they left. If an explicit
+ 'at' parameter is passed, return the state events as of that event, if
+ visible.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
+ at_token(StreamToken|None): the stream token of the at which we are requesting
+ the stats. If the user is not allowed to view the state as of that
+ stream token, we raise a 403 SynapseError. If None, returns the current
+ state based on the current_state_events table.
+ is_guest(bool): whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
+ Raises:
+ NotFoundError (404) if the at token does not yield an event
+
+ AuthError (403) if the user doesn't have permission to view
+ members of this room.
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
+ if at_token:
+ # FIXME this claims to get the state at a stream position, but
+ # get_recent_events_for_room operates by topo ordering. This therefore
+ # does not reliably give you the state at the given stream position.
+ # (https://github.com/matrix-org/synapse/issues/3305)
+ last_events, _ = yield self.store.get_recent_events_for_room(
+ room_id, end_token=at_token.room_key, limit=1,
+ )
- if membership == Membership.JOIN:
- room_state = yield self.state_handler.get_current_state(room_id)
- elif membership == Membership.LEAVE:
- room_state = yield self.store.get_state_for_events(
- [membership_event_id], None
+ if not last_events:
+ raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+ visible_events = yield filter_events_for_client(
+ self.store, user_id, last_events,
)
- room_state = room_state[membership_event_id]
+
+ event = last_events[0]
+ if visible_events:
+ room_state = yield self.store.get_state_for_events(
+ [event.event_id], types, filtered_types=filtered_types,
+ )
+ room_state = room_state[event.event_id]
+ else:
+ raise AuthError(
+ 403,
+ "User %s not allowed to view events in room %s at token %s" % (
+ user_id, room_id, at_token,
+ )
+ )
+ else:
+ membership, membership_event_id = (
+ yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id,
+ )
+ )
+
+ if membership == Membership.JOIN:
+ state_ids = yield self.store.get_filtered_current_state_ids(
+ room_id, types, filtered_types=filtered_types,
+ )
+ room_state = yield self.store.get_events(state_ids.values())
+ elif membership == Membership.LEAVE:
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], types, filtered_types=filtered_types,
+ )
+ room_state = room_state[membership_event_id]
now = self.clock.time_msec()
defer.returnValue(
@@ -377,7 +190,7 @@ class MessageHandler(BaseHandler):
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
- membership, _ = yield self._check_in_room_or_world_readable(
+ membership, _ = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership != Membership.JOIN:
@@ -388,7 +201,7 @@ class MessageHandler(BaseHandler):
users_with_profile = yield self.state.get_current_user_in_room(room_id)
# If this is an AS, double check that they are allowed to see the members.
- # This can either be because the AS user is in the room or becuase there
+ # This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
if requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
@@ -422,7 +235,7 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
- self.http_client = hs.get_simple_http_client()
+ self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -431,7 +244,7 @@ class EventCreationHandler(object):
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
- self.limiter = Limiter(max_count=5)
+ self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
self.action_generator = hs.get_action_generator()
@@ -463,10 +276,14 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
-
+ Raises:
+ ResourceLimitError if server is blocked to some resource being
+ exceeded
Returns:
Tuple of created event (FrozenEvent), Context
"""
+ yield self.auth.check_auth_blocking(requester.user.to_string())
+
builder = self.event_builder_factory.new(event_dict)
self.validator.validate_new(builder)
@@ -491,7 +308,7 @@ class EventCreationHandler(object):
target, e
)
- is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+ is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
if not is_exempt:
yield self.assert_accepted_privacy_policy(requester)
@@ -509,12 +326,13 @@ class EventCreationHandler(object):
defer.returnValue((event, context))
- def _is_exempt_from_privacy_policy(self, builder):
+ def _is_exempt_from_privacy_policy(self, builder, requester):
""""Determine if an event to be sent is exempt from having to consent
to the privacy policy
Args:
builder (synapse.events.builder.EventBuilder): event being created
+ requester (Requster): user requesting this event
Returns:
Deferred[bool]: true if the event can be sent without the user
@@ -525,6 +343,9 @@ class EventCreationHandler(object):
membership = builder.content.get("membership", None)
if membership == Membership.JOIN:
return self._is_server_notices_room(builder.room_id)
+ elif membership == Membership.LEAVE:
+ # the user is always allowed to leave (but not kick people)
+ return builder.state_key == requester.user.to_string()
return succeed(False)
@defer.inlineCallbacks
@@ -630,7 +451,8 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
- prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
return
@@ -752,8 +574,8 @@ class EventCreationHandler(object):
event = builder.build()
logger.debug(
- "Created event %s with state: %s",
- event.event_id, context.prev_state_ids,
+ "Created event %s",
+ event.event_id,
)
defer.returnValue(
@@ -793,7 +615,7 @@ class EventCreationHandler(object):
# Ensure that we can round trip before trying to persist in db
try:
dump = frozendict_json_encoder.encode(event.content)
- simplejson.loads(dump)
+ json.loads(dump)
except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
@@ -805,10 +627,9 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
- yield send_event_to_master(
- self.http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ yield self.send_event_to_master(
+ event_id=event.event_id,
+ store=self.store,
requester=requester,
event=event,
context=context,
@@ -883,9 +704,11 @@ class EventCreationHandler(object):
e.sender == event.sender
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
state_to_include_ids = [
e_id
- for k, e_id in iteritems(context.current_state_ids)
+ for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -921,8 +744,9 @@ class EventCreationHandler(object):
)
if event.type == EventTypes.Redaction:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -942,26 +766,23 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
- if event.type == EventTypes.Create and context.prev_state_ids:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
+ if event.type == EventTypes.Create:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ if prev_state_ids:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
- # this intentionally does not yield: we don't care about the result
- # and don't need to wait for it.
- run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id
+ self.pusher_pool.on_new_notifications(
+ event_stream_id, max_stream_id,
)
- @defer.inlineCallbacks
def _notify():
- yield run_on_reactor()
try:
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
|