diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 59f687e0f1..793b3fcd8b 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -19,7 +19,6 @@ from ._base import BaseHandler
from synapse.api.constants import LoginType
from synapse.types import UserID
from synapse.api.errors import LoginError, Codes
-from synapse.http.client import SimpleHttpClient
from synapse.util.async import run_on_reactor
from twisted.web.client import PartialDownloadError
@@ -187,7 +186,7 @@ class AuthHandler(BaseHandler):
# TODO: get this from the homeserver rather than creating a new one for
# each request
try:
- client = SimpleHttpClient(self.hs)
+ client = self.hs.get_simple_http_client()
resp_body = yield client.post_urlencoded_get_json(
self.hs.config.recaptcha_siteverify_api,
args={
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4ff20599d6..3aff80bf59 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -125,60 +125,72 @@ class FederationHandler(BaseHandler):
)
if not is_in_room and not event.internal_metadata.is_outlier():
logger.debug("Got event for room we're not in.")
- current_state = state
- event_ids = set()
- if state:
- event_ids |= {e.event_id for e in state}
- if auth_chain:
- event_ids |= {e.event_id for e in auth_chain}
+ try:
+ event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ auth_chain, state, event
+ )
+ except AuthError as e:
+ raise FederationError(
+ "ERROR",
+ e.code,
+ e.msg,
+ affected=event.event_id,
+ )
- seen_ids = set(
- (yield self.store.have_events(event_ids)).keys()
- )
+ else:
+ event_ids = set()
+ if state:
+ event_ids |= {e.event_id for e in state}
+ if auth_chain:
+ event_ids |= {e.event_id for e in auth_chain}
+
+ seen_ids = set(
+ (yield self.store.have_events(event_ids)).keys()
+ )
- if state and auth_chain is not None:
- # If we have any state or auth_chain given to us by the replication
- # layer, then we should handle them (if we haven't before.)
+ if state and auth_chain is not None:
+ # If we have any state or auth_chain given to us by the replication
+ # layer, then we should handle them (if we haven't before.)
- event_infos = []
+ event_infos = []
- for e in itertools.chain(auth_chain, state):
- if e.event_id in seen_ids:
- continue
- e.internal_metadata.outlier = True
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
- event_infos.append({
- "event": e,
- "auth_events": auth,
- })
- seen_ids.add(e.event_id)
+ for e in itertools.chain(auth_chain, state):
+ if e.event_id in seen_ids:
+ continue
+ e.internal_metadata.outlier = True
+ auth_ids = [e_id for e_id, _ in e.auth_events]
+ auth = {
+ (e.type, e.state_key): e for e in auth_chain
+ if e.event_id in auth_ids
+ }
+ event_infos.append({
+ "event": e,
+ "auth_events": auth,
+ })
+ seen_ids.add(e.event_id)
- yield self._handle_new_events(
- origin,
- event_infos,
- outliers=True
- )
+ yield self._handle_new_events(
+ origin,
+ event_infos,
+ outliers=True
+ )
- try:
- _, event_stream_id, max_stream_id = yield self._handle_new_event(
- origin,
- event,
- state=state,
- backfilled=backfilled,
- current_state=current_state,
- )
- except AuthError as e:
- raise FederationError(
- "ERROR",
- e.code,
- e.msg,
- affected=event.event_id,
- )
+ try:
+ _, event_stream_id, max_stream_id = yield self._handle_new_event(
+ origin,
+ event,
+ state=state,
+ backfilled=backfilled,
+ current_state=current_state,
+ )
+ except AuthError as e:
+ raise FederationError(
+ "ERROR",
+ e.code,
+ e.msg,
+ affected=event.event_id,
+ )
# if we're receiving valid events from an origin,
# it's probably a good idea to mark it as not in retry-state
@@ -649,35 +661,8 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- ev_infos = []
- for e in itertools.chain(state, auth_chain):
- if e.event_id == event.event_id:
- continue
-
- e.internal_metadata.outlier = True
- auth_ids = [e_id for e_id, _ in e.auth_events]
- ev_infos.append({
- "event": e,
- "auth_events": {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
- })
-
- yield self._handle_new_events(origin, ev_infos, outliers=True)
-
- auth_ids = [e_id for e_id, _ in event.auth_events]
- auth_events = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
-
- _, event_stream_id, max_stream_id = yield self._handle_new_event(
- origin,
- new_event,
- state=state,
- current_state=state,
- auth_events=auth_events,
+ event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ auth_chain, state, event
)
with PreserveLoggingContext():
@@ -1027,6 +1012,76 @@ class FederationHandler(BaseHandler):
)
@defer.inlineCallbacks
+ def _persist_auth_tree(self, auth_events, state, event):
+ """Checks the auth chain is valid (and passes auth checks) for the
+ state and event. Then persists the auth chain and state atomically.
+ Persists the event seperately.
+
+ Returns:
+ 2-tuple of (event_stream_id, max_stream_id) from the persist_event
+ call for `event`
+ """
+ events_to_context = {}
+ for e in itertools.chain(auth_events, state):
+ ctx = yield self.state_handler.compute_event_context(
+ e, outlier=True,
+ )
+ events_to_context[e.event_id] = ctx
+ e.internal_metadata.outlier = True
+
+ event_map = {
+ e.event_id: e
+ for e in auth_events
+ }
+
+ create_event = None
+ for e in auth_events:
+ if (e.type, e.state_key) == (EventTypes.Create, ""):
+ create_event = e
+ break
+
+ for e in itertools.chain(auth_events, state, [event]):
+ auth_for_e = {
+ (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
+ for e_id, _ in e.auth_events
+ }
+ if create_event:
+ auth_for_e[(EventTypes.Create, "")] = create_event
+
+ try:
+ self.auth.check(e, auth_events=auth_for_e)
+ except AuthError as err:
+ logger.warn(
+ "Rejecting %s because %s",
+ e.event_id, err.msg
+ )
+
+ if e == event:
+ raise
+ events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+
+ yield self.store.persist_events(
+ [
+ (e, events_to_context[e.event_id])
+ for e in itertools.chain(auth_events, state)
+ ],
+ is_new_state=False,
+ )
+
+ new_event_context = yield self.state_handler.compute_event_context(
+ event, old_state=state, outlier=False,
+ )
+
+ event_stream_id, max_stream_id = yield self.store.persist_event(
+ event, new_event_context,
+ backfilled=False,
+ is_new_state=True,
+ current_state=state,
+ )
+
+ defer.returnValue((event_stream_id, max_stream_id))
+
+ @defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()
@@ -1456,52 +1511,3 @@ class FederationHandler(BaseHandler):
},
"missing": [e.event_id for e in missing_locals],
})
-
- @defer.inlineCallbacks
- def _handle_auth_events(self, origin, auth_events):
- auth_ids_to_deferred = {}
-
- def process_auth_ev(ev):
- auth_ids = [e_id for e_id, _ in ev.auth_events]
-
- prev_ds = [
- auth_ids_to_deferred[i]
- for i in auth_ids
- if i in auth_ids_to_deferred
- ]
-
- d = defer.Deferred()
-
- auth_ids_to_deferred[ev.event_id] = d
-
- @defer.inlineCallbacks
- def f(*_):
- ev.internal_metadata.outlier = True
-
- try:
- auth = {
- (e.type, e.state_key): e for e in auth_events
- if e.event_id in auth_ids
- }
-
- yield self._handle_new_event(
- origin, ev, auth_events=auth
- )
- except:
- logger.exception(
- "Failed to handle auth event %s",
- ev.event_id,
- )
-
- d.callback(None)
-
- if prev_ds:
- dx = defer.DeferredList(prev_ds)
- dx.addBoth(f)
- else:
- f()
-
- for e in auth_events:
- process_auth_ev(e)
-
- yield defer.DeferredList(auth_ids_to_deferred.values())
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 23b779ad7c..bda8eb5f3f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,13 +16,13 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import RoomError, SynapseError
+from synapse.api.errors import SynapseError
from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
-from synapse.types import UserID, RoomStreamToken
+from synapse.types import UserID, RoomStreamToken, StreamToken
from ._base import BaseHandler
@@ -71,7 +71,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_messages(self, user_id=None, room_id=None, pagin_config=None,
- feedback=False, as_client_event=True):
+ as_client_event=True):
"""Get messages in a room.
Args:
@@ -79,26 +79,52 @@ class MessageHandler(BaseHandler):
room_id (str): The room they want messages from.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
- feedback (bool): True to get compressed feedback with the messages
as_client_event (bool): True to get events in client-server format.
Returns:
dict: Pagination API results
"""
- yield self.auth.check_joined_room(room_id, user_id)
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
data_source = self.hs.get_event_sources().sources["room"]
- if not pagin_config.from_token:
+ if pagin_config.from_token:
+ room_token = pagin_config.from_token.room_key
+ else:
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token(
direction='b'
)
)
+ room_token = pagin_config.from_token.room_key
- room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+ room_token = RoomStreamToken.parse(room_token)
if room_token.topological is None:
raise SynapseError(400, "Invalid token")
+ pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+ "room_key", str(room_token)
+ )
+
+ source_config = pagin_config.get_source_config("room")
+
+ if member_event.membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event.event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < room_token.topological:
+ source_config.from_key = str(leave_token)
+
+ if source_config.direction == "f":
+ if source_config.to_key is None:
+ source_config.to_key = str(leave_token)
+ else:
+ to_token = RoomStreamToken.parse(source_config.to_key)
+ if leave_token.topological < to_token.topological:
+ source_config.to_key = str(leave_token)
+
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, room_token.topological
)
@@ -106,7 +132,7 @@ class MessageHandler(BaseHandler):
user = UserID.from_string(user_id)
events, next_key = yield data_source.get_pagination_rows(
- user, pagin_config.get_source_config("room"), room_id
+ user, source_config, room_id
)
next_token = pagin_config.from_token.copy_and_replace(
@@ -255,29 +281,26 @@ class MessageHandler(BaseHandler):
Raises:
SynapseError if something went wrong.
"""
- have_joined = yield self.auth.check_joined_room(room_id, user_id)
- if not have_joined:
- raise RoomError(403, "User not in room.")
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
- data = yield self.state_handler.get_current_state(
- room_id, event_type, state_key
- )
- defer.returnValue(data)
-
- @defer.inlineCallbacks
- def get_feedback(self, event_id):
- # yield self.auth.check_joined_room(room_id, user_id)
-
- # Pull out the feedback from the db
- fb = yield self.store.get_feedback(event_id)
+ if member_event.membership == Membership.JOIN:
+ data = yield self.state_handler.get_current_state(
+ room_id, event_type, state_key
+ )
+ elif member_event.membership == Membership.LEAVE:
+ key = (event_type, state_key)
+ room_state = yield self.store.get_state_for_events(
+ room_id, [member_event.event_id], [key]
+ )
+ data = room_state[member_event.event_id].get(key)
- if fb:
- defer.returnValue(fb)
- defer.returnValue(None)
+ defer.returnValue(data)
@defer.inlineCallbacks
def get_state_events(self, user_id, room_id):
- """Retrieve all state events for a given room.
+ """Retrieve all state events for a given room. If the user is
+ joined to the room then return the current state. If the user has
+ left the room return the state events from when they left.
Args:
user_id(str): The user requesting state events.
@@ -285,18 +308,23 @@ class MessageHandler(BaseHandler):
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
- yield self.auth.check_joined_room(room_id, user_id)
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+
+ if member_event.membership == Membership.JOIN:
+ room_state = yield self.state_handler.get_current_state(room_id)
+ elif member_event.membership == Membership.LEAVE:
+ room_state = yield self.store.get_state_for_events(
+ room_id, [member_event.event_id], None
+ )
+ room_state = room_state[member_event.event_id]
- # TODO: This is duplicating logic from snapshot_all_rooms
- current_state = yield self.state_handler.get_current_state(room_id)
now = self.clock.time_msec()
defer.returnValue(
- [serialize_event(c, now) for c in current_state.values()]
+ [serialize_event(c, now) for c in room_state.values()]
)
@defer.inlineCallbacks
- def snapshot_all_rooms(self, user_id=None, pagin_config=None,
- feedback=False, as_client_event=True):
+ def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True):
"""Retrieve a snapshot of all rooms the user is invited or has joined.
This snapshot may include messages for all rooms where the user is
@@ -306,7 +334,6 @@ class MessageHandler(BaseHandler):
user_id (str): The ID of the user making the request.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config used to determine how many messages *PER ROOM* to return.
- feedback (bool): True to get feedback along with these messages.
as_client_event (bool): True to get events in client-server format.
Returns:
A list of dicts with "room_id" and "membership" keys for all rooms
@@ -316,7 +343,9 @@ class MessageHandler(BaseHandler):
"""
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=user_id,
- membership_list=[Membership.INVITE, Membership.JOIN]
+ membership_list=[
+ Membership.INVITE, Membership.JOIN, Membership.LEAVE
+ ]
)
user = UserID.from_string(user_id)
@@ -358,19 +387,32 @@ class MessageHandler(BaseHandler):
rooms_ret.append(d)
- if event.membership != Membership.JOIN:
+ if event.membership not in (Membership.JOIN, Membership.LEAVE):
return
+
try:
+ if event.membership == Membership.JOIN:
+ room_end_token = now_token.room_key
+ deferred_room_state = self.state_handler.get_current_state(
+ event.room_id
+ )
+ elif event.membership == Membership.LEAVE:
+ room_end_token = "s%d" % (event.stream_ordering,)
+ deferred_room_state = self.store.get_state_for_events(
+ event.room_id, [event.event_id], None
+ )
+ deferred_room_state.addCallback(
+ lambda states: states[event.event_id]
+ )
+
(messages, token), current_state = yield defer.gatherResults(
[
self.store.get_recent_events_for_room(
event.room_id,
limit=limit,
- end_token=now_token.room_key,
- ),
- self.state_handler.get_current_state(
- event.room_id
+ end_token=room_end_token,
),
+ deferred_room_state,
]
).addErrback(unwrapFirstError)
@@ -417,15 +459,85 @@ class MessageHandler(BaseHandler):
defer.returnValue(ret)
@defer.inlineCallbacks
- def room_initial_sync(self, user_id, room_id, pagin_config=None,
- feedback=False):
- current_state = yield self.state.get_current_state(
- room_id=room_id,
+ def room_initial_sync(self, user_id, room_id, pagin_config=None):
+ """Capture the a snapshot of a room. If user is currently a member of
+ the room this will be what is currently in the room. If the user left
+ the room this will be what was in the room when they left.
+
+ Args:
+ user_id(str): The user to get a snapshot for.
+ room_id(str): The room to get a snapshot of.
+ pagin_config(synapse.streams.config.PaginationConfig):
+ The pagination config used to determine how many messages to
+ return.
+ Raises:
+ AuthError if the user wasn't in the room.
+ Returns:
+ A JSON serialisable dict with the snapshot of the room.
+ """
+
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+
+ if member_event.membership == Membership.JOIN:
+ result = yield self._room_initial_sync_joined(
+ user_id, room_id, pagin_config, member_event
+ )
+ elif member_event.membership == Membership.LEAVE:
+ result = yield self._room_initial_sync_parted(
+ user_id, room_id, pagin_config, member_event
+ )
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
+ member_event):
+ room_state = yield self.store.get_state_for_events(
+ member_event.room_id, [member_event.event_id], None
+ )
+
+ room_state = room_state[member_event.event_id]
+
+ limit = pagin_config.limit if pagin_config else None
+ if limit is None:
+ limit = 10
+
+ stream_token = yield self.store.get_stream_token_for_event(
+ member_event.event_id
+ )
+
+ messages, token = yield self.store.get_recent_events_for_room(
+ room_id,
+ limit=limit,
+ end_token=stream_token
+ )
+
+ messages = yield self._filter_events_for_client(
+ user_id, room_id, messages
)
- yield self.auth.check_joined_room(
- room_id, user_id,
- current_state=current_state
+ start_token = StreamToken(token[0], 0, 0, 0)
+ end_token = StreamToken(token[1], 0, 0, 0)
+
+ time_now = self.clock.time_msec()
+
+ defer.returnValue({
+ "membership": member_event.membership,
+ "room_id": room_id,
+ "messages": {
+ "chunk": [serialize_event(m, time_now) for m in messages],
+ "start": start_token.to_string(),
+ "end": end_token.to_string(),
+ },
+ "state": [serialize_event(s, time_now) for s in room_state.values()],
+ "presence": [],
+ "receipts": [],
+ })
+
+ @defer.inlineCallbacks
+ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
+ member_event):
+ current_state = yield self.state.get_current_state(
+ room_id=room_id,
)
# TODO(paul): I wish I was called with user objects not user_id
@@ -439,8 +551,6 @@ class MessageHandler(BaseHandler):
for x in current_state.values()
]
- member_event = current_state.get((EventTypes.Member, user_id,))
-
now_token = yield self.hs.get_event_sources().get_current_token()
limit = pagin_config.limit if pagin_config else None
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 4f8ad824b5..ac636255c2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,7 +25,6 @@ from synapse.api.constants import (
from synapse.api.errors import StoreError, SynapseError
from synapse.util import stringutils, unwrapFirstError
from synapse.util.async import run_on_reactor
-from synapse.events.utils import serialize_event
from collections import OrderedDict
import logging
@@ -39,7 +38,7 @@ class RoomCreationHandler(BaseHandler):
PRESETS_DICT = {
RoomCreationPreset.PRIVATE_CHAT: {
"join_rules": JoinRules.INVITE,
- "history_visibility": "invited",
+ "history_visibility": "shared",
"original_invitees_have_ops": False,
},
RoomCreationPreset.PUBLIC_CHAT: {
@@ -159,6 +158,7 @@ class RoomCreationHandler(BaseHandler):
invite_list=invite_list,
initial_state=initial_state,
creation_content=creation_content,
+ room_alias=room_alias,
)
msg_handler = self.hs.get_handlers().message_handler
@@ -206,7 +206,8 @@ class RoomCreationHandler(BaseHandler):
defer.returnValue(result)
def _create_events_for_new_room(self, creator, room_id, preset_config,
- invite_list, initial_state, creation_content):
+ invite_list, initial_state, creation_content,
+ room_alias):
config = RoomCreationHandler.PRESETS_DICT[preset_config]
creator_id = creator.to_string()
@@ -276,6 +277,14 @@ class RoomCreationHandler(BaseHandler):
returned_events.append(power_levels_event)
+ if room_alias and (EventTypes.CanonicalAlias, '') not in initial_state:
+ room_alias_event = create(
+ etype=EventTypes.CanonicalAlias,
+ content={"alias": room_alias.to_string()},
+ )
+
+ returned_events.append(room_alias_event)
+
if (EventTypes.JoinRules, '') not in initial_state:
join_rules_event = create(
etype=EventTypes.JoinRules,
@@ -347,41 +356,6 @@ class RoomMemberHandler(BaseHandler):
remotedomains.add(member.domain)
@defer.inlineCallbacks
- def get_room_members_as_pagination_chunk(self, room_id=None, user_id=None,
- limit=0, start_tok=None,
- end_tok=None):
- """Retrieve a list of room members in the room.
-
- Args:
- room_id (str): The room to get the member list for.
- user_id (str): The ID of the user making the request.
- limit (int): The max number of members to return.
- start_tok (str): Optional. The start token if known.
- end_tok (str): Optional. The end token if known.
- Returns:
- dict: A Pagination streamable dict.
- Raises:
- SynapseError if something goes wrong.
- """
- yield self.auth.check_joined_room(room_id, user_id)
-
- member_list = yield self.store.get_room_members(room_id=room_id)
- time_now = self.clock.time_msec()
- event_list = [
- serialize_event(entry, time_now)
- for entry in member_list
- ]
- chunk_data = {
- "start": "START", # FIXME (erikj): START is no longer valid
- "end": "END",
- "chunk": event_list
- }
- # TODO honor Pagination stream params
- # TODO snapshot this list to return on subsequent requests when
- # paginating
- defer.returnValue(chunk_data)
-
- @defer.inlineCallbacks
def change_membership(self, event, context, do_auth=True):
""" Change the membership status of a user in a room.
@@ -533,32 +507,6 @@ class RoomMemberHandler(BaseHandler):
)
@defer.inlineCallbacks
- def _should_invite_join(self, room_id, prev_state, do_auth):
- logger.debug("_should_invite_join: room_id: %s", room_id)
-
- # XXX: We don't do an auth check if we are doing an invite
- # join dance for now, since we're kinda implicitly checking
- # that we are allowed to join when we decide whether or not we
- # need to do the invite/join dance.
-
- # Only do an invite join dance if a) we were invited,
- # b) the person inviting was from a differnt HS and c) we are
- # not currently in the room
- room_host = None
- if prev_state and prev_state.membership == Membership.INVITE:
- room = yield self.store.get_room(room_id)
- inviter = UserID.from_string(
- prev_state.sender
- )
-
- is_remote_invite_join = not self.hs.is_mine(inviter) and not room
- room_host = inviter.domain
- else:
- is_remote_invite_join = False
-
- defer.returnValue((is_remote_invite_join, room_host))
-
- @defer.inlineCallbacks
def get_joined_rooms_for_user(self, user):
"""Returns a list of roomids that the user has any of the given
membership states in."""
@@ -650,7 +598,6 @@ class RoomEventSource(object):
to_key=config.to_key,
direction=config.direction,
limit=config.limit,
- with_feedback=True
)
defer.returnValue((events, next_key))
|