From 5b54d51d1e98450451b8ffe3a57ad98373e8f5e6 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 18 Oct 2016 17:04:09 +0100 Subject: Allow Configurable Rate Limiting Per AS This adds a flag loaded from the registration file of an AS that will determine whether or not its users are rate limited (by ratelimit in _base.py). Needed for IRC bridge reasons - see https://github.com/matrix-org/matrix-appservice-irc/issues/240. --- synapse/handlers/_base.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 4981643166..a377b1225b 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -57,10 +57,24 @@ class BaseHandler(object): time_now = self.clock.time() user_id = requester.user.to_string() + # Disable rate limiting of users belonging to any AS that is configured + # not to be rate limited in its registration file (rate_limited: true|false). + # The AS user itself is never rate limited. + app_service = self.store.get_app_service_by_user_id(user_id) if app_service is not None: return # do not ratelimit app service senders + should_rate_limit = True + + for service in self.store.get_app_services(): + if service.is_interested_in_user(user_id): + should_rate_limit = service.is_rate_limited() + break + + if not should_rate_limit: + return + allowed, time_allowed = self.ratelimiter.send_message( user_id, time_now, msg_rate_hz=self.hs.config.rc_messages_per_second, -- cgit 1.5.1 From 550308c7a1e8e6f7bf6ee27baf2d67796591657a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Oct 2016 10:45:24 +0100 Subject: Check whether to ratelimit sooner to avoid work --- synapse/api/ratelimiting.py | 14 +++++++++----- synapse/handlers/message.py | 14 +++++++++++++- 2 files changed, 22 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index 660dfb56e5..06cc8d90b8 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -23,7 +23,7 @@ class Ratelimiter(object): def __init__(self): self.message_counts = collections.OrderedDict() - def send_message(self, user_id, time_now_s, msg_rate_hz, burst_count): + def send_message(self, user_id, time_now_s, msg_rate_hz, burst_count, update=True): """Can the user send a message? Args: user_id: The user sending a message. @@ -32,12 +32,15 @@ class Ratelimiter(object): second. burst_count: How many messages the user can send before being limited. + update (bool): Whether to update the message rates or not. This is + useful to check if a message would be allowed to be sent before + its ready to be actually sent. Returns: A pair of a bool indicating if they can send a message now and a time in seconds of when they can next send a message. """ self.prune_message_counts(time_now_s) - message_count, time_start, _ignored = self.message_counts.pop( + message_count, time_start, _ignored = self.message_counts.get( user_id, (0., time_now_s, None), ) time_delta = time_now_s - time_start @@ -52,9 +55,10 @@ class Ratelimiter(object): allowed = True message_count += 1 - self.message_counts[user_id] = ( - message_count, time_start, msg_rate_hz - ) + if update: + self.message_counts[user_id] = ( + message_count, time_start, msg_rate_hz + ) if msg_rate_hz > 0: time_allowed = ( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 30ea9630f7..a94a514338 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError, Codes, SynapseError +from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -239,6 +239,18 @@ class MessageHandler(BaseHandler): "Tried to send member event through non-member codepath" ) + time_now = self.clock.time() + allowed, time_allowed = self.ratelimiter.send_message( + event.sender, time_now, + msg_rate_hz=self.hs.config.rc_messages_per_second, + burst_count=self.hs.config.rc_message_burst_count, + update=False, + ) + if not allowed: + raise LimitExceededError( + retry_after_ms=int(1000 * (time_allowed - time_now)), + ) + user = UserID.from_string(event.sender) assert self.hs.is_mine(user), "User must be our own: %s" % (user,) -- cgit 1.5.1 From df2a616c7b028a6eb8b50c57e7e73847287a6feb Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 19 Oct 2016 11:13:55 +0100 Subject: Convert emails to lowercase when storing And db migration sql to convert existing addresses. --- synapse/handlers/auth.py | 12 +++++++++++ synapse/storage/schema/delta/36/user_threepids.sql | 23 ++++++++++++++++++++++ .../36/user_threepids_medium_address_index.sql | 16 --------------- 3 files changed, 35 insertions(+), 16 deletions(-) create mode 100644 synapse/storage/schema/delta/36/user_threepids.sql delete mode 100644 synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index dc0fe60e1b..3635521230 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -611,6 +611,18 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def add_threepid(self, user_id, medium, address, validated_at): + # 'Canonicalise' email addresses down to lower case. + # We've now moving towards the Home Server being the entity that + # is responsible for validating threepids used for resetting passwords + # on accounts, so in future Synapse will gain knowledge of specific + # types (mediums) of threepid. For now, we still use the existing + # infrastructure, but this is the start of synapse gaining knowledge + # of specific types of threepid (and fixes the fact that checking + # for the presenc eof an email address during password reset was + # case sensitive). + if medium == 'email': + address = address.lower() + yield self.store.user_add_threepid( user_id, medium, address, validated_at, self.hs.get_clock().time_msec() diff --git a/synapse/storage/schema/delta/36/user_threepids.sql b/synapse/storage/schema/delta/36/user_threepids.sql new file mode 100644 index 0000000000..ef8813e72a --- /dev/null +++ b/synapse/storage/schema/delta/36/user_threepids.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Update any email addresses that were stored with mixed case into all + * lowercase + */ +UPDATE user_threepids SET address = LOWER(address) where medium = 'email'; + +/* Add an index for the select we do on passwored reset */ +CREATE INDEX user_threepids_medium_address on user_threepids (medium, address); diff --git a/synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql b/synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql deleted file mode 100644 index 702a872784..0000000000 --- a/synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql +++ /dev/null @@ -1,16 +0,0 @@ -/* Copyright 2016 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE INDEX user_threepids_medium_address on user_threepids (medium, LOWER(address)); -- cgit 1.5.1 From f2f74ffce6e77234dad571b61b70e59a7534a681 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Oct 2016 14:21:28 +0100 Subject: Comment --- synapse/handlers/message.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a94a514338..59eb26beaf 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -239,6 +239,9 @@ class MessageHandler(BaseHandler): "Tried to send member event through non-member codepath" ) + # We check here if we are currently being rate limited, so that we + # don't do unnecessary work. We check again just before we actually + # send the event. time_now = self.clock.time() allowed, time_allowed = self.ratelimiter.send_message( event.sender, time_now, -- cgit 1.5.1 From 1b17d1a106604ddf1d8b97d499db8de1dc0651b5 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Thu, 20 Oct 2016 11:43:05 +0100 Subject: Use real AS object by passing it through the requester This means synapse does not have to check if the AS is interested, but instead it effectively re-uses what it already knew about the requesting user --- synapse/api/auth.py | 14 +++++++------- synapse/handlers/_base.py | 11 +++-------- synapse/types.py | 8 +++++--- 3 files changed, 15 insertions(+), 18 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 1b3b55d517..c0bd0890fa 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -603,10 +603,10 @@ class Auth(object): """ # Can optionally look elsewhere in the request (e.g. headers) try: - user_id = yield self._get_appservice_user_id(request) + user_id, as_user = yield self._get_appservice_user_id(request) if user_id: request.authenticated_entity = user_id - defer.returnValue(synapse.types.create_requester(user_id)) + defer.returnValue(synapse.types.create_requester(user_id, as_user=as_user)) access_token = get_access_token_from_request( request, self.TOKEN_NOT_FOUND_HTTP_STATUS @@ -644,7 +644,7 @@ class Auth(object): request.authenticated_entity = user.to_string() defer.returnValue(synapse.types.create_requester( - user, token_id, is_guest, device_id)) + user, token_id, is_guest, device_id, as_user=as_user)) except KeyError: raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token.", @@ -659,14 +659,14 @@ class Auth(object): ) ) if app_service is None: - defer.returnValue(None) + defer.returnValue((None, None)) if "user_id" not in request.args: - defer.returnValue(app_service.sender) + defer.returnValue((app_service.sender, app_service)) user_id = request.args["user_id"][0] if app_service.sender == user_id: - defer.returnValue(app_service.sender) + defer.returnValue((app_service.sender, app_service)) if not app_service.is_interested_in_user(user_id): raise AuthError( @@ -678,7 +678,7 @@ class Auth(object): 403, "Application service has not registered this user" ) - defer.returnValue(user_id) + defer.returnValue((user_id, app_service)) @defer.inlineCallbacks def get_user_by_access_token(self, token, rights="access"): diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index a377b1225b..ba62746214 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -65,14 +65,9 @@ class BaseHandler(object): if app_service is not None: return # do not ratelimit app service senders - should_rate_limit = True - - for service in self.store.get_app_services(): - if service.is_interested_in_user(user_id): - should_rate_limit = service.is_rate_limited() - break - - if not should_rate_limit: + if requester.as_user and not requester.as_user.is_rate_limited(): + # do not ratelimit users of which a non-rate-limited AS is + # acting on behalf return allowed, time_allowed = self.ratelimiter.send_message( diff --git a/synapse/types.py b/synapse/types.py index 1694af1250..35e05b9c41 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -19,7 +19,7 @@ from collections import namedtuple Requester = namedtuple("Requester", - ["user", "access_token_id", "is_guest", "device_id"]) + ["user", "access_token_id", "is_guest", "device_id", "as_user"]) """ Represents the user making a request @@ -29,11 +29,12 @@ Attributes: request, or None if it came via the appservice API or similar is_guest (bool): True if the user making this request is a guest user device_id (str|None): device_id which was set at authentication time + as_user (ApplicationService|None): the AS requesting on behalf of the user """ def create_requester(user_id, access_token_id=None, is_guest=False, - device_id=None): + device_id=None, as_user=None): """ Create a new ``Requester`` object @@ -43,13 +44,14 @@ def create_requester(user_id, access_token_id=None, is_guest=False, request, or None if it came via the appservice API or similar is_guest (bool): True if the user making this request is a guest user device_id (str|None): device_id which was set at authentication time + as_user (ApplicationService|None): the AS requesting on behalf of the user Returns: Requester """ if not isinstance(user_id, UserID): user_id = UserID.from_string(user_id) - return Requester(user_id, access_token_id, is_guest, device_id) + return Requester(user_id, access_token_id, is_guest, device_id, as_user) def get_domain_from_id(string): -- cgit 1.5.1 From f09db236b1867a20ef1328af40a2422bca35944e Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Thu, 20 Oct 2016 12:04:54 +0100 Subject: as_user->app_service, less redundant comments, better positioned comments --- synapse/api/auth.py | 6 +++--- synapse/handlers/_base.py | 9 +++------ synapse/types.py | 10 +++++----- 3 files changed, 11 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 5fc0150bdc..154af6728a 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -603,11 +603,11 @@ class Auth(object): """ # Can optionally look elsewhere in the request (e.g. headers) try: - user_id, as_user = yield self._get_appservice_user_id(request) + user_id, app_service = yield self._get_appservice_user_id(request) if user_id: request.authenticated_entity = user_id defer.returnValue( - synapse.types.create_requester(user_id, as_user=as_user) + synapse.types.create_requester(user_id, app_service=app_service) ) access_token = get_access_token_from_request( @@ -646,7 +646,7 @@ class Auth(object): request.authenticated_entity = user.to_string() defer.returnValue(synapse.types.create_requester( - user, token_id, is_guest, device_id, as_user=as_user)) + user, token_id, is_guest, device_id, app_service=app_service)) except KeyError: raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token.", diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index ba62746214..90f96209f8 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -57,17 +57,14 @@ class BaseHandler(object): time_now = self.clock.time() user_id = requester.user.to_string() - # Disable rate limiting of users belonging to any AS that is configured - # not to be rate limited in its registration file (rate_limited: true|false). # The AS user itself is never rate limited. - app_service = self.store.get_app_service_by_user_id(user_id) if app_service is not None: return # do not ratelimit app service senders - if requester.as_user and not requester.as_user.is_rate_limited(): - # do not ratelimit users of which a non-rate-limited AS is - # acting on behalf + # Disable rate limiting of users belonging to any AS that is configured + # not to be rate limited in its registration file (rate_limited: true|false). + if requester.app_service and not requester.app_service.is_rate_limited(): return allowed, time_allowed = self.ratelimiter.send_message( diff --git a/synapse/types.py b/synapse/types.py index 35e05b9c41..2b8afa9aab 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -19,7 +19,7 @@ from collections import namedtuple Requester = namedtuple("Requester", - ["user", "access_token_id", "is_guest", "device_id", "as_user"]) + ["user", "access_token_id", "is_guest", "device_id", "app_service"]) """ Represents the user making a request @@ -29,12 +29,12 @@ Attributes: request, or None if it came via the appservice API or similar is_guest (bool): True if the user making this request is a guest user device_id (str|None): device_id which was set at authentication time - as_user (ApplicationService|None): the AS requesting on behalf of the user + app_service (ApplicationService|None): the AS requesting on behalf of the user """ def create_requester(user_id, access_token_id=None, is_guest=False, - device_id=None, as_user=None): + device_id=None, app_service=None): """ Create a new ``Requester`` object @@ -44,14 +44,14 @@ def create_requester(user_id, access_token_id=None, is_guest=False, request, or None if it came via the appservice API or similar is_guest (bool): True if the user making this request is a guest user device_id (str|None): device_id which was set at authentication time - as_user (ApplicationService|None): the AS requesting on behalf of the user + app_service (ApplicationService|None): the AS requesting on behalf of the user Returns: Requester """ if not isinstance(user_id, UserID): user_id = UserID.from_string(user_id) - return Requester(user_id, access_token_id, is_guest, device_id, as_user) + return Requester(user_id, access_token_id, is_guest, device_id, app_service) def get_domain_from_id(string): -- cgit 1.5.1 From d04e2ff3a43cca3f7d393a4770f022c7bf1a372c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Oct 2016 13:35:51 +0100 Subject: Fix incredubly slow back pagination query If a client didn't specify a from token when paginating backwards synapse would attempt to query the (global) maximum topological token. This a) doesn't make much sense since they're room specific and b) there are no indices that lets postgres do this efficiently. --- synapse/handlers/message.py | 4 ++-- synapse/handlers/room.py | 7 +++++-- synapse/storage/stream.py | 19 +++++++++++++------ synapse/streams/events.py | 30 ++++++++++++++++++++++++++++-- 4 files changed, 48 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 59eb26beaf..abfa8c65a4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -82,8 +82,8 @@ class MessageHandler(BaseHandler): room_token = pagin_config.from_token.room_key else: pagin_config.from_token = ( - yield self.hs.get_event_sources().get_current_token( - direction='b' + yield self.hs.get_event_sources().get_current_token_for_room( + room_id=room_id ) ) room_token = pagin_config.from_token.room_key diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a7f533f7be..59e4d1cd15 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -475,8 +475,11 @@ class RoomEventSource(object): defer.returnValue((events, end_key)) - def get_current_key(self, direction='f'): - return self.store.get_room_events_max_id(direction) + def get_current_key(self): + return self.store.get_room_events_max_id() + + def get_current_key_for_room(self, room_id): + return self.store.get_room_events_max_id(room_id) @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 07ea969d4d..888b1cb35d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -521,13 +521,20 @@ class StreamStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_room_events_max_id(self, direction='f'): + def get_room_events_max_id(self, room_id=None): + """Returns the current token for rooms stream. + + By default, it returns the current global stream token. Specifying a + `room_id` causes it to return the current room specific topological + token. + """ token = yield self._stream_id_gen.get_current_token() - if direction != 'b': + if room_id is None: defer.returnValue("s%d" % (token,)) else: topo = yield self.runInteraction( - "_get_max_topological_txn", self._get_max_topological_txn + "_get_max_topological_txn", self._get_max_topological_txn, + room_id, ) defer.returnValue("t%d-%d" % (topo, token)) @@ -579,11 +586,11 @@ class StreamStore(SQLBaseStore): lambda r: r[0][0] if r else 0 ) - def _get_max_topological_txn(self, txn): + def _get_max_topological_txn(self, txn, room_id): txn.execute( "SELECT MAX(topological_ordering) FROM events" - " WHERE outlier = ?", - (False,) + " WHERE room_id = ?", + (room_id,) ) rows = txn.fetchall() diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 6bf21d6f5e..4018dbde56 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -41,13 +41,39 @@ class EventSources(object): self.store = hs.get_datastore() @defer.inlineCallbacks - def get_current_token(self, direction='f'): + def get_current_token(self): push_rules_key, _ = self.store.get_push_rules_stream_token() to_device_key = self.store.get_to_device_stream_token() token = StreamToken( room_key=( - yield self.sources["room"].get_current_key(direction) + yield self.sources["room"].get_current_key() + ), + presence_key=( + yield self.sources["presence"].get_current_key() + ), + typing_key=( + yield self.sources["typing"].get_current_key() + ), + receipt_key=( + yield self.sources["receipt"].get_current_key() + ), + account_data_key=( + yield self.sources["account_data"].get_current_key() + ), + push_rules_key=push_rules_key, + to_device_key=to_device_key, + ) + defer.returnValue(token) + + @defer.inlineCallbacks + def get_current_token_for_room(self, room_id): + push_rules_key, _ = self.store.get_push_rules_stream_token() + to_device_key = self.store.get_to_device_stream_token() + + token = StreamToken( + room_key=( + yield self.sources["room"].get_current_key() ), presence_key=( yield self.sources["presence"].get_current_key() -- cgit 1.5.1 From 2ef617bc06f59e1cb872b8b0f3870e2130071c76 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Oct 2016 15:51:22 +0100 Subject: Fix infinite typing bug There's a bug somewhere that causes typing notifications to not be timed out properly. By adding a paranoia timer and using correct inequalities notifications should stop being stuck, even if it the root cause hasn't been fixed. --- synapse/handlers/typing.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 08313417b2..27ee715ff0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -88,7 +88,7 @@ class TypingHandler(object): continue until = self._member_typing_until.get(member, None) - if not until or until < now: + if not until or until <= now: logger.info("Timing out typing for: %s", member.user_id) preserve_fn(self._stopped_typing)(member) continue @@ -97,12 +97,20 @@ class TypingHandler(object): # user. if self.hs.is_mine_id(member.user_id): last_fed_poke = self._member_last_federation_poke.get(member, None) - if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL < now: + if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: preserve_fn(self._push_remote)( member=member, typing=True ) + # Add a paranoia timer to ensure that we always have a timer for + # each person typing. + self.wheel_timer.insert( + now=now, + obj=member, + then=now + 60 * 1000, + ) + def is_typing(self, member): return member.user_id in self._room_typing.get(member.room_id, []) -- cgit 1.5.1