diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 4 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 18 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 137 | ||||
-rw-r--r-- | synapse/handlers/events.py | 20 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 284 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 27 | ||||
-rw-r--r-- | synapse/handlers/login.py | 83 | ||||
-rw-r--r-- | synapse/handlers/message.py | 112 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 91 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 210 | ||||
-rw-r--r-- | synapse/handlers/register.py | 44 | ||||
-rw-r--r-- | synapse/handlers/room.py | 111 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 73 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 16 |
14 files changed, 931 insertions, 299 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 685792dbdc..8725c3c420 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -22,7 +22,6 @@ from .room import ( from .message import MessageHandler from .events import EventStreamHandler, EventHandler from .federation import FederationHandler -from .login import LoginHandler from .profile import ProfileHandler from .presence import PresenceHandler from .directory import DirectoryHandler @@ -32,6 +31,7 @@ from .appservice import ApplicationServicesHandler from .sync import SyncHandler from .auth import AuthHandler from .identity import IdentityHandler +from .receipts import ReceiptsHandler class Handlers(object): @@ -53,10 +53,10 @@ class Handlers(object): self.profile_handler = ProfileHandler(hs) self.presence_handler = PresenceHandler(hs) self.room_list_handler = RoomListHandler(hs) - self.login_handler = LoginHandler(hs) self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + self.receipts_handler = ReceiptsHandler(hs) asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( hs, asapi, AppServiceScheduler( diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index d6c064b398..cb992143f5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID +from synapse.types import UserID, RoomAlias from synapse.util.logcontext import PreserveLoggingContext @@ -107,6 +107,22 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) + if event.type == EventTypes.CanonicalAlias: + # Check the alias is acually valid (at this time at least) + room_alias_str = event.content.get("alias", None) + if room_alias_str: + room_alias = RoomAlias.from_string(room_alias_str) + directory_handler = self.hs.get_handlers().directory_handler + mapping = yield directory_handler.get_association(room_alias) + + if mapping["room_id"] != event.room_id: + raise SynapseError( + 400, + "Room alias %s does not point to the room" % ( + room_alias_str, + ) + ) + (event_stream_id, max_stream_id) = yield self.store.persist_event( event, context=context ) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 63071653a3..602c5bcd89 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -47,17 +47,24 @@ class AuthHandler(BaseHandler): self.sessions = {} @defer.inlineCallbacks - def check_auth(self, flows, clientdict, clientip=None): + def check_auth(self, flows, clientdict, clientip): """ Takes a dictionary sent by the client in the login / registration protocol and handles the login flow. + As a side effect, this function fills in the 'creds' key on the user's + session with a map, which maps each auth-type (str) to the relevant + identity authenticated by that auth-type (mostly str, but for captcha, bool). + Args: - flows: list of list of stages - authdict: The dictionary from the client root level, not the - 'auth' key: this method prompts for auth if none is sent. + flows (list): A list of login flows. Each flow is an ordered list of + strings representing auth-types. At least one full + flow must be completed in order for auth to be successful. + clientdict: The dictionary from the client root level, not the + 'auth' key: this method prompts for auth if none is sent. + clientip (str): The IP address of the client. Returns: - A tuple of authed, dict, dict where authed is true if the client + A tuple of (authed, dict, dict) where authed is true if the client has successfully completed an auth flow. If it is true, the first dict contains the authenticated credentials of each stage. @@ -75,7 +82,7 @@ class AuthHandler(BaseHandler): del clientdict['auth'] if 'session' in authdict: sid = authdict['session'] - sess = self._get_session_info(sid) + session = self._get_session_info(sid) if len(clientdict) > 0: # This was designed to allow the client to omit the parameters @@ -85,20 +92,21 @@ class AuthHandler(BaseHandler): # email auth link on there). It's probably too open to abuse # because it lets unauthenticated clients store arbitrary objects # on a home server. - # sess['clientdict'] = clientdict - # self._save_session(sess) - pass - elif 'clientdict' in sess: - clientdict = sess['clientdict'] + # Revisit: Assumimg the REST APIs do sensible validation, the data + # isn't arbintrary. + session['clientdict'] = clientdict + self._save_session(session) + elif 'clientdict' in session: + clientdict = session['clientdict'] if not authdict: defer.returnValue( - (False, self._auth_dict_for_flows(flows, sess), clientdict) + (False, self._auth_dict_for_flows(flows, session), clientdict) ) - if 'creds' not in sess: - sess['creds'] = {} - creds = sess['creds'] + if 'creds' not in session: + session['creds'] = {} + creds = session['creds'] # check auth type currently being presented if 'type' in authdict: @@ -107,15 +115,15 @@ class AuthHandler(BaseHandler): result = yield self.checkers[authdict['type']](authdict, clientip) if result: creds[authdict['type']] = result - self._save_session(sess) + self._save_session(session) for f in flows: if len(set(f) - set(creds.keys())) == 0: logger.info("Auth completed with creds: %r", creds) - self._remove_session(sess) + self._remove_session(session) defer.returnValue((True, creds, clientdict)) - ret = self._auth_dict_for_flows(flows, sess) + ret = self._auth_dict_for_flows(flows, session) ret['completed'] = creds.keys() defer.returnValue((False, ret, clientdict)) @@ -149,22 +157,14 @@ class AuthHandler(BaseHandler): if "user" not in authdict or "password" not in authdict: raise LoginError(400, "", Codes.MISSING_PARAM) - user = authdict["user"] + user_id = authdict["user"] password = authdict["password"] - if not user.startswith('@'): - user = UserID.create(user, self.hs.hostname).to_string() + if not user_id.startswith('@'): + user_id = UserID.create(user_id, self.hs.hostname).to_string() - user_info = yield self.store.get_user_by_id(user_id=user) - if not user_info: - logger.warn("Attempted to login as %s but they do not exist", user) - raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) - - stored_hash = user_info["password_hash"] - if bcrypt.checkpw(password, stored_hash): - defer.returnValue(user) - else: - logger.warn("Failed password login for user %s", user) - raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) + user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) + self._check_password(user_id, password, password_hash) + defer.returnValue(user_id) @defer.inlineCallbacks def _check_recaptcha(self, authdict, clientip): @@ -268,6 +268,79 @@ class AuthHandler(BaseHandler): return self.sessions[session_id] + @defer.inlineCallbacks + def login_with_password(self, user_id, password): + """ + Authenticates the user with their username and password. + + Used only by the v1 login API. + + Args: + user_id (str): User ID + password (str): Password + Returns: + The access token for the user's session. + Raises: + StoreError if there was a problem storing the token. + LoginError if there was an authentication problem. + """ + user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) + self._check_password(user_id, password, password_hash) + + reg_handler = self.hs.get_handlers().registration_handler + access_token = reg_handler.generate_token(user_id) + logger.info("Logging in user %s", user_id) + yield self.store.add_access_token_to_user(user_id, access_token) + defer.returnValue((user_id, access_token)) + + @defer.inlineCallbacks + def _find_user_id_and_pwd_hash(self, user_id): + """Checks to see if a user with the given id exists. Will check case + insensitively, but will throw if there are multiple inexact matches. + + Returns: + tuple: A 2-tuple of `(canonical_user_id, password_hash)` + """ + user_infos = yield self.store.get_users_by_id_case_insensitive(user_id) + if not user_infos: + logger.warn("Attempted to login as %s but they do not exist", user_id) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) + + if len(user_infos) > 1: + if user_id not in user_infos: + logger.warn( + "Attempted to login as %s but it matches more than one user " + "inexactly: %r", + user_id, user_infos.keys() + ) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) + + defer.returnValue((user_id, user_infos[user_id])) + else: + defer.returnValue(user_infos.popitem()) + + def _check_password(self, user_id, password, stored_hash): + """Checks that user_id has passed password, raises LoginError if not.""" + if not bcrypt.checkpw(password, stored_hash): + logger.warn("Failed password login for user %s", user_id) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) + + @defer.inlineCallbacks + def set_password(self, user_id, newpassword): + password_hash = bcrypt.hashpw(newpassword, bcrypt.gensalt()) + + yield self.store.user_set_password_hash(user_id, password_hash) + yield self.store.user_delete_access_tokens(user_id) + yield self.hs.get_pusherpool().remove_pushers_by_user(user_id) + yield self.store.flush_user(user_id) + + @defer.inlineCallbacks + def add_threepid(self, user_id, medium, address, validated_at): + yield self.store.user_add_threepid( + user_id, medium, address, validated_at, + self.hs.get_clock().time_msec() + ) + def _save_session(self, session): # TODO: Persistent storage logger.debug("Saving session %s", session) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 993d33ba47..891502c04f 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -49,7 +49,12 @@ class EventStreamHandler(BaseHandler): @defer.inlineCallbacks @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, - as_client_event=True, affect_presence=True): + as_client_event=True, affect_presence=True, + only_room_events=False): + """Fetches the events stream for a given user. + + If `only_room_events` is `True` only room events will be returned. + """ auth_user = UserID.from_string(auth_user_id) try: @@ -70,7 +75,15 @@ class EventStreamHandler(BaseHandler): self._streams_per_user[auth_user] += 1 rm_handler = self.hs.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(auth_user) + + app_service = yield self.store.get_app_service_by_user_id( + auth_user.to_string() + ) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + room_ids = set(r.room_id for r in rooms) + else: + room_ids = yield rm_handler.get_joined_rooms_for_user(auth_user) if timeout: # If they've set a timeout set a minimum limit. @@ -81,7 +94,8 @@ class EventStreamHandler(BaseHandler): timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout + auth_user, room_ids, pagin_config, timeout, + only_room_events=only_room_events ) time_now = self.clock.time_msec() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b5d882fd65..4ff20599d6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -31,6 +31,8 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID +from synapse.events.utils import prune_event + from synapse.util.retryutils import NotRetryingDestination from twisted.internet import defer @@ -138,26 +140,29 @@ class FederationHandler(BaseHandler): if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication # layer, then we should handle them (if we haven't before.) + + event_infos = [] + for e in itertools.chain(auth_chain, state): if e.event_id in seen_ids: continue - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - seen_ids.add(e.event_id) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + event_infos.append({ + "event": e, + "auth_events": auth, + }) + seen_ids.add(e.event_id) + + yield self._handle_new_events( + origin, + event_infos, + outliers=True + ) try: _, event_stream_id, max_stream_id = yield self._handle_new_event( @@ -222,6 +227,55 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) + @defer.inlineCallbacks + def _filter_events_for_server(self, server_name, room_id, events): + event_to_state = yield self.store.get_state_for_events( + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, None), + ) + ) + + def redact_disallowed(event, state): + if not state: + return event + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history: + visibility = history.content.get("history_visibility", "shared") + if visibility in ["invited", "joined"]: + # We now loop through all state events looking for + # membership states for the requesting server to determine + # if the server is either in the room or has been invited + # into the room. + for ev in state.values(): + if ev.type != EventTypes.Member: + continue + try: + domain = UserID.from_string(ev.state_key).domain + except: + continue + + if domain != server_name: + continue + + memtype = ev.membership + if memtype == Membership.JOIN: + return event + elif memtype == Membership.INVITE: + if visibility == "invited": + return event + else: + return prune_event(event) + + return event + + defer.returnValue([ + redact_disallowed(e, event_to_state[e.event_id]) + for e in events + ]) + @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): @@ -292,38 +346,29 @@ class FederationHandler(BaseHandler): ).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results}) - yield defer.gatherResults( - [ - self._handle_new_event( - dest, a, - auth_events={ - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id, _ in a.auth_events - }, - ) - for a in auth_events.values() - if a.event_id not in seen_events - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) - - yield defer.gatherResults( - [ - self._handle_new_event( - dest, event_map[e_id], - state=events_to_state[e_id], - backfilled=True, - auth_events={ - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id, _ in event_map[e_id].auth_events - }, - ) - for e_id in events_to_state - ], - consumeErrors=True - ).addErrback(unwrapFirstError) + ev_infos = [] + for a in auth_events.values(): + if a.event_id in seen_events: + continue + ev_infos.append({ + "event": a, + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in a.auth_events + } + }) + + for e_id in events_to_state: + ev_infos.append({ + "event": event_map[e_id], + "state": events_to_state[e_id], + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in event_map[e_id].auth_events + } + }) events.sort(key=lambda e: e.depth) @@ -331,10 +376,14 @@ class FederationHandler(BaseHandler): if event in events_to_state: continue - yield self._handle_new_event( - dest, event, - backfilled=True, - ) + ev_infos.append({ + "event": event, + }) + + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) defer.returnValue(events) @@ -453,7 +502,7 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups([e]) + self.state_handler.resolve_state_groups(room_id, [e]) for e in event_ids ]) states = dict(zip(event_ids, [s[1] for s in states])) @@ -600,32 +649,22 @@ class FederationHandler(BaseHandler): # FIXME pass - yield self._handle_auth_events( - origin, [e for e in auth_chain if e.event_id != event.event_id] - ) - - @defer.inlineCallbacks - def handle_state(e): + ev_infos = [] + for e in itertools.chain(state, auth_chain): if e.event_id == event.event_id: - return + continue e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { + auth_ids = [e_id for e_id, _ in e.auth_events] + ev_infos.append({ + "event": e, + "auth_events": { (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + }) - yield defer.DeferredList([handle_state(e) for e in state]) + yield self._handle_new_events(origin, ev_infos, outliers=True) auth_ids = [e_id for e_id, _ in event.auth_events] auth_events = { @@ -835,7 +874,7 @@ class FederationHandler(BaseHandler): raise AuthError(403, "Host not in room.") state_groups = yield self.store.get_state_groups( - [event_id] + room_id, [event_id] ) if state_groups: @@ -882,6 +921,8 @@ class FederationHandler(BaseHandler): limit ) + events = yield self._filter_events_for_server(origin, room_id, events) + defer.returnValue(events) @defer.inlineCallbacks @@ -940,11 +981,54 @@ class FederationHandler(BaseHandler): def _handle_new_event(self, origin, event, state=None, backfilled=False, current_state=None, auth_events=None): - logger.debug( - "_handle_new_event: %s, sigs: %s", - event.event_id, event.signatures, + outlier = event.internal_metadata.is_outlier() + + context = yield self._prep_event( + origin, event, + state=state, + backfilled=backfilled, + current_state=current_state, + auth_events=auth_events, + ) + + event_stream_id, max_stream_id = yield self.store.persist_event( + event, + context=context, + backfilled=backfilled, + is_new_state=(not outlier and not backfilled), + current_state=current_state, + ) + + defer.returnValue((context, event_stream_id, max_stream_id)) + + @defer.inlineCallbacks + def _handle_new_events(self, origin, event_infos, backfilled=False, + outliers=False): + contexts = yield defer.gatherResults( + [ + self._prep_event( + origin, + ev_info["event"], + state=ev_info.get("state"), + backfilled=backfilled, + auth_events=ev_info.get("auth_events"), + ) + for ev_info in event_infos + ] + ) + + yield self.store.persist_events( + [ + (ev_info["event"], context) + for ev_info, context in itertools.izip(event_infos, contexts) + ], + backfilled=backfilled, + is_new_state=(not outliers and not backfilled), ) + @defer.inlineCallbacks + def _prep_event(self, origin, event, state=None, backfilled=False, + current_state=None, auth_events=None): outlier = event.internal_metadata.is_outlier() context = yield self.state_handler.compute_event_context( @@ -954,13 +1038,6 @@ class FederationHandler(BaseHandler): if not auth_events: auth_events = context.current_state - logger.debug( - "_handle_new_event: %s, auth_events: %s", - event.event_id, auth_events, - ) - - is_new_state = not outlier - # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: @@ -984,26 +1061,7 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR - # FIXME: Don't store as rejected with AUTH_ERROR if we haven't - # seen all the auth events. - yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - is_new_state=False, - current_state=current_state, - ) - raise - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - is_new_state=(is_new_state and not backfilled), - current_state=current_state, - ) - - defer.returnValue((context, event_stream_id, max_stream_id)) + defer.returnValue(context) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, @@ -1066,14 +1124,24 @@ class FederationHandler(BaseHandler): @log_function def do_auth(self, origin, event, context, auth_events): # Check if we have all the auth events. - have_events = yield self.store.have_events( - [e_id for e_id, _ in event.auth_events] - ) - + current_state = set(e.event_id for e in auth_events.values()) event_auth_events = set(e_id for e_id, _ in event.auth_events) + + if event_auth_events - current_state: + have_events = yield self.store.have_events( + event_auth_events - current_state + ) + else: + have_events = {} + + have_events.update({ + e.event_id: "" + for e in auth_events.values() + }) + seen_events = set(have_events.keys()) - missing_auth = event_auth_events - seen_events + missing_auth = event_auth_events - seen_events - current_state if missing_auth: logger.info("Missing auth: %s", missing_auth) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 6200e10775..2a99921d5f 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -44,7 +44,7 @@ class IdentityHandler(BaseHandler): http_client = SimpleHttpClient(self.hs) # XXX: make this configurable! # trustedIdServers = ['matrix.org', 'localhost:8090'] - trustedIdServers = ['matrix.org'] + trustedIdServers = ['matrix.org', 'vector.im'] if 'id_server' in creds: id_server = creds['id_server'] @@ -117,3 +117,28 @@ class IdentityHandler(BaseHandler): except CodeMessageException as e: data = json.loads(e.msg) defer.returnValue(data) + + @defer.inlineCallbacks + def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): + yield run_on_reactor() + http_client = SimpleHttpClient(self.hs) + + params = { + 'email': email, + 'client_secret': client_secret, + 'send_attempt': send_attempt, + } + params.update(kwargs) + + try: + data = yield http_client.post_urlencoded_get_json( + "https://%s%s" % ( + id_server, + "/_matrix/identity/api/v1/validate/email/requestToken" + ), + params + ) + defer.returnValue(data) + except CodeMessageException as e: + logger.info("Proxied requestToken failed: %r", e) + raise e diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py deleted file mode 100644 index 91d87d503d..0000000000 --- a/synapse/handlers/login.py +++ /dev/null @@ -1,83 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.internet import defer - -from ._base import BaseHandler -from synapse.api.errors import LoginError, Codes - -import bcrypt -import logging - -logger = logging.getLogger(__name__) - - -class LoginHandler(BaseHandler): - - def __init__(self, hs): - super(LoginHandler, self).__init__(hs) - self.hs = hs - - @defer.inlineCallbacks - def login(self, user, password): - """Login as the specified user with the specified password. - - Args: - user (str): The user ID. - password (str): The password. - Returns: - The newly allocated access token. - Raises: - StoreError if there was a problem storing the token. - LoginError if there was an authentication problem. - """ - # TODO do this better, it can't go in __init__ else it cyclic loops - if not hasattr(self, "reg_handler"): - self.reg_handler = self.hs.get_handlers().registration_handler - - # pull out the hash for this user if they exist - user_info = yield self.store.get_user_by_id(user_id=user) - if not user_info: - logger.warn("Attempted to login as %s but they do not exist", user) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) - - stored_hash = user_info["password_hash"] - if bcrypt.checkpw(password, stored_hash): - # generate an access token and store it. - token = self.reg_handler._generate_token(user) - logger.info("Adding token %s for user %s", token, user) - yield self.store.add_access_token_to_user(user, token) - defer.returnValue(token) - else: - logger.warn("Failed password login for user %s", user) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) - - @defer.inlineCallbacks - def set_password(self, user_id, newpassword, token_id=None): - password_hash = bcrypt.hashpw(newpassword, bcrypt.gensalt()) - - yield self.store.user_set_password_hash(user_id, password_hash) - yield self.store.user_delete_access_tokens_apart_from(user_id, token_id) - yield self.hs.get_pusherpool().remove_pushers_by_user_access_token( - user_id, token_id - ) - yield self.store.flush_user(user_id) - - @defer.inlineCallbacks - def add_threepid(self, user_id, medium, address, validated_at): - yield self.store.user_add_threepid( - user_id, medium, address, validated_at, - self.hs.get_clock().time_msec() - ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e324662f18..f12465fa2c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -113,11 +113,21 @@ class MessageHandler(BaseHandler): "room_key", next_key ) + if not events: + defer.returnValue({ + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + }) + + events = yield self._filter_events_for_client(user_id, room_id, events) + time_now = self.clock.time_msec() chunk = { "chunk": [ - serialize_event(e, time_now, as_client_event) for e in events + serialize_event(e, time_now, as_client_event) + for e in events ], "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), @@ -126,6 +136,52 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks + def _filter_events_for_client(self, user_id, room_id, events): + event_id_to_state = yield self.store.get_state_for_events( + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + ) + + def allowed(event, state): + if event.type == EventTypes.RoomHistoryVisibility: + return True + + membership_ev = state.get((EventTypes.Member, user_id), None) + if membership_ev: + membership = membership_ev.membership + else: + membership = Membership.LEAVE + + if membership == Membership.JOIN: + return True + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history: + visibility = history.content.get("history_visibility", "shared") + else: + visibility = "shared" + + if visibility == "public": + return True + elif visibility == "shared": + return True + elif visibility == "joined": + return membership == Membership.JOIN + elif visibility == "invited": + return membership == Membership.INVITE + + return True + + defer.returnValue([ + event + for event in events + if allowed(event, event_id_to_state[event.event_id]) + ]) + + @defer.inlineCallbacks def create_and_send_event(self, event_dict, ratelimit=True, client=None, txn_id=None): """ Given a dict from a client, create and handle a new event. @@ -278,6 +334,11 @@ class MessageHandler(BaseHandler): user, pagination_config.get_source_config("presence"), None ) + receipt_stream = self.hs.get_event_sources().sources["receipt"] + receipt, _ = yield receipt_stream.get_pagination_rows( + user, pagination_config.get_source_config("receipt"), None + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -316,6 +377,10 @@ class MessageHandler(BaseHandler): ] ).addErrback(unwrapFirstError) + messages = yield self._filter_events_for_client( + user_id, event.room_id, messages + ) + start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() @@ -336,15 +401,20 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - yield defer.gatherResults( - [handle_room(e) for e in room_list], - consumeErrors=True - ).addErrback(unwrapFirstError) + # Only do N rooms at once + n = 5 + d_list = [handle_room(e) for e in room_list] + for i in range(0, len(d_list), n): + yield defer.gatherResults( + d_list[i:i + n], + consumeErrors=True + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, "presence": presence, - "end": now_token.to_string() + "receipts": receipt, + "end": now_token.to_string(), } defer.returnValue(ret) @@ -390,24 +460,21 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - presence_defs = yield defer.DeferredList( - [ - presence_handler.get_state( - target_user=UserID.from_string(m.user_id), - auth_user=auth_user, - as_event=True, - check_auth=False, - ) - for m in room_members - ], - consumeErrors=True, + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, ) - defer.returnValue([p for success, p in presence_defs if success]) + defer.returnValue(states.values()) - presence, (messages, token) = yield defer.gatherResults( + receipts_handler = self.hs.get_handlers().receipts_handler + + presence, receipts, (messages, token) = yield defer.gatherResults( [ get_presence(), + receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key), self.store.get_recent_events_for_room( room_id, limit=limit, @@ -417,6 +484,10 @@ class MessageHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) + messages = yield self._filter_events_for_client( + user_id, room_id, messages + ) + start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) @@ -431,5 +502,6 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), }, "state": state, - "presence": presence + "presence": presence, + "receipts": receipts, }) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7c03198313..e91e81831e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -192,6 +192,20 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def get_state(self, target_user, auth_user, as_event=False, check_auth=True): + """Get the current presence state of the given user. + + Args: + target_user (UserID): The user whose presence we want + auth_user (UserID): The user requesting the presence, used for + checking if said user is allowed to see the persence of the + `target_user` + as_event (bool): Format the return as an event or not? + check_auth (bool): Perform the auth checks or not? + + Returns: + dict: The presence state of the `target_user`, whose format depends + on the `as_event` argument. + """ if self.hs.is_mine(target_user): if check_auth: visible = yield self.is_presence_visible( @@ -233,6 +247,81 @@ class PresenceHandler(BaseHandler): defer.returnValue(state) @defer.inlineCallbacks + def get_states(self, target_users, auth_user, as_event=False, check_auth=True): + """A batched version of the `get_state` method that accepts a list of + `target_users` + + Args: + target_users (list): The list of UserID's whose presence we want + auth_user (UserID): The user requesting the presence, used for + checking if said user is allowed to see the persence of the + `target_users` + as_event (bool): Format the return as an event or not? + check_auth (bool): Perform the auth checks or not? + + Returns: + dict: A mapping from user -> presence_state + """ + local_users, remote_users = partitionbool( + target_users, + lambda u: self.hs.is_mine(u) + ) + + if check_auth: + for user in local_users: + visible = yield self.is_presence_visible( + observer_user=auth_user, + observed_user=user + ) + + if not visible: + raise SynapseError(404, "Presence information not visible") + + results = {} + if local_users: + for user in local_users: + if user in self._user_cachemap: + results[user] = self._user_cachemap[user].get_state() + + local_to_user = {u.localpart: u for u in local_users} + + states = yield self.store.get_presence_states( + [u.localpart for u in local_users if u not in results] + ) + + for local_part, state in states.items(): + if state is None: + continue + res = {"presence": state["state"]} + if "status_msg" in state and state["status_msg"]: + res["status_msg"] = state["status_msg"] + results[local_to_user[local_part]] = res + + for user in remote_users: + # TODO(paul): Have remote server send us permissions set + results[user] = self._get_or_offline_usercache(user).get_state() + + for state in results.values(): + if "last_active" in state: + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") + ) + + if as_event: + for user, state in results.items(): + content = state + content["user_id"] = user.to_string() + + if "last_active" in content: + content["last_active_ago"] = int( + self._clock.time_msec() - content.pop("last_active") + ) + + results[user] = {"type": "m.presence", "content": content} + + defer.returnValue(results) + + @defer.inlineCallbacks @log_function def set_state(self, target_user, auth_user, state): # return @@ -992,7 +1081,7 @@ class PresenceHandler(BaseHandler): room_ids([str]): List of room_ids to notify. """ with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "presence_key", self._user_cachemap_latest_serial, users_to_push, diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py new file mode 100644 index 0000000000..86c911c4bf --- /dev/null +++ b/synapse/handlers/receipts.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseHandler + +from twisted.internet import defer + +from synapse.util.logcontext import PreserveLoggingContext + +import logging + + +logger = logging.getLogger(__name__) + + +class ReceiptsHandler(BaseHandler): + def __init__(self, hs): + super(ReceiptsHandler, self).__init__(hs) + + self.hs = hs + self.federation = hs.get_replication_layer() + self.federation.register_edu_handler( + "m.receipt", self._received_remote_receipt + ) + self.clock = self.hs.get_clock() + + self._receipt_cache = None + + @defer.inlineCallbacks + def received_client_receipt(self, room_id, receipt_type, user_id, + event_id): + """Called when a client tells us a local user has read up to the given + event_id in the room. + """ + receipt = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": [event_id], + "data": { + "ts": int(self.clock.time_msec()), + } + } + + is_new = yield self._handle_new_receipts([receipt]) + + if is_new: + self._push_remotes([receipt]) + + @defer.inlineCallbacks + def _received_remote_receipt(self, origin, content): + """Called when we receive an EDU of type m.receipt from a remote HS. + """ + receipts = [ + { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": user_values["event_ids"], + "data": user_values.get("data", {}), + } + for room_id, room_values in content.items() + for receipt_type, users in room_values.items() + for user_id, user_values in users.items() + ] + + yield self._handle_new_receipts(receipts) + + @defer.inlineCallbacks + def _handle_new_receipts(self, receipts): + """Takes a list of receipts, stores them and informs the notifier. + """ + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + res = yield self.store.insert_receipt( + room_id, receipt_type, user_id, event_ids, data + ) + + if not res: + # res will be None if this read receipt is 'old' + defer.returnValue(False) + + stream_id, max_persisted_id = res + + with PreserveLoggingContext(): + self.notifier.on_new_event( + "receipt_key", max_persisted_id, rooms=[room_id] + ) + + defer.returnValue(True) + + @defer.inlineCallbacks + def _push_remotes(self, receipts): + """Given a list of receipts, works out which remote servers should be + poked and pokes them. + """ + # TODO: Some of this stuff should be coallesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + remotedomains = set() + + rm_handler = self.hs.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=None, remotedomains=remotedomains + ) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } + } + }, + }, + ) + + @defer.inlineCallbacks + def get_receipts_for_room(self, room_id, to_key): + """Gets all receipts for a room, upto the given key. + """ + result = yield self.store.get_linearized_receipts_for_room( + room_id, + to_key=to_key, + ) + + if not result: + defer.returnValue([]) + + event = { + "type": "m.receipt", + "room_id": room_id, + "content": result, + } + + defer.returnValue([event]) + + +class ReceiptEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + from_key = int(from_key) + to_key = yield self.get_current_key() + + if from_key == to_key: + defer.returnValue(([], to_key)) + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, + from_key=from_key, + to_key=to_key, + ) + + defer.returnValue((events, to_key)) + + def get_current_key(self, direction='f'): + return self.store.get_max_receipt_stream_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + to_key = int(config.from_key) + + if config.to_key: + from_key = int(config.to_key) + else: + from_key = None + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, + from_key=from_key, + to_key=to_key, + ) + + defer.returnValue((events, to_key)) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 7b68585a17..86390a3671 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -57,8 +57,8 @@ class RegistrationHandler(BaseHandler): yield self.check_user_id_is_valid(user_id) - u = yield self.store.get_user_by_id(user_id) - if u: + users = yield self.store.get_users_by_id_case_insensitive(user_id) + if users: raise SynapseError( 400, "User ID already taken.", @@ -73,7 +73,8 @@ class RegistrationHandler(BaseHandler): localpart : The local part of the user ID to register. If None, one will be randomly generated. password (str) : The password to assign to this user so they can - login again. + login again. This can be None which means they cannot login again + via a password (e.g. the user is an application service user). Returns: A tuple of (user_id, access_token). Raises: @@ -90,7 +91,7 @@ class RegistrationHandler(BaseHandler): user = UserID(localpart, self.hs.hostname) user_id = user.to_string() - token = self._generate_token(user_id) + token = self.generate_token(user_id) yield self.store.register( user_id=user_id, token=token, @@ -110,7 +111,7 @@ class RegistrationHandler(BaseHandler): user_id = user.to_string() yield self.check_user_id_is_valid(user_id) - token = self._generate_token(user_id) + token = self.generate_token(user_id) yield self.store.register( user_id=user_id, token=token, @@ -160,7 +161,7 @@ class RegistrationHandler(BaseHandler): 400, "Invalid user localpart for this application service.", errcode=Codes.EXCLUSIVE ) - token = self._generate_token(user_id) + token = self.generate_token(user_id) yield self.store.register( user_id=user_id, token=token, @@ -193,6 +194,35 @@ class RegistrationHandler(BaseHandler): logger.info("Valid captcha entered from %s", ip) @defer.inlineCallbacks + def register_saml2(self, localpart): + """ + Registers email_id as SAML2 Based Auth. + """ + if urllib.quote(localpart) != localpart: + raise SynapseError( + 400, + "User ID must only contain characters which do not" + " require URL encoding." + ) + user = UserID(localpart, self.hs.hostname) + user_id = user.to_string() + + yield self.check_user_id_is_valid(user_id) + token = self.generate_token(user_id) + try: + yield self.store.register( + user_id=user_id, + token=token, + password_hash=None + ) + yield self.distributor.fire("registered_user", user) + except Exception, e: + yield self.store.add_access_token_to_user(user_id, token) + # Ignore Registration errors + logger.exception(e) + defer.returnValue((user_id, token)) + + @defer.inlineCallbacks def register_email(self, threepidCreds): """ Registers emails with an identity server. @@ -243,7 +273,7 @@ class RegistrationHandler(BaseHandler): errcode=Codes.EXCLUSIVE ) - def _generate_token(self, user_id): + def generate_token(self, user_id): # urlsafe variant uses _ and - so use . as the separator and replace # all =s with .s so http clients don't quote =s when it is used as # query params. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4bd027d9bb..c5d1001b50 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -19,12 +19,15 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID -from synapse.api.constants import EventTypes, Membership, JoinRules +from synapse.api.constants import ( + EventTypes, Membership, JoinRules, RoomCreationPreset, +) from synapse.api.errors import StoreError, SynapseError from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event +from collections import OrderedDict import logging import string @@ -33,6 +36,19 @@ logger = logging.getLogger(__name__) class RoomCreationHandler(BaseHandler): + PRESETS_DICT = { + RoomCreationPreset.PRIVATE_CHAT: { + "join_rules": JoinRules.INVITE, + "history_visibility": "invited", + "original_invitees_have_ops": False, + }, + RoomCreationPreset.PUBLIC_CHAT: { + "join_rules": JoinRules.PUBLIC, + "history_visibility": "shared", + "original_invitees_have_ops": False, + }, + } + @defer.inlineCallbacks def create_room(self, user_id, room_id, config): """ Creates a new room. @@ -121,9 +137,25 @@ class RoomCreationHandler(BaseHandler): servers=[self.hs.hostname], ) + preset_config = config.get( + "preset", + RoomCreationPreset.PUBLIC_CHAT + if is_public + else RoomCreationPreset.PRIVATE_CHAT + ) + + raw_initial_state = config.get("initial_state", []) + + initial_state = OrderedDict() + for val in raw_initial_state: + initial_state[(val["type"], val.get("state_key", ""))] = val["content"] + user = UserID.from_string(user_id) creation_events = self._create_events_for_new_room( - user, room_id, is_public=is_public + user, room_id, + preset_config=preset_config, + invite_list=invite_list, + initial_state=initial_state, ) msg_handler = self.hs.get_handlers().message_handler @@ -170,7 +202,10 @@ class RoomCreationHandler(BaseHandler): defer.returnValue(result) - def _create_events_for_new_room(self, creator, room_id, is_public=False): + def _create_events_for_new_room(self, creator, room_id, preset_config, + invite_list, initial_state): + config = RoomCreationHandler.PRESETS_DICT[preset_config] + creator_id = creator.to_string() event_keys = { @@ -203,16 +238,20 @@ class RoomCreationHandler(BaseHandler): }, ) - power_levels_event = create( - etype=EventTypes.PowerLevels, - content={ + returned_events = [creation_event, join_event] + + if (EventTypes.PowerLevels, '') not in initial_state: + power_level_content = { "users": { creator.to_string(): 100, }, "users_default": 0, "events": { - EventTypes.Name: 100, + EventTypes.Name: 50, EventTypes.PowerLevels: 100, + EventTypes.RoomHistoryVisibility: 100, + EventTypes.CanonicalAlias: 50, + EventTypes.RoomAvatar: 50, }, "events_default": 0, "state_default": 50, @@ -220,21 +259,43 @@ class RoomCreationHandler(BaseHandler): "kick": 50, "redact": 50, "invite": 0, - }, - ) + } - join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE - join_rules_event = create( - etype=EventTypes.JoinRules, - content={"join_rule": join_rule}, - ) + if config["original_invitees_have_ops"]: + for invitee in invite_list: + power_level_content["users"][invitee] = 100 - return [ - creation_event, - join_event, - power_levels_event, - join_rules_event, - ] + power_levels_event = create( + etype=EventTypes.PowerLevels, + content=power_level_content, + ) + + returned_events.append(power_levels_event) + + if (EventTypes.JoinRules, '') not in initial_state: + join_rules_event = create( + etype=EventTypes.JoinRules, + content={"join_rule": config["join_rules"]}, + ) + + returned_events.append(join_rules_event) + + if (EventTypes.RoomHistoryVisibility, '') not in initial_state: + history_event = create( + etype=EventTypes.RoomHistoryVisibility, + content={"history_visibility": config["history_visibility"]} + ) + + returned_events.append(history_event) + + for (etype, state_key), content in initial_state.items(): + returned_events.append(create( + etype=etype, + state_key=state_key, + content=content, + )) + + return returned_events class RoomMemberHandler(BaseHandler): @@ -498,15 +559,9 @@ class RoomMemberHandler(BaseHandler): """Returns a list of roomids that the user has any of the given membership states in.""" - app_service = yield self.store.get_app_service_by_user_id( - user.to_string() + rooms = yield self.store.get_rooms_for_user( + user.to_string(), ) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - else: - rooms = yield self.store.get_rooms_for_user( - user.to_string(), - ) # For some reason the list of events contains duplicates # TODO(paul): work out why because I really don't think it should diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bd8c603681..353a416054 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -96,9 +96,18 @@ class SyncHandler(BaseHandler): return self.current_sync_for_user(sync_config, since_token) rm_handler = self.hs.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user( - sync_config.user + + app_service = yield self.store.get_app_service_by_user_id( + sync_config.user.to_string() ) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + room_ids = set(r.room_id for r in rooms) + else: + room_ids = yield rm_handler.get_joined_rooms_for_user( + sync_config.user + ) + result = yield self.notifier.wait_for_events( sync_config.user, room_ids, sync_config.filter, timeout, current_sync_callback @@ -229,7 +238,16 @@ class SyncHandler(BaseHandler): logger.debug("Typing %r", typing_by_room) rm_handler = self.hs.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(sync_config.user) + app_service = yield self.store.get_app_service_by_user_id( + sync_config.user.to_string() + ) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + room_ids = set(r.room_id for r in rooms) + else: + room_ids = yield rm_handler.get_joined_rooms_for_user( + sync_config.user + ) # TODO (mjark): Does public mean "published"? published_rooms = yield self.store.get_rooms(is_public=True) @@ -293,6 +311,52 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks + def _filter_events_for_client(self, user_id, room_id, events): + event_id_to_state = yield self.store.get_state_for_events( + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + ) + + def allowed(event, state): + if event.type == EventTypes.RoomHistoryVisibility: + return True + + membership_ev = state.get((EventTypes.Member, user_id), None) + if membership_ev: + membership = membership_ev.membership + else: + membership = Membership.LEAVE + + if membership == Membership.JOIN: + return True + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history: + visibility = history.content.get("history_visibility", "shared") + else: + visibility = "shared" + + if visibility == "public": + return True + elif visibility == "shared": + return True + elif visibility == "joined": + return membership == Membership.JOIN + elif visibility == "invited": + return membership == Membership.INVITE + + return True + + defer.returnValue([ + event + for event in events + if allowed(event, event_id_to_state[event.event_id]) + ]) + + @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, since_token=None): limited = True @@ -313,6 +377,9 @@ class SyncHandler(BaseHandler): (room_key, _) = keys end_key = "s" + room_key.split('-')[-1] loaded_recents = sync_config.filter.filter_room_events(events) + loaded_recents = yield self._filter_events_for_client( + sync_config.user.to_string(), room_id, loaded_recents, + ) loaded_recents.extend(recents) recents = loaded_recents if len(events) <= load_limit: diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a9895292c2..d7096aab8c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -204,21 +204,17 @@ class TypingNotificationHandler(BaseHandler): ) def _push_update_local(self, room_id, user, typing): - if room_id not in self._room_serials: - self._room_serials[room_id] = 0 - self._room_typing[room_id] = set() - - room_set = self._room_typing[room_id] + room_set = self._room_typing.setdefault(room_id, set()) if typing: room_set.add(user) - elif user in room_set: - room_set.remove(user) + else: + room_set.discard(user) self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "typing_key", self._latest_room_serial, rooms=[room_id] ) @@ -260,8 +256,8 @@ class TypingNotificationEventSource(object): ) events = [] - for room_id in handler._room_serials: - if room_id not in joined_room_ids: + for room_id in joined_room_ids: + if room_id not in handler._room_serials: continue if handler._room_serials[room_id] <= from_key: continue |