diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f12465fa2c..14051aee99 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,13 +16,13 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import RoomError, SynapseError
+from synapse.api.errors import SynapseError, AuthError, Codes
from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
-from synapse.types import UserID, RoomStreamToken
+from synapse.types import UserID, RoomStreamToken, StreamToken
from ._base import BaseHandler
@@ -71,34 +71,64 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_messages(self, user_id=None, room_id=None, pagin_config=None,
- feedback=False, as_client_event=True):
+ as_client_event=True, is_guest=False):
"""Get messages in a room.
Args:
user_id (str): The user requesting messages.
room_id (str): The room they want messages from.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- feedback (bool): True to get compressed feedback with the messages
+ config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
+ is_guest (bool): Whether the requesting user is a guest (as opposed
+ to a fully registered user).
Returns:
dict: Pagination API results
"""
- yield self.auth.check_joined_room(room_id, user_id)
-
data_source = self.hs.get_event_sources().sources["room"]
- if not pagin_config.from_token:
+ if pagin_config.from_token:
+ room_token = pagin_config.from_token.room_key
+ else:
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token(
direction='b'
)
)
+ room_token = pagin_config.from_token.room_key
- room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+ room_token = RoomStreamToken.parse(room_token)
if room_token.topological is None:
raise SynapseError(400, "Invalid token")
+ pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+ "room_key", str(room_token)
+ )
+
+ source_config = pagin_config.get_source_config("room")
+
+ if not is_guest:
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+ if member_event.membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room.
+ # If they're a guest, we'll just 403 them if they're asking for
+ # events they can't see.
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event.event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < room_token.topological:
+ source_config.from_key = str(leave_token)
+
+ if source_config.direction == "f":
+ if source_config.to_key is None:
+ source_config.to_key = str(leave_token)
+ else:
+ to_token = RoomStreamToken.parse(source_config.to_key)
+ if leave_token.topological < to_token.topological:
+ source_config.to_key = str(leave_token)
+
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, room_token.topological
)
@@ -106,7 +136,7 @@ class MessageHandler(BaseHandler):
user = UserID.from_string(user_id)
events, next_key = yield data_source.get_pagination_rows(
- user, pagin_config.get_source_config("room"), room_id
+ user, source_config, room_id
)
next_token = pagin_config.from_token.copy_and_replace(
@@ -120,7 +150,7 @@ class MessageHandler(BaseHandler):
"end": next_token.to_string(),
})
- events = yield self._filter_events_for_client(user_id, room_id, events)
+ events = yield self._filter_events_for_client(user_id, events, is_guest=is_guest)
time_now = self.clock.time_msec()
@@ -136,54 +166,8 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
- def _filter_events_for_client(self, user_id, room_id, events):
- event_id_to_state = yield self.store.get_state_for_events(
- room_id, frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, user_id),
- )
- )
-
- def allowed(event, state):
- if event.type == EventTypes.RoomHistoryVisibility:
- return True
-
- membership_ev = state.get((EventTypes.Member, user_id), None)
- if membership_ev:
- membership = membership_ev.membership
- else:
- membership = Membership.LEAVE
-
- if membership == Membership.JOIN:
- return True
-
- history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
- if history:
- visibility = history.content.get("history_visibility", "shared")
- else:
- visibility = "shared"
-
- if visibility == "public":
- return True
- elif visibility == "shared":
- return True
- elif visibility == "joined":
- return membership == Membership.JOIN
- elif visibility == "invited":
- return membership == Membership.INVITE
-
- return True
-
- defer.returnValue([
- event
- for event in events
- if allowed(event, event_id_to_state[event.event_id])
- ])
-
- @defer.inlineCallbacks
def create_and_send_event(self, event_dict, ratelimit=True,
- client=None, txn_id=None):
+ token_id=None, txn_id=None, is_guest=False):
""" Given a dict from a client, create and handle a new event.
Creates an FrozenEvent object, filling out auth_events, prev_events,
@@ -217,11 +201,8 @@ class MessageHandler(BaseHandler):
builder.content
)
- if client is not None:
- if client.token_id is not None:
- builder.internal_metadata.token_id = client.token_id
- if client.device_id is not None:
- builder.internal_metadata.device_id = client.device_id
+ if token_id is not None:
+ builder.internal_metadata.token_id = token_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
@@ -232,7 +213,7 @@ class MessageHandler(BaseHandler):
if event.type == EventTypes.Member:
member_handler = self.hs.get_handlers().room_member_handler
- yield member_handler.change_membership(event, context)
+ yield member_handler.change_membership(event, context, is_guest=is_guest)
else:
yield self.handle_new_client_event(
event=event,
@@ -248,7 +229,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
- event_type=None, state_key=""):
+ event_type=None, state_key="", is_guest=False):
""" Get data from a room.
Args:
@@ -258,29 +239,55 @@ class MessageHandler(BaseHandler):
Raises:
SynapseError if something went wrong.
"""
- have_joined = yield self.auth.check_joined_room(room_id, user_id)
- if not have_joined:
- raise RoomError(403, "User not in room.")
-
- data = yield self.state_handler.get_current_state(
- room_id, event_type, state_key
+ membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ room_id, user_id, is_guest
)
- defer.returnValue(data)
- @defer.inlineCallbacks
- def get_feedback(self, event_id):
- # yield self.auth.check_joined_room(room_id, user_id)
+ if membership == Membership.JOIN:
+ data = yield self.state_handler.get_current_state(
+ room_id, event_type, state_key
+ )
+ elif membership == Membership.LEAVE:
+ key = (event_type, state_key)
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], [key]
+ )
+ data = room_state[membership_event_id].get(key)
- # Pull out the feedback from the db
- fb = yield self.store.get_feedback(event_id)
+ defer.returnValue(data)
- if fb:
- defer.returnValue(fb)
- defer.returnValue(None)
+ @defer.inlineCallbacks
+ def _check_in_room_or_world_readable(self, room_id, user_id, is_guest):
+ try:
+ # check_user_was_in_room will return the most recent membership
+ # event for the user if:
+ # * The user is a non-guest user, and was ever in the room
+ # * The user is a guest user, and has joined the room
+ # else it will throw.
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+ defer.returnValue((member_event.membership, member_event.event_id))
+ return
+ except AuthError, auth_error:
+ visibility = yield self.state_handler.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility, ""
+ )
+ if (
+ visibility and
+ visibility.content["history_visibility"] == "world_readable"
+ ):
+ defer.returnValue((Membership.JOIN, None))
+ return
+ if not is_guest:
+ raise auth_error
+ raise AuthError(
+ 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+ )
@defer.inlineCallbacks
- def get_state_events(self, user_id, room_id):
- """Retrieve all state events for a given room.
+ def get_state_events(self, user_id, room_id, is_guest=False):
+ """Retrieve all state events for a given room. If the user is
+ joined to the room then return the current state. If the user has
+ left the room return the state events from when they left.
Args:
user_id(str): The user requesting state events.
@@ -288,18 +295,26 @@ class MessageHandler(BaseHandler):
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
- yield self.auth.check_joined_room(room_id, user_id)
+ membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ room_id, user_id, is_guest
+ )
+
+ if membership == Membership.JOIN:
+ room_state = yield self.state_handler.get_current_state(room_id)
+ elif membership == Membership.LEAVE:
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], None
+ )
+ room_state = room_state[membership_event_id]
- # TODO: This is duplicating logic from snapshot_all_rooms
- current_state = yield self.state_handler.get_current_state(room_id)
now = self.clock.time_msec()
defer.returnValue(
- [serialize_event(c, now) for c in current_state.values()]
+ [serialize_event(c, now) for c in room_state.values()]
)
@defer.inlineCallbacks
def snapshot_all_rooms(self, user_id=None, pagin_config=None,
- feedback=False, as_client_event=True):
+ as_client_event=True, include_archived=False):
"""Retrieve a snapshot of all rooms the user is invited or has joined.
This snapshot may include messages for all rooms where the user is
@@ -309,17 +324,20 @@ class MessageHandler(BaseHandler):
user_id (str): The ID of the user making the request.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config used to determine how many messages *PER ROOM* to return.
- feedback (bool): True to get feedback along with these messages.
as_client_event (bool): True to get events in client-server format.
+ include_archived (bool): True to get rooms that the user has left
Returns:
A list of dicts with "room_id" and "membership" keys for all rooms
the user is currently invited or joined in on. Rooms where the user
is joined on, may return a "messages" key with messages, depending
on the specified PaginationConfig.
"""
+ memberships = [Membership.INVITE, Membership.JOIN]
+ if include_archived:
+ memberships.append(Membership.LEAVE)
+
room_list = yield self.store.get_rooms_for_user_where_membership_is(
- user_id=user_id,
- membership_list=[Membership.INVITE, Membership.JOIN]
+ user_id=user_id, membership_list=memberships
)
user = UserID.from_string(user_id)
@@ -339,6 +357,8 @@ class MessageHandler(BaseHandler):
user, pagination_config.get_source_config("receipt"), None
)
+ tags_by_room = yield self.store.get_tags_for_user(user_id)
+
public_room_ids = yield self.store.get_public_room_ids()
limit = pagin_config.limit
@@ -357,28 +377,45 @@ class MessageHandler(BaseHandler):
}
if event.membership == Membership.INVITE:
+ time_now = self.clock.time_msec()
d["inviter"] = event.sender
+ invite_event = yield self.store.get_event(event.event_id)
+ d["invite"] = serialize_event(invite_event, time_now, as_client_event)
+
rooms_ret.append(d)
- if event.membership != Membership.JOIN:
+ if event.membership not in (Membership.JOIN, Membership.LEAVE):
return
+
try:
+ if event.membership == Membership.JOIN:
+ room_end_token = now_token.room_key
+ deferred_room_state = self.state_handler.get_current_state(
+ event.room_id
+ )
+ elif event.membership == Membership.LEAVE:
+ room_end_token = "s%d" % (event.stream_ordering,)
+ deferred_room_state = self.store.get_state_for_events(
+ [event.event_id], None
+ )
+ deferred_room_state.addCallback(
+ lambda states: states[event.event_id]
+ )
+
(messages, token), current_state = yield defer.gatherResults(
[
self.store.get_recent_events_for_room(
event.room_id,
limit=limit,
- end_token=now_token.room_key,
- ),
- self.state_handler.get_current_state(
- event.room_id
+ end_token=room_end_token,
),
+ deferred_room_state,
]
).addErrback(unwrapFirstError)
messages = yield self._filter_events_for_client(
- user_id, event.room_id, messages
+ user_id, messages
)
start_token = now_token.copy_and_replace("room_key", token[0])
@@ -398,6 +435,15 @@ class MessageHandler(BaseHandler):
serialize_event(c, time_now, as_client_event)
for c in current_state.values()
]
+
+ private_user_data = []
+ tags = tags_by_room.get(event.room_id)
+ if tags:
+ private_user_data.append({
+ "type": "m.tag",
+ "content": {"tags": tags},
+ })
+ d["private_user_data"] = private_user_data
except:
logger.exception("Failed to get snapshot")
@@ -420,15 +466,99 @@ class MessageHandler(BaseHandler):
defer.returnValue(ret)
@defer.inlineCallbacks
- def room_initial_sync(self, user_id, room_id, pagin_config=None,
- feedback=False):
- current_state = yield self.state.get_current_state(
- room_id=room_id,
+ def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False):
+ """Capture the a snapshot of a room. If user is currently a member of
+ the room this will be what is currently in the room. If the user left
+ the room this will be what was in the room when they left.
+
+ Args:
+ user_id(str): The user to get a snapshot for.
+ room_id(str): The room to get a snapshot of.
+ pagin_config(synapse.streams.config.PaginationConfig):
+ The pagination config used to determine how many messages to
+ return.
+ Raises:
+ AuthError if the user wasn't in the room.
+ Returns:
+ A JSON serialisable dict with the snapshot of the room.
+ """
+
+ membership, member_event_id = yield self._check_in_room_or_world_readable(
+ room_id,
+ user_id,
+ is_guest
)
- yield self.auth.check_joined_room(
- room_id, user_id,
- current_state=current_state
+ if membership == Membership.JOIN:
+ result = yield self._room_initial_sync_joined(
+ user_id, room_id, pagin_config, membership, is_guest
+ )
+ elif membership == Membership.LEAVE:
+ result = yield self._room_initial_sync_parted(
+ user_id, room_id, pagin_config, membership, member_event_id, is_guest
+ )
+
+ private_user_data = []
+ tags = yield self.store.get_tags_for_room(user_id, room_id)
+ if tags:
+ private_user_data.append({
+ "type": "m.tag",
+ "content": {"tags": tags},
+ })
+ result["private_user_data"] = private_user_data
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
+ membership, member_event_id, is_guest):
+ room_state = yield self.store.get_state_for_events(
+ [member_event_id], None
+ )
+
+ room_state = room_state[member_event_id]
+
+ limit = pagin_config.limit if pagin_config else None
+ if limit is None:
+ limit = 10
+
+ stream_token = yield self.store.get_stream_token_for_event(
+ member_event_id
+ )
+
+ messages, token = yield self.store.get_recent_events_for_room(
+ room_id,
+ limit=limit,
+ end_token=stream_token
+ )
+
+ messages = yield self._filter_events_for_client(
+ user_id, messages, is_guest=is_guest
+ )
+
+ start_token = StreamToken(token[0], 0, 0, 0, 0)
+ end_token = StreamToken(token[1], 0, 0, 0, 0)
+
+ time_now = self.clock.time_msec()
+
+ defer.returnValue({
+ "membership": membership,
+ "room_id": room_id,
+ "messages": {
+ "chunk": [serialize_event(m, time_now) for m in messages],
+ "start": start_token.to_string(),
+ "end": end_token.to_string(),
+ },
+ "state": [serialize_event(s, time_now) for s in room_state.values()],
+ "presence": [],
+ "receipts": [],
+ })
+
+ @defer.inlineCallbacks
+ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
+ membership, is_guest):
+ current_state = yield self.state.get_current_state(
+ room_id=room_id,
)
# TODO(paul): I wish I was called with user objects not user_id
@@ -442,8 +572,6 @@ class MessageHandler(BaseHandler):
for x in current_state.values()
]
- member_event = current_state.get((EventTypes.Member, user_id,))
-
now_token = yield self.hs.get_event_sources().get_current_token()
limit = pagin_config.limit if pagin_config else None
@@ -460,12 +588,14 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
- states = yield presence_handler.get_states(
- target_users=[UserID.from_string(m.user_id) for m in room_members],
- auth_user=auth_user,
- as_event=True,
- check_auth=False,
- )
+ states = {}
+ if not is_guest:
+ states = yield presence_handler.get_states(
+ target_users=[UserID.from_string(m.user_id) for m in room_members],
+ auth_user=auth_user,
+ as_event=True,
+ check_auth=False,
+ )
defer.returnValue(states.values())
@@ -485,7 +615,7 @@ class MessageHandler(BaseHandler):
).addErrback(unwrapFirstError)
messages = yield self._filter_events_for_client(
- user_id, room_id, messages
+ user_id, messages, is_guest=is_guest, require_all_visible_for_guests=False
)
start_token = now_token.copy_and_replace("room_key", token[0])
@@ -493,8 +623,7 @@ class MessageHandler(BaseHandler):
time_now = self.clock.time_msec()
- defer.returnValue({
- "membership": member_event.membership,
+ ret = {
"room_id": room_id,
"messages": {
"chunk": [serialize_event(m, time_now) for m in messages],
@@ -504,4 +633,8 @@ class MessageHandler(BaseHandler):
"state": state,
"presence": presence,
"receipts": receipts,
- })
+ }
+ if not is_guest:
+ ret["membership"] = membership
+
+ defer.returnValue(ret)
|