diff options
author | Mark Haines <mark.haines@matrix.org> | 2015-08-12 17:07:22 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2015-08-12 17:21:14 +0100 |
commit | 998a72d4d9ec6e73000888dcdf51437ec427fbee (patch) | |
tree | f8fa9d5deb820b49eb3a216e194ee83e14fb9eda /synapse/handlers | |
parent | Bump the version of twisted needed for setup_requires to 15.2.1 (diff) | |
parent | Merge pull request #220 from matrix-org/markjh/generate_keys (diff) | |
download | synapse-998a72d4d9ec6e73000888dcdf51437ec427fbee.tar.xz |
Merge branch 'develop' into markjh/twisted-15
Conflicts: synapse/http/matrixfederationclient.py
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 2 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 4 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 2 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 6 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 292 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 2 | ||||
-rw-r--r-- | synapse/handlers/message.py | 132 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 30 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 212 | ||||
-rw-r--r-- | synapse/handlers/register.py | 32 | ||||
-rw-r--r-- | synapse/handlers/room.py | 97 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 48 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 2 |
13 files changed, 708 insertions, 153 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 685792dbdc..dc5b6ef79d 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -32,6 +32,7 @@ from .appservice import ApplicationServicesHandler from .sync import SyncHandler from .auth import AuthHandler from .identity import IdentityHandler +from .receipts import ReceiptsHandler class Handlers(object): @@ -57,6 +58,7 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + self.receipts_handler = ReceiptsHandler(hs) asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( hs, asapi, AppServiceScheduler( diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 833ff41377..d6c064b398 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -78,7 +78,9 @@ class BaseHandler(object): context = yield state_handler.compute_event_context(builder) if builder.is_state(): - builder.prev_state = context.prev_state_events + builder.prev_state = yield self.store.add_event_hashes( + context.prev_state_events + ) yield self.auth.add_auth_events(builder, context) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 8269482e47..1240e51649 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -177,7 +177,7 @@ class ApplicationServicesHandler(object): return user_info = yield self.store.get_user_by_id(user_id) - if not user_info: + if user_info: defer.returnValue(False) return diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 63071653a3..1ecf7fef17 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -85,8 +85,10 @@ class AuthHandler(BaseHandler): # email auth link on there). It's probably too open to abuse # because it lets unauthenticated clients store arbitrary objects # on a home server. - # sess['clientdict'] = clientdict - # self._save_session(sess) + # Revisit: Assumimg the REST APIs do sensible validation, the data + # isn't arbintrary. + sess['clientdict'] = clientdict + self._save_session(sess) pass elif 'clientdict' in sess: clientdict = sess['clientdict'] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 46ce3699d7..f7155fd8d3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -31,6 +31,8 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID +from synapse.events.utils import prune_event + from synapse.util.retryutils import NotRetryingDestination from twisted.internet import defer @@ -138,26 +140,29 @@ class FederationHandler(BaseHandler): if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication # layer, then we should handle them (if we haven't before.) + + event_infos = [] + for e in itertools.chain(auth_chain, state): if e.event_id in seen_ids: continue - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - seen_ids.add(e.event_id) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + event_infos.append({ + "event": e, + "auth_events": auth, + }) + seen_ids.add(e.event_id) + + yield self._handle_new_events( + origin, + event_infos, + outliers=True + ) try: _, event_stream_id, max_stream_id = yield self._handle_new_event( @@ -222,6 +227,56 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) + @defer.inlineCallbacks + def _filter_events_for_server(self, server_name, room_id, events): + states = yield self.store.get_state_for_events( + room_id, [e.event_id for e in events], + ) + + events_and_states = zip(events, states) + + def redact_disallowed(event_and_state): + event, state = event_and_state + + if not state: + return event + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history: + visibility = history.content.get("history_visibility", "shared") + if visibility in ["invited", "joined"]: + # We now loop through all state events looking for + # membership states for the requesting server to determine + # if the server is either in the room or has been invited + # into the room. + for ev in state.values(): + if ev.type != EventTypes.Member: + continue + try: + domain = UserID.from_string(ev.state_key).domain + except: + continue + + if domain != server_name: + continue + + memtype = ev.membership + if memtype == Membership.JOIN: + return event + elif memtype == Membership.INVITE: + if visibility == "invited": + return event + else: + return prune_event(event) + + return event + + res = map(redact_disallowed, events_and_states) + + logger.info("_filter_events_for_server %r", res) + + defer.returnValue(res) + @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): @@ -247,9 +302,15 @@ class FederationHandler(BaseHandler): if set(e_id for e_id, _ in ev.prev_events) - event_ids ] + logger.info( + "backfill: Got %d events with %d edges", + len(events), len(edges), + ) + # For each edge get the current state. auth_events = {} + state_events = {} events_to_state = {} for e_id in edges: state, auth = yield self.replication_layer.get_state_for_room( @@ -258,27 +319,57 @@ class FederationHandler(BaseHandler): event_id=e_id ) auth_events.update({a.event_id: a for a in auth}) + auth_events.update({s.event_id: s for s in state}) + state_events.update({s.event_id: s for s in state}) events_to_state[e_id] = state - yield defer.gatherResults( - [ - self._handle_new_event(dest, a) - for a in auth_events.values() - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) + seen_events = yield self.store.have_events( + set(auth_events.keys()) | set(state_events.keys()) + ) + + all_events = events + state_events.values() + auth_events.values() + required_auth = set( + a_id for event in all_events for a_id, _ in event.auth_events + ) - yield defer.gatherResults( + missing_auth = required_auth - set(auth_events) + results = yield defer.gatherResults( [ - self._handle_new_event( - dest, event_map[e_id], - state=events_to_state[e_id], - backfilled=True, + self.replication_layer.get_pdu( + [dest], + event_id, + outlier=True, + timeout=10000, ) - for e_id in events_to_state + for event_id in missing_auth ], consumeErrors=True ).addErrback(unwrapFirstError) + auth_events.update({a.event_id: a for a in results}) + + ev_infos = [] + for a in auth_events.values(): + if a.event_id in seen_events: + continue + ev_infos.append({ + "event": a, + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in a.auth_events + } + }) + + for e_id in events_to_state: + ev_infos.append({ + "event": event_map[e_id], + "state": events_to_state[e_id], + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in event_map[e_id].auth_events + } + }) events.sort(key=lambda e: e.depth) @@ -286,10 +377,14 @@ class FederationHandler(BaseHandler): if event in events_to_state: continue - yield self._handle_new_event( - dest, event, - backfilled=True, - ) + ev_infos.append({ + "event": event, + }) + + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) defer.returnValue(events) @@ -555,32 +650,22 @@ class FederationHandler(BaseHandler): # FIXME pass - yield self._handle_auth_events( - origin, [e for e in auth_chain if e.event_id != event.event_id] - ) - - @defer.inlineCallbacks - def handle_state(e): + ev_infos = [] + for e in itertools.chain(state, auth_chain): if e.event_id == event.event_id: - return + continue e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { + auth_ids = [e_id for e_id, _ in e.auth_events] + ev_infos.append({ + "event": e, + "auth_events": { (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + }) - yield defer.DeferredList([handle_state(e) for e in state]) + yield self._handle_new_events(origin, ev_infos, outliers=True) auth_ids = [e_id for e_id, _ in event.auth_events] auth_events = { @@ -837,6 +922,8 @@ class FederationHandler(BaseHandler): limit ) + events = yield self._filter_events_for_server(origin, room_id, events) + defer.returnValue(events) @defer.inlineCallbacks @@ -895,24 +982,62 @@ class FederationHandler(BaseHandler): def _handle_new_event(self, origin, event, state=None, backfilled=False, current_state=None, auth_events=None): - logger.debug( - "_handle_new_event: %s, sigs: %s", - event.event_id, event.signatures, + outlier = event.internal_metadata.is_outlier() + + context = yield self._prep_event( + origin, event, + state=state, + backfilled=backfilled, + current_state=current_state, + auth_events=auth_events, ) - context = yield self.state_handler.compute_event_context( - event, old_state=state + event_stream_id, max_stream_id = yield self.store.persist_event( + event, + context=context, + backfilled=backfilled, + is_new_state=(not outlier and not backfilled), + current_state=current_state, ) - if not auth_events: - auth_events = context.current_state + defer.returnValue((context, event_stream_id, max_stream_id)) - logger.debug( - "_handle_new_event: %s, auth_events: %s", - event.event_id, auth_events, + @defer.inlineCallbacks + def _handle_new_events(self, origin, event_infos, backfilled=False, + outliers=False): + contexts = yield defer.gatherResults( + [ + self._prep_event( + origin, + ev_info["event"], + state=ev_info.get("state"), + backfilled=backfilled, + auth_events=ev_info.get("auth_events"), + ) + for ev_info in event_infos + ] ) - is_new_state = not event.internal_metadata.is_outlier() + yield self.store.persist_events( + [ + (ev_info["event"], context) + for ev_info, context in itertools.izip(event_infos, contexts) + ], + backfilled=backfilled, + is_new_state=(not outliers and not backfilled), + ) + + @defer.inlineCallbacks + def _prep_event(self, origin, event, state=None, backfilled=False, + current_state=None, auth_events=None): + outlier = event.internal_metadata.is_outlier() + + context = yield self.state_handler.compute_event_context( + event, old_state=state, outlier=outlier, + ) + + if not auth_events: + auth_events = context.current_state # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. @@ -937,26 +1062,7 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR - # FIXME: Don't store as rejected with AUTH_ERROR if we haven't - # seen all the auth events. - yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - is_new_state=False, - current_state=current_state, - ) - raise - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - is_new_state=(is_new_state and not backfilled), - current_state=current_state, - ) - - defer.returnValue((context, event_stream_id, max_stream_id)) + defer.returnValue(context) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, @@ -1019,14 +1125,24 @@ class FederationHandler(BaseHandler): @log_function def do_auth(self, origin, event, context, auth_events): # Check if we have all the auth events. - have_events = yield self.store.have_events( - [e_id for e_id, _ in event.auth_events] - ) - + current_state = set(e.event_id for e in auth_events.values()) event_auth_events = set(e_id for e_id, _ in event.auth_events) + + if event_auth_events - current_state: + have_events = yield self.store.have_events( + event_auth_events - current_state + ) + else: + have_events = {} + + have_events.update({ + e.event_id: "" + for e in auth_events.values() + }) + seen_events = set(have_events.keys()) - missing_auth = event_auth_events - seen_events + missing_auth = event_auth_events - seen_events - current_state if missing_auth: logger.info("Missing auth: %s", missing_auth) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 6200e10775..c1095708a0 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -44,7 +44,7 @@ class IdentityHandler(BaseHandler): http_client = SimpleHttpClient(self.hs) # XXX: make this configurable! # trustedIdServers = ['matrix.org', 'localhost:8090'] - trustedIdServers = ['matrix.org'] + trustedIdServers = ['matrix.org', 'vector.im'] if 'id_server' in creds: id_server = creds['id_server'] diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 867fdbefb0..9d6d4f0978 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -113,11 +113,21 @@ class MessageHandler(BaseHandler): "room_key", next_key ) + if not events: + defer.returnValue({ + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + }) + + events = yield self._filter_events_for_client(user_id, room_id, events) + time_now = self.clock.time_msec() chunk = { "chunk": [ - serialize_event(e, time_now, as_client_event) for e in events + serialize_event(e, time_now, as_client_event) + for e in events ], "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), @@ -126,6 +136,52 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks + def _filter_events_for_client(self, user_id, room_id, events): + states = yield self.store.get_state_for_events( + room_id, [e.event_id for e in events], + ) + + events_and_states = zip(events, states) + + def allowed(event_and_state): + event, state = event_and_state + + if event.type == EventTypes.RoomHistoryVisibility: + return True + + membership_ev = state.get((EventTypes.Member, user_id), None) + if membership_ev: + membership = membership_ev.membership + else: + membership = Membership.LEAVE + + if membership == Membership.JOIN: + return True + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history: + visibility = history.content.get("history_visibility", "shared") + else: + visibility = "shared" + + if visibility == "public": + return True + elif visibility == "shared": + return True + elif visibility == "joined": + return membership == Membership.JOIN + elif visibility == "invited": + return membership == Membership.INVITE + + return True + + events_and_states = filter(allowed, events_and_states) + defer.returnValue([ + ev + for ev, _ in events_and_states + ]) + + @defer.inlineCallbacks def create_and_send_event(self, event_dict, ratelimit=True, client=None, txn_id=None): """ Given a dict from a client, create and handle a new event. @@ -278,6 +334,11 @@ class MessageHandler(BaseHandler): user, pagination_config.get_source_config("presence"), None ) + receipt_stream = self.hs.get_event_sources().sources["receipt"] + receipt, _ = yield receipt_stream.get_pagination_rows( + user, pagination_config.get_source_config("receipt"), None + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -316,6 +377,10 @@ class MessageHandler(BaseHandler): ] ).addErrback(unwrapFirstError) + messages = yield self._filter_events_for_client( + user_id, event.room_id, messages + ) + start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() @@ -344,7 +409,8 @@ class MessageHandler(BaseHandler): ret = { "rooms": rooms_ret, "presence": presence, - "end": now_token.to_string() + "receipts": receipt, + "end": now_token.to_string(), } defer.returnValue(ret) @@ -380,15 +446,6 @@ class MessageHandler(BaseHandler): if limit is None: limit = 10 - messages, token = yield self.store.get_recent_events_for_room( - room_id, - limit=limit, - end_token=now_token.room_key, - ) - - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) - room_members = [ m for m in current_state.values() if m.type == EventTypes.Member @@ -396,19 +453,45 @@ class MessageHandler(BaseHandler): ] presence_handler = self.hs.get_handlers().presence_handler - presence = [] - for m in room_members: - try: - member_presence = yield presence_handler.get_state( - target_user=UserID.from_string(m.user_id), - auth_user=auth_user, - as_event=True, - ) - presence.append(member_presence) - except SynapseError: - logger.exception( - "Failed to get member presence of %r", m.user_id + + @defer.inlineCallbacks + def get_presence(): + presence_defs = yield defer.DeferredList( + [ + presence_handler.get_state( + target_user=UserID.from_string(m.user_id), + auth_user=auth_user, + as_event=True, + check_auth=False, + ) + for m in room_members + ], + consumeErrors=True, + ) + + defer.returnValue([p for success, p in presence_defs if success]) + + receipts_handler = self.hs.get_handlers().receipts_handler + + presence, receipts, (messages, token) = yield defer.gatherResults( + [ + get_presence(), + receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key), + self.store.get_recent_events_for_room( + room_id, + limit=limit, + end_token=now_token.room_key, ) + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + messages = yield self._filter_events_for_client( + user_id, room_id, messages + ) + + start_token = now_token.copy_and_replace("room_key", token[0]) + end_token = now_token.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() @@ -421,5 +504,6 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), }, "state": state, - "presence": presence + "presence": presence, + "receipts": receipts, }) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 023ad33ab0..341a516da2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -191,24 +191,24 @@ class PresenceHandler(BaseHandler): defer.returnValue(False) @defer.inlineCallbacks - def get_state(self, target_user, auth_user, as_event=False): + def get_state(self, target_user, auth_user, as_event=False, check_auth=True): if self.hs.is_mine(target_user): - visible = yield self.is_presence_visible( - observer_user=auth_user, - observed_user=target_user - ) + if check_auth: + visible = yield self.is_presence_visible( + observer_user=auth_user, + observed_user=target_user + ) - if not visible: - raise SynapseError(404, "Presence information not visible") - state = yield self.store.get_presence_state(target_user.localpart) - if "mtime" in state: - del state["mtime"] - state["presence"] = state.pop("state") + if not visible: + raise SynapseError(404, "Presence information not visible") if target_user in self._user_cachemap: - cached_state = self._user_cachemap[target_user].get_state() - if "last_active" in cached_state: - state["last_active"] = cached_state["last_active"] + state = self._user_cachemap[target_user].get_state() + else: + state = yield self.store.get_presence_state(target_user.localpart) + if "mtime" in state: + del state["mtime"] + state["presence"] = state.pop("state") else: # TODO(paul): Have remote server send us permissions set state = self._get_or_offline_usercache(target_user).get_state() @@ -992,7 +992,7 @@ class PresenceHandler(BaseHandler): room_ids([str]): List of room_ids to notify. """ with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "presence_key", self._user_cachemap_latest_serial, users_to_push, diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py new file mode 100644 index 0000000000..415dd339f6 --- /dev/null +++ b/synapse/handlers/receipts.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseHandler + +from twisted.internet import defer + +from synapse.util.logcontext import PreserveLoggingContext + +import logging + + +logger = logging.getLogger(__name__) + + +class ReceiptsHandler(BaseHandler): + def __init__(self, hs): + super(ReceiptsHandler, self).__init__(hs) + + self.hs = hs + self.federation = hs.get_replication_layer() + self.federation.register_edu_handler( + "m.receipt", self._received_remote_receipt + ) + self.clock = self.hs.get_clock() + + self._receipt_cache = None + + @defer.inlineCallbacks + def received_client_receipt(self, room_id, receipt_type, user_id, + event_id): + """Called when a client tells us a local user has read up to the given + event_id in the room. + """ + receipt = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": [event_id], + "data": { + "ts": int(self.clock.time_msec()), + } + } + + is_new = yield self._handle_new_receipts([receipt]) + + if is_new: + self._push_remotes([receipt]) + + @defer.inlineCallbacks + def _received_remote_receipt(self, origin, content): + """Called when we receive an EDU of type m.receipt from a remote HS. + """ + receipts = [ + { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": user_values["event_ids"], + "data": user_values.get("data", {}), + } + for room_id, room_values in content.items() + for receipt_type, users in room_values.items() + for user_id, user_values in users.items() + ] + + yield self._handle_new_receipts(receipts) + + @defer.inlineCallbacks + def _handle_new_receipts(self, receipts): + """Takes a list of receipts, stores them and informs the notifier. + """ + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + res = yield self.store.insert_receipt( + room_id, receipt_type, user_id, event_ids, data + ) + + if not res: + # res will be None if this read receipt is 'old' + defer.returnValue(False) + + stream_id, max_persisted_id = res + + with PreserveLoggingContext(): + self.notifier.on_new_event( + "receipt_key", max_persisted_id, rooms=[room_id] + ) + + defer.returnValue(True) + + @defer.inlineCallbacks + def _push_remotes(self, receipts): + """Given a list of receipts, works out which remote servers should be + poked and pokes them. + """ + # TODO: Some of this stuff should be coallesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + remotedomains = set() + + rm_handler = self.hs.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=None, remotedomains=remotedomains + ) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } + } + }, + }, + ) + + @defer.inlineCallbacks + def get_receipts_for_room(self, room_id, to_key): + """Gets all receipts for a room, upto the given key. + """ + result = yield self.store.get_linearized_receipts_for_room( + room_id, + to_key=to_key, + ) + + if not result: + defer.returnValue([]) + + event = { + "type": "m.receipt", + "room_id": room_id, + "content": result, + } + + defer.returnValue([event]) + + +class ReceiptEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + defer.returnValue(([], from_key)) + from_key = int(from_key) + to_key = yield self.get_current_key() + + if from_key == to_key: + defer.returnValue(([], to_key)) + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, + from_key=from_key, + to_key=to_key, + ) + + defer.returnValue((events, to_key)) + + def get_current_key(self, direction='f'): + return self.store.get_max_receipt_stream_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + to_key = int(config.from_key) + defer.returnValue(([], to_key)) + + if config.to_key: + from_key = int(config.to_key) + else: + from_key = None + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, + from_key=from_key, + to_key=to_key, + ) + + defer.returnValue((events, to_key)) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 7b68585a17..f81d75017d 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -73,7 +73,8 @@ class RegistrationHandler(BaseHandler): localpart : The local part of the user ID to register. If None, one will be randomly generated. password (str) : The password to assign to this user so they can - login again. + login again. This can be None which means they cannot login again + via a password (e.g. the user is an application service user). Returns: A tuple of (user_id, access_token). Raises: @@ -193,6 +194,35 @@ class RegistrationHandler(BaseHandler): logger.info("Valid captcha entered from %s", ip) @defer.inlineCallbacks + def register_saml2(self, localpart): + """ + Registers email_id as SAML2 Based Auth. + """ + if urllib.quote(localpart) != localpart: + raise SynapseError( + 400, + "User ID must only contain characters which do not" + " require URL encoding." + ) + user = UserID(localpart, self.hs.hostname) + user_id = user.to_string() + + yield self.check_user_id_is_valid(user_id) + token = self._generate_token(user_id) + try: + yield self.store.register( + user_id=user_id, + token=token, + password_hash=None + ) + yield self.distributor.fire("registered_user", user) + except Exception, e: + yield self.store.add_access_token_to_user(user_id, token) + # Ignore Registration errors + logger.exception(e) + defer.returnValue((user_id, token)) + + @defer.inlineCallbacks def register_email(self, threepidCreds): """ Registers emails with an identity server. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4bd027d9bb..7511d294f3 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -19,12 +19,15 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID -from synapse.api.constants import EventTypes, Membership, JoinRules +from synapse.api.constants import ( + EventTypes, Membership, JoinRules, RoomCreationPreset, +) from synapse.api.errors import StoreError, SynapseError from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event +from collections import OrderedDict import logging import string @@ -33,6 +36,19 @@ logger = logging.getLogger(__name__) class RoomCreationHandler(BaseHandler): + PRESETS_DICT = { + RoomCreationPreset.PRIVATE_CHAT: { + "join_rules": JoinRules.INVITE, + "history_visibility": "invited", + "original_invitees_have_ops": False, + }, + RoomCreationPreset.PUBLIC_CHAT: { + "join_rules": JoinRules.PUBLIC, + "history_visibility": "shared", + "original_invitees_have_ops": False, + }, + } + @defer.inlineCallbacks def create_room(self, user_id, room_id, config): """ Creates a new room. @@ -121,9 +137,25 @@ class RoomCreationHandler(BaseHandler): servers=[self.hs.hostname], ) + preset_config = config.get( + "preset", + RoomCreationPreset.PUBLIC_CHAT + if is_public + else RoomCreationPreset.PRIVATE_CHAT + ) + + raw_initial_state = config.get("initial_state", []) + + initial_state = OrderedDict() + for val in raw_initial_state: + initial_state[(val["type"], val.get("state_key", ""))] = val["content"] + user = UserID.from_string(user_id) creation_events = self._create_events_for_new_room( - user, room_id, is_public=is_public + user, room_id, + preset_config=preset_config, + invite_list=invite_list, + initial_state=initial_state, ) msg_handler = self.hs.get_handlers().message_handler @@ -170,7 +202,10 @@ class RoomCreationHandler(BaseHandler): defer.returnValue(result) - def _create_events_for_new_room(self, creator, room_id, is_public=False): + def _create_events_for_new_room(self, creator, room_id, preset_config, + invite_list, initial_state): + config = RoomCreationHandler.PRESETS_DICT[preset_config] + creator_id = creator.to_string() event_keys = { @@ -203,9 +238,10 @@ class RoomCreationHandler(BaseHandler): }, ) - power_levels_event = create( - etype=EventTypes.PowerLevels, - content={ + returned_events = [creation_event, join_event] + + if (EventTypes.PowerLevels, '') not in initial_state: + power_level_content = { "users": { creator.to_string(): 100, }, @@ -213,6 +249,7 @@ class RoomCreationHandler(BaseHandler): "events": { EventTypes.Name: 100, EventTypes.PowerLevels: 100, + EventTypes.RoomHistoryVisibility: 100, }, "events_default": 0, "state_default": 50, @@ -220,21 +257,43 @@ class RoomCreationHandler(BaseHandler): "kick": 50, "redact": 50, "invite": 0, - }, - ) + } - join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE - join_rules_event = create( - etype=EventTypes.JoinRules, - content={"join_rule": join_rule}, - ) + if config["original_invitees_have_ops"]: + for invitee in invite_list: + power_level_content["users"][invitee] = 100 - return [ - creation_event, - join_event, - power_levels_event, - join_rules_event, - ] + power_levels_event = create( + etype=EventTypes.PowerLevels, + content=power_level_content, + ) + + returned_events.append(power_levels_event) + + if (EventTypes.JoinRules, '') not in initial_state: + join_rules_event = create( + etype=EventTypes.JoinRules, + content={"join_rule": config["join_rules"]}, + ) + + returned_events.append(join_rules_event) + + if (EventTypes.RoomHistoryVisibility, '') not in initial_state: + history_event = create( + etype=EventTypes.RoomHistoryVisibility, + content={"history_visibility": config["history_visibility"]} + ) + + returned_events.append(history_event) + + for (etype, state_key), content in initial_state.items(): + returned_events.append(create( + etype=etype, + state_key=state_key, + content=content, + )) + + return returned_events class RoomMemberHandler(BaseHandler): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bd8c603681..6cff6230c1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -293,6 +293,51 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks + def _filter_events_for_client(self, user_id, room_id, events): + states = yield self.store.get_state_for_events( + room_id, [e.event_id for e in events], + ) + + events_and_states = zip(events, states) + + def allowed(event_and_state): + event, state = event_and_state + + if event.type == EventTypes.RoomHistoryVisibility: + return True + + membership_ev = state.get((EventTypes.Member, user_id), None) + if membership_ev: + membership = membership_ev.membership + else: + membership = Membership.LEAVE + + if membership == Membership.JOIN: + return True + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history: + visibility = history.content.get("history_visibility", "shared") + else: + visibility = "shared" + + if visibility == "public": + return True + elif visibility == "shared": + return True + elif visibility == "joined": + return membership == Membership.JOIN + elif visibility == "invited": + return membership == Membership.INVITE + + return True + events_and_states = filter(allowed, events_and_states) + defer.returnValue([ + ev + for ev, _ in events_and_states + ]) + + @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, since_token=None): limited = True @@ -313,6 +358,9 @@ class SyncHandler(BaseHandler): (room_key, _) = keys end_key = "s" + room_key.split('-')[-1] loaded_recents = sync_config.filter.filter_room_events(events) + loaded_recents = yield self._filter_events_for_client( + sync_config.user.to_string(), room_id, loaded_recents, + ) loaded_recents.extend(recents) recents = loaded_recents if len(events) <= load_limit: diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a9895292c2..026bd2b9d4 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -218,7 +218,7 @@ class TypingNotificationHandler(BaseHandler): self._room_serials[room_id] = self._latest_room_serial with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "typing_key", self._latest_room_serial, rooms=[room_id] ) |