diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 2 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 17 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 19 | ||||
-rw-r--r-- | synapse/handlers/events.py | 6 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 14 | ||||
-rw-r--r-- | synapse/handlers/login.py | 2 | ||||
-rw-r--r-- | synapse/handlers/message.py | 11 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 118 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 2 | ||||
-rw-r--r-- | synapse/handlers/register.py | 64 | ||||
-rw-r--r-- | synapse/handlers/room.py | 16 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 2 |
12 files changed, 205 insertions, 68 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index b2208b26c3..5308e2c8e1 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index b37c8be964..9989fe8670 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ # limitations under the License. from twisted.internet import defer +from synapse.api.errors import LimitExceededError class BaseHandler(object): @@ -25,8 +26,22 @@ class BaseHandler(object): self.room_lock = hs.get_room_lock_manager() self.state_handler = hs.get_state_handler() self.distributor = hs.get_distributor() + self.ratelimiter = hs.get_ratelimiter() + self.clock = hs.get_clock() self.hs = hs + def ratelimit(self, user_id): + time_now = self.clock.time() + allowed, time_allowed = self.ratelimiter.send_message( + user_id, time_now, + msg_rate_hz=self.hs.config.rc_messages_per_second, + burst_count=self.hs.config.rc_message_burst_count, + ) + if not allowed: + raise LimitExceededError( + retry_after_ms=int(1000*(time_allowed - time_now)), + ) + class BaseRoomHandler(BaseHandler): diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 7c89150d99..1b9e831fc0 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.errors import SynapseError +from synapse.http.client import HttpClient import logging @@ -36,7 +37,7 @@ class DirectoryHandler(BaseHandler): ) @defer.inlineCallbacks - def create_association(self, room_alias, room_id, servers): + def create_association(self, room_alias, room_id, servers=None): # TODO(erikj): Do auth. if not room_alias.is_mine: @@ -47,6 +48,12 @@ class DirectoryHandler(BaseHandler): # TODO(erikj): Check if there is a current association. + if not servers: + servers = yield self.store.get_joined_hosts_for_room(room_id) + + if not servers: + raise SynapseError(400, "Failed to get server list") + yield self.store.create_room_alias_association( room_alias, room_id, @@ -68,7 +75,10 @@ class DirectoryHandler(BaseHandler): result = yield self.federation.make_query( destination=room_alias.domain, query_type="directory", - args={"room_alias": room_alias.to_string()}, + args={ + "room_alias": room_alias.to_string(), + HttpClient.RETRY_DNS_LOOKUP_FAILURES: False + } ) if result and "room_id" in result and "servers" in result: @@ -79,6 +89,9 @@ class DirectoryHandler(BaseHandler): defer.returnValue({}) return + extra_servers = yield self.store.get_joined_hosts_for_room(room_id) + servers = list(set(extra_servers) | set(servers)) + defer.returnValue({ "room_id": room_id, "servers": servers, diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 980a169b25..fd24a11fb8 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -126,5 +126,7 @@ class EventHandler(BaseHandler): defer.returnValue(None) return - yield self.auth.check(event, raises=True) + if hasattr(event, "room_id"): + yield self.auth.check_joined_room(event.room_id, user.to_string()) + defer.returnValue(event) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index eac110419c..59cbf71d78 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,8 +21,9 @@ from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec +from synapse.api.errors import SynapseError -from twisted.internet import defer +from twisted.internet import defer, reactor import logging @@ -133,7 +134,7 @@ class FederationHandler(BaseHandler): yield self.hs.get_handlers().room_member_handler.change_membership( new_event, - do_auth=True + do_auth=False, ) else: @@ -231,7 +232,12 @@ class FederationHandler(BaseHandler): # TODO (erikj): Time out here. d = defer.Deferred() self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d) - yield d + reactor.callLater(10, d.cancel) + + try: + yield d + except defer.CancelledError: + raise SynapseError(500, "Unable to join remote room") try: yield self.store.store_room( diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py index 0220fa0604..6ee7ce5a2d 100644 --- a/synapse/handlers/login.py +++ b/synapse/handlers/login.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4aeb2089f5..dad2bbd1a4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -76,6 +76,8 @@ class MessageHandler(BaseRoomHandler): Raises: SynapseError if something went wrong. """ + + self.ratelimit(event.user_id) # TODO(paul): Why does 'event' not have a 'user' object? user = self.hs.parse_userid(event.user_id) assert user.is_mine, "User must be our own: %s" % (user,) @@ -140,7 +142,12 @@ class MessageHandler(BaseRoomHandler): SynapseError if something went wrong. """ - snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) + snapshot = yield self.store.snapshot_room( + event.room_id, + event.user_id, + state_type=event.type, + state_key=event.state_key, + ) yield self.auth.check(event, snapshot, raises=True) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9bfceda88a..c79bb6ff76 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -155,19 +155,18 @@ class PresenceHandler(BaseHandler): if observer_user == observed_user: defer.returnValue(True) - allowed_by_subscription = yield self.store.is_presence_visible( + if (yield self.store.user_rooms_intersect( + [u.to_string() for u in observer_user, observed_user] + )): + defer.returnValue(True) + + if (yield self.store.is_presence_visible( observed_localpart=observed_user.localpart, observer_userid=observer_user.to_string(), - ) - - if allowed_by_subscription: + )): defer.returnValue(True) - share_room = yield self.store.do_users_share_a_room( - [observer_user, observed_user] - ) - - defer.returnValue(share_room) + defer.returnValue(False) @defer.inlineCallbacks def get_state(self, target_user, auth_user): @@ -181,7 +180,7 @@ class PresenceHandler(BaseHandler): state = yield self.store.get_presence_state(target_user.localpart) if "mtime" in state: del state["mtime"] - state["presence"] = state["state"] + state["presence"] = state.pop("state") if target_user in self._user_cachemap: state["last_active"] = ( @@ -208,21 +207,17 @@ class PresenceHandler(BaseHandler): raise SynapseError(400, "User is not hosted on this Home Server") if target_user != auth_user: - raise AuthError(400, "Cannot set another user's displayname") + raise AuthError(400, "Cannot set another user's presence") if "status_msg" not in state: state["status_msg"] = None for k in state.keys(): - if k not in ("presence", "state", "status_msg"): + if k not in ("presence", "status_msg"): raise SynapseError( 400, "Unexpected presence state key '%s'" % (k,) ) - # Handle legacy "state" key for now - if "state" in state: - state["presence"] = state.pop("state") - if state["presence"] not in self.STATE_LEVELS: raise SynapseError(400, "'%s' is not a valid presence state" % state["presence"] @@ -601,7 +596,7 @@ class PresenceHandler(BaseHandler): if state is None: state = yield self.store.get_presence_state(user.localpart) del state["mtime"] - state["presence"] = state["state"] + state["presence"] = state.pop("state") if user in self._user_cachemap: state["last_active"] = ( @@ -622,8 +617,6 @@ class PresenceHandler(BaseHandler): "user_id": user.to_string(), } user_state.update(**state) - if "state" in user_state and "presence" not in user_state: - user_state["presence"] = user_state["state"] yield self.federation.send_edu( destination=destination, @@ -655,21 +648,12 @@ class PresenceHandler(BaseHandler): state = dict(push) del state["user_id"] - if "presence" in state: - # all is OK - pass - elif "state" in state: - # Legacy handling - state["presence"] = state["state"] - else: + if "presence" not in state: logger.warning("Received a presence 'push' EDU from %s without" - + " either a 'presence' or 'state' key", origin + + " a 'presence' key", origin ) continue - if "state" in state: - del state["state"] - if "last_active_ago" in state: state["last_active"] = int( self.clock.time_msec() - state.pop("last_active_ago") @@ -773,15 +757,52 @@ class PresenceEventSource(object): self.hs = hs self.clock = hs.get_clock() + @defer.inlineCallbacks + def is_visible(self, observer_user, observed_user): + if observer_user == observed_user: + defer.returnValue(True) + + presence = self.hs.get_handlers().presence_handler + + if (yield presence.store.user_rooms_intersect( + [u.to_string() for u in observer_user, observed_user] + )): + defer.returnValue(True) + + if observed_user.is_mine: + pushmap = presence._local_pushmap + + defer.returnValue( + observed_user.localpart in pushmap and + observer_user in pushmap[observed_user.localpart] + ) + else: + recvmap = presence._remote_recvmap + + defer.returnValue( + observed_user in recvmap and + observer_user in recvmap[observed_user] + ) + + @defer.inlineCallbacks def get_new_events_for_user(self, user, from_key, limit): from_key = int(from_key) + observer_user = user + presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap - # TODO(paul): limit, and filter by visibility - updates = [(k, cachemap[k]) for k in cachemap - if from_key < cachemap[k].serial] + updates = [] + # TODO(paul): use a DeferredList ? How to limit concurrency. + for observed_user in cachemap.keys(): + if not (from_key < cachemap[observed_user].serial): + continue + + if (yield self.is_visible(observer_user, observed_user)): + updates.append((observed_user, cachemap[observed_user])) + + # TODO(paul): limit if updates: clock = self.clock @@ -789,20 +810,23 @@ class PresenceEventSource(object): latest_serial = max([x[1].serial for x in updates]) data = [x[1].make_event(user=x[0], clock=clock) for x in updates] - return ((data, latest_serial)) + defer.returnValue((data, latest_serial)) else: - return (([], presence._user_cachemap_latest_serial)) + defer.returnValue(([], presence._user_cachemap_latest_serial)) def get_current_key(self): presence = self.hs.get_handlers().presence_handler return presence._user_cachemap_latest_serial + @defer.inlineCallbacks def get_pagination_rows(self, user, pagination_config, key): # TODO (erikj): Does this make sense? Ordering? from_token = pagination_config.from_token to_token = pagination_config.to_token + observer_user = user + from_key = int(from_token.presence_key) if to_token: @@ -813,7 +837,17 @@ class PresenceEventSource(object): presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap - # TODO(paul): limit, and filter by visibility + updates = [] + # TODO(paul): use a DeferredList ? How to limit concurrency. + for observed_user in cachemap.keys(): + if not (to_key < cachemap[observed_user].serial < from_key): + continue + + if (yield self.is_visible(observer_user, observed_user)): + updates.append((observed_user, cachemap[observed_user])) + + # TODO(paul): limit + updates = [(k, cachemap[k]) for k in cachemap if to_key < cachemap[k].serial < from_key] @@ -831,13 +865,13 @@ class PresenceEventSource(object): next_token = next_token.copy_and_replace( "presence_key", earliest_serial ) - return ((data, next_token)) + defer.returnValue((data, next_token)) else: if not to_token: to_token = from_token.copy_and_replace( "presence_key", 0 ) - return (([], to_token)) + defer.returnValue(([], to_token)) class UserPresenceCache(object): @@ -851,7 +885,6 @@ class UserPresenceCache(object): def update(self, state, serial): assert("mtime_age" not in state) - assert("state" not in state) self.state.update(state) # Delete keys that are now 'None' @@ -869,11 +902,6 @@ class UserPresenceCache(object): def get_state(self): # clone it so caller can't break our cache state = dict(self.state) - - # Legacy handling - if "presence" in state: - state["state"] = state["presence"] - return state def make_event(self, user, clock): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 6799132054..023d8c0cf2 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 593c603346..bee052274f 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,9 +20,13 @@ from synapse.types import UserID from synapse.api.errors import SynapseError, RegistrationError from ._base import BaseHandler import synapse.util.stringutils as stringutils +from synapse.http.client import PlainHttpClient import base64 import bcrypt +import logging + +logger = logging.getLogger(__name__) class RegistrationHandler(BaseHandler): @@ -34,7 +38,7 @@ class RegistrationHandler(BaseHandler): self.distributor.declare("registered_user") @defer.inlineCallbacks - def register(self, localpart=None, password=None): + def register(self, localpart=None, password=None, threepidCreds=None): """Registers a new client on the server. Args: @@ -47,6 +51,20 @@ class RegistrationHandler(BaseHandler): Raises: RegistrationError if there was a problem registering. """ + + if threepidCreds: + for c in threepidCreds: + logger.info("validating theeepidcred sid %s on id server %s", c['sid'], c['idServer']) + try: + threepid = yield self._threepid_from_creds(c) + except: + logger.err() + raise RegistrationError(400, "Couldn't validate 3pid") + + if not threepid: + raise RegistrationError(400, "Couldn't validate 3pid") + logger.info("got threepid medium %s address %s", threepid['medium'], threepid['address']) + password_hash = None if password: password_hash = bcrypt.hashpw(password, bcrypt.gensalt()) @@ -61,7 +79,6 @@ class RegistrationHandler(BaseHandler): password_hash=password_hash) self.distributor.fire("registered_user", user) - defer.returnValue((user_id, token)) else: # autogen a random user ID attempts = 0 @@ -80,7 +97,6 @@ class RegistrationHandler(BaseHandler): password_hash=password_hash) self.distributor.fire("registered_user", user) - defer.returnValue((user_id, token)) except SynapseError: # if user id is taken, just generate another user_id = None @@ -90,6 +106,15 @@ class RegistrationHandler(BaseHandler): raise RegistrationError( 500, "Cannot generate user ID.") + # Now we have a matrix ID, bind it to the threepids we were given + if threepidCreds: + for c in threepidCreds: + # XXX: This should be a deferred list, shouldn't it? + yield self._bind_threepid(c, user_id) + + + defer.returnValue((user_id, token)) + def _generate_token(self, user_id): # urlsafe variant uses _ and - so use . as the separator and replace # all =s with .s so http clients don't quote =s when it is used as @@ -99,3 +124,34 @@ class RegistrationHandler(BaseHandler): def _generate_user_id(self): return "-" + stringutils.random_string(18) + + @defer.inlineCallbacks + def _threepid_from_creds(self, creds): + httpCli = PlainHttpClient(self.hs) + # XXX: make this configurable! + trustedIdServers = [ 'matrix.org:8090' ] + if not creds['idServer'] in trustedIdServers: + logger.warn('%s is not a trusted ID server: rejecting 3pid credentials', creds['idServer']) + defer.returnValue(None) + data = yield httpCli.get_json( + creds['idServer'], + "/_matrix/identity/api/v1/3pid/getValidated3pid", + { 'sid': creds['sid'], 'clientSecret': creds['clientSecret'] } + ) + + if 'medium' in data: + defer.returnValue(data) + defer.returnValue(None) + + @defer.inlineCallbacks + def _bind_threepid(self, creds, mxid): + httpCli = PlainHttpClient(self.hs) + data = yield httpCli.post_urlencoded_get_json( + creds['idServer'], + "/_matrix/identity/api/v1/3pid/bind", + { 'sid': creds['sid'], 'clientSecret': creds['clientSecret'], 'mxid':mxid } + ) + defer.returnValue(data) + + + diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 53aa77405c..8171e9eb45 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ class RoomCreationHandler(BaseRoomHandler): SynapseError if the room ID was taken, couldn't be stored, or something went horribly wrong. """ + self.ratelimit(user_id) if "room_alias_name" in config: room_alias = RoomAlias.create_local( @@ -110,8 +111,6 @@ class RoomCreationHandler(BaseRoomHandler): servers=[self.hs.hostname], ) - federation_handler = self.hs.get_handlers().federation_handler - @defer.inlineCallbacks def handle_event(event): snapshot = yield self.store.snapshot_room( @@ -138,6 +137,17 @@ class RoomCreationHandler(BaseRoomHandler): ) yield handle_event(name_event) + elif room_alias: + name = room_alias.to_string() + name_event = self.event_factory.create_event( + etype=RoomNameEvent.TYPE, + room_id=room_id, + user_id=user_id, + required_power_level=5, + content={"name": name}, + ) + + yield handle_event(name_event) if "topic" in config: topic = config["topic"] diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3268427ecd..0ca4e5c31e 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 matrix.org +# Copyright 2014 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. |