diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7a0f54ca24..54a71c49d2 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -102,8 +102,9 @@ class AuthHandler(BaseHandler):
login_types.append(t)
self._supported_login_types = login_types
- self._account_ratelimiter = Ratelimiter()
- self._failed_attempts_ratelimiter = Ratelimiter()
+ # Ratelimiter for failed auth during UIA. Uses same ratelimit config
+ # as per `rc_login.failed_attempts`.
+ self._failed_uia_attempts_ratelimiter = Ratelimiter()
self._clock = self.hs.get_clock()
@@ -133,12 +134,38 @@ class AuthHandler(BaseHandler):
AuthError if the client has completed a login flow, and it gives
a different user to `requester`
+
+ LimitExceededError if the ratelimiter's failed request count for this
+ user is too high to proceed
+
"""
+ user_id = requester.user.to_string()
+
+ # Check if we should be ratelimited due to too many previous failed attempts
+ self._failed_uia_attempts_ratelimiter.ratelimit(
+ user_id,
+ time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+ burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+ update=False,
+ )
+
# build a list of supported flows
flows = [[login_type] for login_type in self._supported_login_types]
- result, params, _ = yield self.check_auth(flows, request_body, clientip)
+ try:
+ result, params, _ = yield self.check_auth(flows, request_body, clientip)
+ except LoginError:
+ # Update the ratelimite to say we failed (`can_do_action` doesn't raise).
+ self._failed_uia_attempts_ratelimiter.can_do_action(
+ user_id,
+ time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+ burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+ update=True,
+ )
+ raise
# find the completed login type
for login_type in self._supported_login_types:
@@ -223,7 +250,7 @@ class AuthHandler(BaseHandler):
# could continue registration from your phone having clicked the
# email auth link on there). It's probably too open to abuse
# because it lets unauthenticated clients store arbitrary objects
- # on a home server.
+ # on a homeserver.
# Revisit: Assumimg the REST APIs do sensible validation, the data
# isn't arbintrary.
session["clientdict"] = clientdict
@@ -501,11 +528,8 @@ class AuthHandler(BaseHandler):
multiple matches
Raises:
- LimitExceededError if the ratelimiter's login requests count for this
- user is too high too proceed.
UserDeactivatedError if a user is found but is deactivated.
"""
- self.ratelimit_login_per_account(user_id)
res = yield self._find_user_id_and_pwd_hash(user_id)
if res is not None:
return res[0]
@@ -572,8 +596,6 @@ class AuthHandler(BaseHandler):
StoreError if there was a problem accessing the database
SynapseError if there was a problem with the request
LoginError if there was an authentication problem.
- LimitExceededError if the ratelimiter's login requests count for this
- user is too high too proceed.
"""
if username.startswith("@"):
@@ -581,8 +603,6 @@ class AuthHandler(BaseHandler):
else:
qualified_user_id = UserID(username, self.hs.hostname).to_string()
- self.ratelimit_login_per_account(qualified_user_id)
-
login_type = login_submission.get("type")
known_login_type = False
@@ -650,15 +670,6 @@ class AuthHandler(BaseHandler):
if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type)
- # unknown username or invalid password.
- self._failed_attempts_ratelimiter.ratelimit(
- qualified_user_id.lower(),
- time_now_s=self._clock.time(),
- rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
- burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
- update=True,
- )
-
# We raise a 403 here, but note that if we're doing user-interactive
# login, it turns all LoginErrors into a 401 anyway.
raise LoginError(403, "Invalid password", errcode=Codes.FORBIDDEN)
@@ -710,10 +721,6 @@ class AuthHandler(BaseHandler):
Returns:
Deferred[unicode] the canonical_user_id, or Deferred[None] if
unknown user/bad password
-
- Raises:
- LimitExceededError if the ratelimiter's login requests count for this
- user is too high too proceed.
"""
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
if not lookupres:
@@ -742,7 +749,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
- self.ratelimit_login_per_account(user_id)
+
yield self.auth.check_auth_blocking(user_id)
return user_id
@@ -810,7 +817,7 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def add_threepid(self, user_id, medium, address, validated_at):
# 'Canonicalise' email addresses down to lower case.
- # We've now moving towards the Home Server being the entity that
+ # We've now moving towards the homeserver being the entity that
# is responsible for validating threepids used for resetting passwords
# on accounts, so in future Synapse will gain knowledge of specific
# types (mediums) of threepid. For now, we still use the existing
@@ -912,35 +919,6 @@ class AuthHandler(BaseHandler):
else:
return defer.succeed(False)
- def ratelimit_login_per_account(self, user_id):
- """Checks whether the process must be stopped because of ratelimiting.
-
- Checks against two ratelimiters: the generic one for login attempts per
- account and the one specific to failed attempts.
-
- Args:
- user_id (unicode): complete @user:id
-
- Raises:
- LimitExceededError if one of the ratelimiters' login requests count
- for this user is too high too proceed.
- """
- self._failed_attempts_ratelimiter.ratelimit(
- user_id.lower(),
- time_now_s=self._clock.time(),
- rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
- burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
- update=False,
- )
-
- self._account_ratelimiter.ratelimit(
- user_id.lower(),
- time_now_s=self._clock.time(),
- rate_hz=self.hs.config.rc_login_account.per_second,
- burst_count=self.hs.config.rc_login_account.burst_count,
- update=True,
- )
-
@attr.s
class MacaroonGenerator(object):
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 63267a0a4c..6dedaaff8d 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -95,6 +95,9 @@ class DeactivateAccountHandler(BaseHandler):
user_id, threepid["medium"], threepid["address"]
)
+ # Remove all 3PIDs this user has bound to the homeserver
+ yield self.store.user_delete_threepids(user_id)
+
# delete any devices belonging to the user, which will also
# delete corresponding access tokens.
yield self._device_handler.delete_all_devices_for_user(user_id)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index c4632f8984..a07d2f1a17 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400,
- "This application service has not reserved" " this kind of alias.",
+ "This application service has not reserved this kind of alias.",
errcode=Codes.EXCLUSIVE,
)
else:
@@ -283,7 +283,7 @@ class DirectoryHandler(BaseHandler):
def on_directory_query(self, args):
room_alias = RoomAlias.from_string(args["room_alias"])
if not self.hs.is_mine(room_alias):
- raise SynapseError(400, "Room Alias is not hosted on this Home Server")
+ raise SynapseError(400, "Room Alias is not hosted on this homeserver")
result = yield self.get_association_from_room_alias(room_alias)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f09a0b73c8..28c12753c1 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -30,6 +30,7 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
UserID,
get_domain_from_id,
@@ -53,6 +54,12 @@ class E2eKeysHandler(object):
self._edu_updater = SigningKeyEduUpdater(hs, self)
+ self._is_master = hs.config.worker_app is None
+ if not self._is_master:
+ self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
+ hs
+ )
+
federation_registry = hs.get_federation_registry()
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@@ -191,9 +198,15 @@ class E2eKeysHandler(object):
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
- user_devices = yield self.device_handler.device_list_updater.user_device_resync(
- user_id
- )
+ if self._is_master:
+ user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+ user_id
+ )
+ else:
+ user_devices = yield self._user_device_resync_client(
+ user_id=user_id
+ )
+
user_devices = user_devices["devices"]
for device in user_devices:
results[user_id] = {device["device_id"]: device["keys"]}
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 0cea445f0d..f1b4424a02 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017, 2018 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -103,14 +104,35 @@ class E2eRoomKeysHandler(object):
rooms
session_id(string): session ID to delete keys for, for None to delete keys
for all sessions
+ Raises:
+ NotFoundError: if the backup version does not exist
Returns:
- A deferred of the deletion transaction
+ A dict containing the count and etag for the backup version
"""
# lock for consistency with uploading
with (yield self._upload_linearizer.queue(user_id)):
+ # make sure the backup version exists
+ try:
+ version_info = yield self.store.get_e2e_room_keys_version_info(
+ user_id, version
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown backup version")
+ else:
+ raise
+
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+ version_etag = version_info["etag"] + 1
+ yield self.store.update_e2e_room_keys_version(
+ user_id, version, None, version_etag
+ )
+
+ count = yield self.store.count_e2e_room_keys(user_id, version)
+ return {"etag": str(version_etag), "count": count}
+
@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
@@ -138,6 +160,9 @@ class E2eRoomKeysHandler(object):
}
}
+ Returns:
+ A dict containing the count and etag for the backup version
+
Raises:
NotFoundError: if there are no versions defined
RoomKeysVersionError: if the uploaded version is not the current version
@@ -171,59 +196,62 @@ class E2eRoomKeysHandler(object):
else:
raise
- # go through the room_keys.
- # XXX: this should/could be done concurrently, given we're in a lock.
+ # Fetch any existing room keys for the sessions that have been
+ # submitted. Then compare them with the submitted keys. If the
+ # key is new, insert it; if the key should be updated, then update
+ # it; otherwise, drop it.
+ existing_keys = yield self.store.get_e2e_room_keys_multi(
+ user_id, version, room_keys["rooms"]
+ )
+ to_insert = [] # batch the inserts together
+ changed = False # if anything has changed, we need to update the etag
for room_id, room in iteritems(room_keys["rooms"]):
- for session_id, session in iteritems(room["sessions"]):
- yield self._upload_room_key(
- user_id, version, room_id, session_id, session
+ for session_id, room_key in iteritems(room["sessions"]):
+ log_kv(
+ {
+ "message": "Trying to upload room key",
+ "room_id": room_id,
+ "session_id": session_id,
+ "user_id": user_id,
+ }
)
-
- @defer.inlineCallbacks
- def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
- """Upload a given room_key for a given room and session into a given
- version of the backup. Merges the key with any which might already exist.
-
- Args:
- user_id(str): the user whose backup we're setting
- version(str): the version ID of the backup we're updating
- room_id(str): the ID of the room whose keys we're setting
- session_id(str): the session whose room_key we're setting
- room_key(dict): the room_key being set
- """
- log_kv(
- {
- "message": "Trying to upload room key",
- "room_id": room_id,
- "session_id": session_id,
- "user_id": user_id,
- }
- )
- # get the room_key for this particular row
- current_room_key = None
- try:
- current_room_key = yield self.store.get_e2e_room_key(
- user_id, version, room_id, session_id
- )
- except StoreError as e:
- if e.code == 404:
- log_kv(
- {
- "message": "Room key not found.",
- "room_id": room_id,
- "user_id": user_id,
- }
+ current_room_key = existing_keys.get(room_id, {}).get(session_id)
+ if current_room_key:
+ if self._should_replace_room_key(current_room_key, room_key):
+ log_kv({"message": "Replacing room key."})
+ # updates are done one at a time in the DB, so send
+ # updates right away rather than batching them up,
+ # like we do with the inserts
+ yield self.store.update_e2e_room_key(
+ user_id, version, room_id, session_id, room_key
+ )
+ changed = True
+ else:
+ log_kv({"message": "Not replacing room_key."})
+ else:
+ log_kv(
+ {
+ "message": "Room key not found.",
+ "room_id": room_id,
+ "user_id": user_id,
+ }
+ )
+ log_kv({"message": "Replacing room key."})
+ to_insert.append((room_id, session_id, room_key))
+ changed = True
+
+ if len(to_insert):
+ yield self.store.add_e2e_room_keys(user_id, version, to_insert)
+
+ version_etag = version_info["etag"]
+ if changed:
+ version_etag = version_etag + 1
+ yield self.store.update_e2e_room_keys_version(
+ user_id, version, None, version_etag
)
- else:
- raise
- if self._should_replace_room_key(current_room_key, room_key):
- log_kv({"message": "Replacing room key."})
- yield self.store.set_e2e_room_key(
- user_id, version, room_id, session_id, room_key
- )
- else:
- log_kv({"message": "Not replacing room_key."})
+ count = yield self.store.count_e2e_room_keys(user_id, version)
+ return {"etag": str(version_etag), "count": count}
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -314,6 +342,8 @@ class E2eRoomKeysHandler(object):
raise NotFoundError("Unknown backup version")
else:
raise
+
+ res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
return res
@trace
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8cafcfdab0..d3267734f7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -97,9 +97,9 @@ class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
a) handling received Pdus before handing them on as Events to the rest
- of the home server (including auth and state conflict resoultion)
+ of the homeserver (including auth and state conflict resoultion)
b) converting events that were produced by local clients that may need
- to be sent to remote home servers.
+ to be sent to remote homeservers.
c) doing the necessary dances to invite remote users and join remote
rooms.
"""
@@ -1428,9 +1428,9 @@ class FederationHandler(BaseHandler):
return event
@defer.inlineCallbacks
- def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
+ def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content):
origin, event, event_format_version = yield self._make_and_verify_event(
- target_hosts, room_id, user_id, "leave"
+ target_hosts, room_id, user_id, "leave", content=content,
)
# Mark as outlier as we don't have any state for this event; we're not
# even in the room.
@@ -1688,7 +1688,11 @@ class FederationHandler(BaseHandler):
# hack around with a try/finally instead.
success = False
try:
- if not event.internal_metadata.is_outlier() and not backfilled:
+ if (
+ not event.internal_metadata.is_outlier()
+ and not backfilled
+ and not context.rejected
+ ):
yield self.action_generator.handle_push_actions_for_event(
event, context
)
@@ -2036,8 +2040,10 @@ class FederationHandler(BaseHandler):
auth_events (dict[(str, str)->synapse.events.EventBase]):
Map from (event_type, state_key) to event
- What we expect the event's auth_events to be, based on the event's
- position in the dag. I think? maybe??
+ Normally, our calculated auth_events based on the state of the room
+ at the event's position in the DAG, though occasionally (eg if the
+ event is an outlier), may be the auth events claimed by the remote
+ server.
Also NB that this function adds entries to it.
Returns:
@@ -2087,30 +2093,35 @@ class FederationHandler(BaseHandler):
origin (str):
event (synapse.events.EventBase):
context (synapse.events.snapshot.EventContext):
+
auth_events (dict[(str, str)->synapse.events.EventBase]):
+ Map from (event_type, state_key) to event
+
+ Normally, our calculated auth_events based on the state of the room
+ at the event's position in the DAG, though occasionally (eg if the
+ event is an outlier), may be the auth events claimed by the remote
+ server.
+
+ Also NB that this function adds entries to it.
Returns:
defer.Deferred[EventContext]: updated context
"""
event_auth_events = set(event.auth_event_ids())
- if event.is_state():
- event_key = (event.type, event.state_key)
- else:
- event_key = None
-
- # if the event's auth_events refers to events which are not in our
- # calculated auth_events, we need to fetch those events from somewhere.
- #
- # we start by fetching them from the store, and then try calling /event_auth/.
+ # missing_auth is the set of the event's auth_events which we don't yet have
+ # in auth_events.
missing_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
)
+ # if we have missing events, we need to fetch those events from somewhere.
+ #
+ # we start by checking if they are in the store, and then try calling /event_auth/.
if missing_auth:
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(missing_auth)
- logger.debug("Got events %s from store", have_events)
+ logger.debug("Found events %s in the store", have_events)
missing_auth.difference_update(have_events.keys())
else:
have_events = {}
@@ -2165,15 +2176,17 @@ class FederationHandler(BaseHandler):
event.auth_event_ids()
)
except Exception:
- # FIXME:
logger.exception("Failed to get auth chain")
if event.internal_metadata.is_outlier():
+ # XXX: given that, for an outlier, we'll be working with the
+ # event's *claimed* auth events rather than those we calculated:
+ # (a) is there any point in this test, since different_auth below will
+ # obviously be empty
+ # (b) alternatively, why don't we do it earlier?
logger.info("Skipping auth_event fetch for outlier")
return context
- # FIXME: Assumes we have and stored all the state for all the
- # prev_events
different_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
)
@@ -2187,27 +2200,22 @@ class FederationHandler(BaseHandler):
different_auth,
)
+ # now we state-resolve between our own idea of the auth events, and the remote's
+ # idea of them.
+
room_version = yield self.store.get_room_version(event.room_id)
+ different_event_ids = [
+ d for d in different_auth if d in have_events and not have_events[d]
+ ]
- different_events = yield make_deferred_yieldable(
- defer.gatherResults(
- [
- run_in_background(
- self.store.get_event, d, allow_none=True, allow_rejected=False
- )
- for d in different_auth
- if d in have_events and not have_events[d]
- ],
- consumeErrors=True,
- )
- ).addErrback(unwrapFirstError)
+ if different_event_ids:
+ # XXX: currently this checks for redactions but I'm not convinced that is
+ # necessary?
+ different_events = yield self.store.get_events_as_list(different_event_ids)
- if different_events:
local_view = dict(auth_events)
remote_view = dict(auth_events)
- remote_view.update(
- {(d.type, d.state_key): d for d in different_events if d}
- )
+ remote_view.update({(d.type, d.state_key): d for d in different_events})
new_state = yield self.state_handler.resolve_events(
room_version,
@@ -2227,13 +2235,13 @@ class FederationHandler(BaseHandler):
auth_events.update(new_state)
context = yield self._update_context_for_auth_events(
- event, context, auth_events, event_key
+ event, context, auth_events
)
return context
@defer.inlineCallbacks
- def _update_context_for_auth_events(self, event, context, auth_events, event_key):
+ def _update_context_for_auth_events(self, event, context, auth_events):
"""Update the state_ids in an event context after auth event resolution,
storing the changes as a new state group.
@@ -2242,18 +2250,21 @@ class FederationHandler(BaseHandler):
context (synapse.events.snapshot.EventContext): initial event context
- auth_events (dict[(str, str)->str]): Events to update in the event
+ auth_events (dict[(str, str)->EventBase]): Events to update in the event
context.
- event_key ((str, str)): (type, state_key) for the current event.
- this will not be included in the current_state in the context.
-
Returns:
Deferred[EventContext]: new event context
"""
+ # exclude the state key of the new event from the current_state in the context.
+ if event.is_state():
+ event_key = (event.type, event.state_key)
+ else:
+ event_key = None
state_updates = {
k: a.event_id for k, a in iteritems(auth_events) if k != event_key
}
+
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = dict(current_state_ids)
@@ -2276,6 +2287,7 @@ class FederationHandler(BaseHandler):
return EventContext.with_state(
state_group=state_group,
+ state_group_before_event=context.state_group_before_event,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
prev_group=prev_group,
@@ -2454,7 +2466,7 @@ class FederationHandler(BaseHandler):
room_version, event_dict, event, context
)
- EventValidator().validate_new(event)
+ EventValidator().validate_new(event, self.config)
# We need to tell the transaction queue to send this out, even
# though the sender isn't a local user.
@@ -2569,7 +2581,7 @@ class FederationHandler(BaseHandler):
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder
)
- EventValidator().validate_new(event)
+ EventValidator().validate_new(event, self.config)
return (event, context)
@defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d682dc2b7a..3b0156f516 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -138,7 +138,7 @@ class MessageHandler(object):
raise NotFoundError("Can't find event for token %s" % (at_token,))
visible_events = yield filter_events_for_client(
- self.storage, user_id, last_events
+ self.storage, user_id, last_events, apply_retention_policies=False
)
event = last_events[0]
@@ -417,7 +417,7 @@ class EventCreationHandler(object):
403, "You must be in the room to create an alias for it"
)
- self.validator.validate_new(event)
+ self.validator.validate_new(event, self.config)
return (event, context)
@@ -634,7 +634,7 @@ class EventCreationHandler(object):
if requester:
context.app_service = requester.app_service
- self.validator.validate_new(event)
+ self.validator.validate_new(event, self.config)
# If this event is an annotation then we check that that the sender
# can't annotate the same way twice (e.g. stops users from liking an
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 97f15a1c32..8514ddc600 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,12 +15,15 @@
# limitations under the License.
import logging
+from six import iteritems
+
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.logging.context import run_in_background
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
@@ -80,6 +83,109 @@ class PaginationHandler(object):
self._purges_by_id = {}
self._event_serializer = hs.get_event_client_serializer()
+ self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
+
+ if hs.config.retention_enabled:
+ # Run the purge jobs described in the configuration file.
+ for job in hs.config.retention_purge_jobs:
+ self.clock.looping_call(
+ run_as_background_process,
+ job["interval"],
+ "purge_history_for_rooms_in_range",
+ self.purge_history_for_rooms_in_range,
+ job["shortest_max_lifetime"],
+ job["longest_max_lifetime"],
+ )
+
+ @defer.inlineCallbacks
+ def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+ """Purge outdated events from rooms within the given retention range.
+
+ If a default retention policy is defined in the server's configuration and its
+ 'max_lifetime' is within this range, also targets rooms which don't have a
+ retention policy.
+
+ Args:
+ min_ms (int|None): Duration in milliseconds that define the lower limit of
+ the range to handle (exclusive). If None, it means that the range has no
+ lower limit.
+ max_ms (int|None): Duration in milliseconds that define the upper limit of
+ the range to handle (inclusive). If None, it means that the range has no
+ upper limit.
+ """
+ # We want the storage layer to to include rooms with no retention policy in its
+ # return value only if a default retention policy is defined in the server's
+ # configuration and that policy's 'max_lifetime' is either lower (or equal) than
+ # max_ms or higher than min_ms (or both).
+ if self._retention_default_max_lifetime is not None:
+ include_null = True
+
+ if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
+ # The default max_lifetime is lower than (or equal to) min_ms.
+ include_null = False
+
+ if max_ms is not None and max_ms < self._retention_default_max_lifetime:
+ # The default max_lifetime is higher than max_ms.
+ include_null = False
+ else:
+ include_null = False
+
+ rooms = yield self.store.get_rooms_for_retention_period_in_range(
+ min_ms, max_ms, include_null
+ )
+
+ for room_id, retention_policy in iteritems(rooms):
+ if room_id in self._purges_in_progress_by_room:
+ logger.warning(
+ "[purge] not purging room %s as there's an ongoing purge running"
+ " for this room",
+ room_id,
+ )
+ continue
+
+ max_lifetime = retention_policy["max_lifetime"]
+
+ if max_lifetime is None:
+ # If max_lifetime is None, it means that include_null equals True,
+ # therefore we can safely assume that there is a default policy defined
+ # in the server's configuration.
+ max_lifetime = self._retention_default_max_lifetime
+
+ # Figure out what token we should start purging at.
+ ts = self.clock.time_msec() - max_lifetime
+
+ stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+
+ r = yield self.store.get_room_event_after_stream_ordering(
+ room_id, stream_ordering,
+ )
+ if not r:
+ logger.warning(
+ "[purge] purging events not possible: No event found "
+ "(ts %i => stream_ordering %i)",
+ ts,
+ stream_ordering,
+ )
+ continue
+
+ (stream, topo, _event_id) = r
+ token = "t%d-%d" % (topo, stream)
+
+ purge_id = random_string(16)
+
+ self._purges_by_id[purge_id] = PurgeStatus()
+
+ logger.info(
+ "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
+ )
+
+ # We want to purge everything, including local events, and to run the purge in
+ # the background so that it's not blocking any other operation apart from
+ # other purges in the same room.
+ run_as_background_process(
+ "_purge_history", self._purge_history, purge_id, room_id, token, True,
+ )
+
def start_purge_history(self, room_id, token, delete_local_events=False):
"""Start off a history purge on a room.
@@ -127,7 +233,9 @@ class PaginationHandler(object):
self._purges_in_progress_by_room.add(room_id)
try:
with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(room_id, token, delete_local_events)
+ yield self.storage.purge_events.purge_history(
+ room_id, token, delete_local_events
+ )
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
@@ -170,7 +278,7 @@ class PaginationHandler(object):
if joined:
raise SynapseError(400, "Users are still joined to this room")
- await self.store.purge_room(room_id)
+ await self.storage.purge_events.purge_room(room_id)
@defer.inlineCallbacks
def get_messages(
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 22e0a04da4..1e5a4613c9 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -152,7 +152,7 @@ class BaseProfileHandler(BaseHandler):
by_admin (bool): Whether this change was made by an administrator.
"""
if not self.hs.is_mine(target_user):
- raise SynapseError(400, "User is not hosted on this Home Server")
+ raise SynapseError(400, "User is not hosted on this homeserver")
if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's displayname")
@@ -207,7 +207,7 @@ class BaseProfileHandler(BaseHandler):
"""target_user is the user whose avatar_url is to be changed;
auth_user is the user attempting to make this change."""
if not self.hs.is_mine(target_user):
- raise SynapseError(400, "User is not hosted on this Home Server")
+ raise SynapseError(400, "User is not hosted on this homeserver")
if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's avatar_url")
@@ -231,7 +231,7 @@ class BaseProfileHandler(BaseHandler):
def on_profile_query(self, args):
user = UserID.from_string(args["user_id"])
if not self.hs.is_mine(user):
- raise SynapseError(400, "User is not hosted on this Home Server")
+ raise SynapseError(400, "User is not hosted on this homeserver")
just_field = args.get("field", None)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index cff6b0d375..95806af41e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,7 +24,6 @@ from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
- LimitExceededError,
RegistrationError,
SynapseError,
)
@@ -168,6 +167,7 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
+ yield self.check_registration_ratelimit(address)
yield self.auth.check_auth_blocking(threepid=threepid)
password_hash = None
@@ -217,8 +217,13 @@ class RegistrationHandler(BaseHandler):
else:
# autogen a sequential user ID
+ fail_count = 0
user = None
while not user:
+ # Fail after being unable to find a suitable ID a few times
+ if fail_count > 10:
+ raise SynapseError(500, "Unable to find a suitable guest user ID")
+
localpart = yield self._generate_user_id()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@@ -233,10 +238,14 @@ class RegistrationHandler(BaseHandler):
create_profile_with_displayname=default_display_name,
address=address,
)
+
+ # Successfully registered
+ break
except SynapseError:
# if user id is taken, just generate another
user = None
user_id = None
+ fail_count += 1
if not self.hs.config.user_consent_at_registration:
yield self._auto_join_rooms(user_id)
@@ -414,6 +423,29 @@ class RegistrationHandler(BaseHandler):
ratelimit=False,
)
+ def check_registration_ratelimit(self, address):
+ """A simple helper method to check whether the registration rate limit has been hit
+ for a given IP address
+
+ Args:
+ address (str|None): the IP address used to perform the registration. If this is
+ None, no ratelimiting will be performed.
+
+ Raises:
+ LimitExceededError: If the rate limit has been exceeded.
+ """
+ if not address:
+ return
+
+ time_now = self.clock.time()
+
+ self.ratelimiter.ratelimit(
+ address,
+ time_now_s=time_now,
+ rate_hz=self.hs.config.rc_registration.per_second,
+ burst_count=self.hs.config.rc_registration.burst_count,
+ )
+
def register_with_store(
self,
user_id,
@@ -446,22 +478,6 @@ class RegistrationHandler(BaseHandler):
Returns:
Deferred
"""
- # Don't rate limit for app services
- if appservice_id is None and address is not None:
- time_now = self.clock.time()
-
- allowed, time_allowed = self.ratelimiter.can_do_action(
- address,
- time_now_s=time_now,
- rate_hz=self.hs.config.rc_registration.per_second,
- burst_count=self.hs.config.rc_registration.burst_count,
- )
-
- if not allowed:
- raise LimitExceededError(
- retry_after_ms=int(1000 * (time_allowed - time_now))
- )
-
if self.hs.config.worker_app:
return self._register_client(
user_id=user_id,
@@ -614,7 +630,7 @@ class RegistrationHandler(BaseHandler):
# And we add an email pusher for them by default, but only
# if email notifications are enabled (so people don't start
# getting mail spam where they weren't before if email
- # notifs are set up on a home server)
+ # notifs are set up on a homeserver)
if (
self.hs.config.email_enable_notifs
and self.hs.config.email_notif_for_new_users
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index fd3ea8daf8..22768e97ff 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -199,21 +199,21 @@ class RoomCreationHandler(BaseHandler):
# finally, shut down the PLs in the old room, and update them in the new
# room.
yield self._update_upgraded_room_pls(
- requester, old_room_id, new_room_id, old_room_state
+ requester, old_room_id, new_room_id, old_room_state,
)
return new_room_id
@defer.inlineCallbacks
def _update_upgraded_room_pls(
- self, requester, old_room_id, new_room_id, old_room_state
+ self, requester, old_room_id, new_room_id, old_room_state,
):
"""Send updated power levels in both rooms after an upgrade
Args:
requester (synapse.types.Requester): the user requesting the upgrade
- old_room_id (unicode): the id of the room to be replaced
- new_room_id (unicode): the id of the replacement room
+ old_room_id (str): the id of the room to be replaced
+ new_room_id (str): the id of the replacement room
old_room_state (dict[tuple[str, str], str]): the state map for the old room
Returns:
@@ -299,7 +299,7 @@ class RoomCreationHandler(BaseHandler):
tombstone_event_id (unicode|str): the ID of the tombstone event in the old
room.
Returns:
- Deferred[None]
+ Deferred
"""
user_id = requester.user.to_string()
@@ -334,6 +334,7 @@ class RoomCreationHandler(BaseHandler):
(EventTypes.Encryption, ""),
(EventTypes.ServerACL, ""),
(EventTypes.RelatedGroups, ""),
+ (EventTypes.PowerLevels, ""),
)
old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -347,6 +348,31 @@ class RoomCreationHandler(BaseHandler):
if old_event:
initial_state[k] = old_event.content
+ # Resolve the minimum power level required to send any state event
+ # We will give the upgrading user this power level temporarily (if necessary) such that
+ # they are able to copy all of the state events over, then revert them back to their
+ # original power level afterwards in _update_upgraded_room_pls
+
+ # Copy over user power levels now as this will not be possible with >100PL users once
+ # the room has been created
+
+ power_levels = initial_state[(EventTypes.PowerLevels, "")]
+
+ # Calculate the minimum power level needed to clone the room
+ event_power_levels = power_levels.get("events", {})
+ state_default = power_levels.get("state_default", 0)
+ ban = power_levels.get("ban")
+ needed_power_level = max(state_default, ban, max(event_power_levels.values()))
+
+ # Raise the requester's power level in the new room if necessary
+ current_power_level = power_levels["users"][requester.user.to_string()]
+ if current_power_level < needed_power_level:
+ # Assign this power level to the requester
+ power_levels["users"][requester.user.to_string()] = needed_power_level
+
+ # Set the power levels to the modified state
+ initial_state[(EventTypes.PowerLevels, "")] = power_levels
+
yield self._send_events_for_new_room(
requester,
new_room_id,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 06d09c2947..7b7270fc61 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -94,7 +94,9 @@ class RoomMemberHandler(object):
raise NotImplementedError()
@abc.abstractmethod
- def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+ def _remote_reject_invite(
+ self, requester, remote_room_hosts, room_id, target, content
+ ):
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
@@ -104,6 +106,7 @@ class RoomMemberHandler(object):
reject invite
room_id (str)
target (UserID): The user rejecting the invite
+ content (dict): The content for the rejection event
Returns:
Deferred[dict]: A dictionary to be returned to the client, may
@@ -471,7 +474,7 @@ class RoomMemberHandler(object):
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
res = yield self._remote_reject_invite(
- requester, remote_room_hosts, room_id, target
+ requester, remote_room_hosts, room_id, target, content,
)
return res
@@ -515,6 +518,15 @@ class RoomMemberHandler(object):
yield self.store.set_room_is_public(old_room_id, False)
yield self.store.set_room_is_public(room_id, True)
+ # Check if any groups we own contain the predecessor room
+ local_group_ids = yield self.store.get_local_groups_for_room(old_room_id)
+ for group_id in local_group_ids:
+ # Add new the new room to those groups
+ yield self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
+
+ # Remove the old room from those groups
+ yield self.store.remove_room_from_group(group_id, old_room_id)
+
@defer.inlineCallbacks
def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
"""Copy user-specific information when they join a new room when that new room is the
@@ -962,13 +974,15 @@ class RoomMemberMasterHandler(RoomMemberHandler):
)
@defer.inlineCallbacks
- def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+ def _remote_reject_invite(
+ self, requester, remote_room_hosts, room_id, target, content
+ ):
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
- remote_room_hosts, room_id, target.to_string()
+ remote_room_hosts, room_id, target.to_string(), content=content,
)
return ret
except Exception as e:
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 75e96ae1a2..69be86893b 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -55,7 +55,9 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
return ret
- def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+ def _remote_reject_invite(
+ self, requester, remote_room_hosts, room_id, target, content
+ ):
"""Implements RoomMemberHandler._remote_reject_invite
"""
return self._remote_reject_client(
@@ -63,6 +65,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
remote_room_hosts=remote_room_hosts,
room_id=room_id,
user_id=target.to_string(),
+ content=content,
)
def _user_joined_room(self, target, room_id):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index ca8ae9fb5b..856337b7e2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -120,7 +120,7 @@ class TypingHandler(object):
auth_user_id = auth_user.to_string()
if not self.is_mine_id(target_user_id):
- raise SynapseError(400, "User is not hosted on this Home Server")
+ raise SynapseError(400, "User is not hosted on this homeserver")
if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
@@ -150,7 +150,7 @@ class TypingHandler(object):
auth_user_id = auth_user.to_string()
if not self.is_mine_id(target_user_id):
- raise SynapseError(400, "User is not hosted on this Home Server")
+ raise SynapseError(400, "User is not hosted on this homeserver")
if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
|