diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 594754cfd8..ac09d03ba9 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -93,9 +93,9 @@ class BaseHandler(object):
messages_per_second = self.hs.config.rc_messages_per_second
burst_count = self.hs.config.rc_message_burst_count
- allowed, time_allowed = self.ratelimiter.send_message(
+ allowed, time_allowed = self.ratelimiter.can_do_action(
user_id, time_now,
- msg_rate_hz=messages_per_second,
+ rate_hz=messages_per_second,
burst_count=burst_count,
update=update,
)
@@ -165,6 +165,7 @@ class BaseHandler(object):
member_event.room_id,
"leave",
ratelimit=False,
+ require_consent=False,
)
except Exception as e:
logger.exception("Error kicking guest user: %s" % (e,))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2abd9af94f..caad9ae2dd 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
+from synapse.api.ratelimiting import Ratelimiter
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util import logcontext
@@ -99,6 +100,11 @@ class AuthHandler(BaseHandler):
login_types.append(t)
self._supported_login_types = login_types
+ self._account_ratelimiter = Ratelimiter()
+ self._failed_attempts_ratelimiter = Ratelimiter()
+
+ self._clock = self.hs.get_clock()
+
@defer.inlineCallbacks
def validate_user_via_ui_auth(self, requester, request_body, clientip):
"""
@@ -568,7 +574,12 @@ class AuthHandler(BaseHandler):
Returns:
defer.Deferred: (unicode) canonical_user_id, or None if zero or
multiple matches
+
+ Raises:
+ LimitExceededError if the ratelimiter's login requests count for this
+ user is too high too proceed.
"""
+ self.ratelimit_login_per_account(user_id)
res = yield self._find_user_id_and_pwd_hash(user_id)
if res is not None:
defer.returnValue(res[0])
@@ -634,6 +645,8 @@ class AuthHandler(BaseHandler):
StoreError if there was a problem accessing the database
SynapseError if there was a problem with the request
LoginError if there was an authentication problem.
+ LimitExceededError if the ratelimiter's login requests count for this
+ user is too high too proceed.
"""
if username.startswith('@'):
@@ -643,6 +656,8 @@ class AuthHandler(BaseHandler):
username, self.hs.hostname
).to_string()
+ self.ratelimit_login_per_account(qualified_user_id)
+
login_type = login_submission.get("type")
known_login_type = False
@@ -715,9 +730,16 @@ class AuthHandler(BaseHandler):
if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type)
- # unknown username or invalid password. We raise a 403 here, but note
- # that if we're doing user-interactive login, it turns all LoginErrors
- # into a 401 anyway.
+ # unknown username or invalid password.
+ self._failed_attempts_ratelimiter.ratelimit(
+ qualified_user_id.lower(), time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+ burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+ update=True,
+ )
+
+ # We raise a 403 here, but note that if we're doing user-interactive
+ # login, it turns all LoginErrors into a 401 anyway.
raise LoginError(
403, "Invalid password",
errcode=Codes.FORBIDDEN
@@ -735,6 +757,10 @@ class AuthHandler(BaseHandler):
password (unicode): the provided password
Returns:
(unicode) the canonical_user_id, or None if unknown user / bad password
+
+ Raises:
+ LimitExceededError if the ratelimiter's login requests count for this
+ user is too high too proceed.
"""
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
if not lookupres:
@@ -763,6 +789,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+ self.ratelimit_login_per_account(user_id)
yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@@ -934,6 +961,33 @@ class AuthHandler(BaseHandler):
else:
return defer.succeed(False)
+ def ratelimit_login_per_account(self, user_id):
+ """Checks whether the process must be stopped because of ratelimiting.
+
+ Checks against two ratelimiters: the generic one for login attempts per
+ account and the one specific to failed attempts.
+
+ Args:
+ user_id (unicode): complete @user:id
+
+ Raises:
+ LimitExceededError if one of the ratelimiters' login requests count
+ for this user is too high too proceed.
+ """
+ self._failed_attempts_ratelimiter.ratelimit(
+ user_id.lower(), time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+ burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+ update=False,
+ )
+
+ self._account_ratelimiter.ratelimit(
+ user_id.lower(), time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_account.per_second,
+ burst_count=self.hs.config.rc_login_account.burst_count,
+ update=True,
+ )
+
@attr.s
class MacaroonGenerator(object):
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index bd7b1f30e0..5b197b53f2 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -168,6 +168,7 @@ class DeactivateAccountHandler(BaseHandler):
room_id,
"leave",
ratelimit=False,
+ require_consent=False,
)
except Exception:
logger.exception(
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b711ce1e86..d69fc8b061 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -37,13 +37,185 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
-class DeviceHandler(BaseHandler):
+class DeviceWorkerHandler(BaseHandler):
def __init__(self, hs):
- super(DeviceHandler, self).__init__(hs)
+ super(DeviceWorkerHandler, self).__init__(hs)
self.hs = hs
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
+
+ @defer.inlineCallbacks
+ def get_devices_by_user(self, user_id):
+ """
+ Retrieve the given user's devices
+
+ Args:
+ user_id (str):
+ Returns:
+ defer.Deferred: list[dict[str, X]]: info on each device
+ """
+
+ device_map = yield self.store.get_devices_by_user(user_id)
+
+ ips = yield self.store.get_last_client_ip_by_device(
+ user_id, device_id=None
+ )
+
+ devices = list(device_map.values())
+ for device in devices:
+ _update_device_from_client_ips(device, ips)
+
+ defer.returnValue(devices)
+
+ @defer.inlineCallbacks
+ def get_device(self, user_id, device_id):
+ """ Retrieve the given device
+
+ Args:
+ user_id (str):
+ device_id (str):
+
+ Returns:
+ defer.Deferred: dict[str, X]: info on the device
+ Raises:
+ errors.NotFoundError: if the device was not found
+ """
+ try:
+ device = yield self.store.get_device(user_id, device_id)
+ except errors.StoreError:
+ raise errors.NotFoundError
+ ips = yield self.store.get_last_client_ip_by_device(
+ user_id, device_id,
+ )
+ _update_device_from_client_ips(device, ips)
+ defer.returnValue(device)
+
+ @measure_func("device.get_user_ids_changed")
+ @defer.inlineCallbacks
+ def get_user_ids_changed(self, user_id, from_token):
+ """Get list of users that have had the devices updated, or have newly
+ joined a room, that `user_id` may be interested in.
+
+ Args:
+ user_id (str)
+ from_token (StreamToken)
+ """
+ now_room_key = yield self.store.get_room_events_max_id()
+
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+
+ # First we check if any devices have changed
+ changed = yield self.store.get_user_whose_devices_changed(
+ from_token.device_list_key
+ )
+
+ # Then work out if any users have since joined
+ rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+
+ member_events = yield self.store.get_membership_changes_for_user(
+ user_id, from_token.room_key, now_room_key,
+ )
+ rooms_changed.update(event.room_id for event in member_events)
+
+ stream_ordering = RoomStreamToken.parse_stream_token(
+ from_token.room_key
+ ).stream
+
+ possibly_changed = set(changed)
+ possibly_left = set()
+ for room_id in rooms_changed:
+ current_state_ids = yield self.store.get_current_state_ids(room_id)
+
+ # The user may have left the room
+ # TODO: Check if they actually did or if we were just invited.
+ if room_id not in room_ids:
+ for key, event_id in iteritems(current_state_ids):
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_left.add(state_key)
+ continue
+
+ # Fetch the current state at the time.
+ try:
+ event_ids = yield self.store.get_forward_extremeties_for_room(
+ room_id, stream_ordering=stream_ordering
+ )
+ except errors.StoreError:
+ # we have purged the stream_ordering index since the stream
+ # ordering: treat it the same as a new room
+ event_ids = []
+
+ # special-case for an empty prev state: include all members
+ # in the changed list
+ if not event_ids:
+ for key, event_id in iteritems(current_state_ids):
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_changed.add(state_key)
+ continue
+
+ current_member_id = current_state_ids.get((EventTypes.Member, user_id))
+ if not current_member_id:
+ continue
+
+ # mapping from event_id -> state_dict
+ prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+
+ # Check if we've joined the room? If so we just blindly add all the users to
+ # the "possibly changed" users.
+ for state_dict in itervalues(prev_state_ids):
+ member_event = state_dict.get((EventTypes.Member, user_id), None)
+ if not member_event or member_event != current_member_id:
+ for key, event_id in iteritems(current_state_ids):
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_changed.add(state_key)
+ break
+
+ # If there has been any change in membership, include them in the
+ # possibly changed list. We'll check if they are joined below,
+ # and we're not toooo worried about spuriously adding users.
+ for key, event_id in iteritems(current_state_ids):
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+
+ # check if this member has changed since any of the extremities
+ # at the stream_ordering, and add them to the list if so.
+ for state_dict in itervalues(prev_state_ids):
+ prev_event_id = state_dict.get(key, None)
+ if not prev_event_id or prev_event_id != event_id:
+ if state_key != user_id:
+ possibly_changed.add(state_key)
+ break
+
+ if possibly_changed or possibly_left:
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
+
+ # Take the intersection of the users whose devices may have changed
+ # and those that actually still share a room with the user
+ possibly_joined = possibly_changed & users_who_share_room
+ possibly_left = (possibly_changed | possibly_left) - users_who_share_room
+ else:
+ possibly_joined = []
+ possibly_left = []
+
+ defer.returnValue({
+ "changed": list(possibly_joined),
+ "left": list(possibly_left),
+ })
+
+
+class DeviceHandler(DeviceWorkerHandler):
+ def __init__(self, hs):
+ super(DeviceHandler, self).__init__(hs)
+
self.federation_sender = hs.get_federation_sender()
self._edu_updater = DeviceListEduUpdater(hs, self)
@@ -104,52 +276,6 @@ class DeviceHandler(BaseHandler):
raise errors.StoreError(500, "Couldn't generate a device ID.")
@defer.inlineCallbacks
- def get_devices_by_user(self, user_id):
- """
- Retrieve the given user's devices
-
- Args:
- user_id (str):
- Returns:
- defer.Deferred: list[dict[str, X]]: info on each device
- """
-
- device_map = yield self.store.get_devices_by_user(user_id)
-
- ips = yield self.store.get_last_client_ip_by_device(
- user_id, device_id=None
- )
-
- devices = list(device_map.values())
- for device in devices:
- _update_device_from_client_ips(device, ips)
-
- defer.returnValue(devices)
-
- @defer.inlineCallbacks
- def get_device(self, user_id, device_id):
- """ Retrieve the given device
-
- Args:
- user_id (str):
- device_id (str):
-
- Returns:
- defer.Deferred: dict[str, X]: info on the device
- Raises:
- errors.NotFoundError: if the device was not found
- """
- try:
- device = yield self.store.get_device(user_id, device_id)
- except errors.StoreError:
- raise errors.NotFoundError
- ips = yield self.store.get_last_client_ip_by_device(
- user_id, device_id,
- )
- _update_device_from_client_ips(device, ips)
- defer.returnValue(device)
-
- @defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
@@ -293,126 +419,6 @@ class DeviceHandler(BaseHandler):
for host in hosts:
self.federation_sender.send_device_messages(host)
- @measure_func("device.get_user_ids_changed")
- @defer.inlineCallbacks
- def get_user_ids_changed(self, user_id, from_token):
- """Get list of users that have had the devices updated, or have newly
- joined a room, that `user_id` may be interested in.
-
- Args:
- user_id (str)
- from_token (StreamToken)
- """
- now_token = yield self.hs.get_event_sources().get_current_token()
-
- room_ids = yield self.store.get_rooms_for_user(user_id)
-
- # First we check if any devices have changed
- changed = yield self.store.get_user_whose_devices_changed(
- from_token.device_list_key
- )
-
- # Then work out if any users have since joined
- rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
-
- member_events = yield self.store.get_membership_changes_for_user(
- user_id, from_token.room_key, now_token.room_key
- )
- rooms_changed.update(event.room_id for event in member_events)
-
- stream_ordering = RoomStreamToken.parse_stream_token(
- from_token.room_key
- ).stream
-
- possibly_changed = set(changed)
- possibly_left = set()
- for room_id in rooms_changed:
- current_state_ids = yield self.store.get_current_state_ids(room_id)
-
- # The user may have left the room
- # TODO: Check if they actually did or if we were just invited.
- if room_id not in room_ids:
- for key, event_id in iteritems(current_state_ids):
- etype, state_key = key
- if etype != EventTypes.Member:
- continue
- possibly_left.add(state_key)
- continue
-
- # Fetch the current state at the time.
- try:
- event_ids = yield self.store.get_forward_extremeties_for_room(
- room_id, stream_ordering=stream_ordering
- )
- except errors.StoreError:
- # we have purged the stream_ordering index since the stream
- # ordering: treat it the same as a new room
- event_ids = []
-
- # special-case for an empty prev state: include all members
- # in the changed list
- if not event_ids:
- for key, event_id in iteritems(current_state_ids):
- etype, state_key = key
- if etype != EventTypes.Member:
- continue
- possibly_changed.add(state_key)
- continue
-
- current_member_id = current_state_ids.get((EventTypes.Member, user_id))
- if not current_member_id:
- continue
-
- # mapping from event_id -> state_dict
- prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
-
- # Check if we've joined the room? If so we just blindly add all the users to
- # the "possibly changed" users.
- for state_dict in itervalues(prev_state_ids):
- member_event = state_dict.get((EventTypes.Member, user_id), None)
- if not member_event or member_event != current_member_id:
- for key, event_id in iteritems(current_state_ids):
- etype, state_key = key
- if etype != EventTypes.Member:
- continue
- possibly_changed.add(state_key)
- break
-
- # If there has been any change in membership, include them in the
- # possibly changed list. We'll check if they are joined below,
- # and we're not toooo worried about spuriously adding users.
- for key, event_id in iteritems(current_state_ids):
- etype, state_key = key
- if etype != EventTypes.Member:
- continue
-
- # check if this member has changed since any of the extremities
- # at the stream_ordering, and add them to the list if so.
- for state_dict in itervalues(prev_state_ids):
- prev_event_id = state_dict.get(key, None)
- if not prev_event_id or prev_event_id != event_id:
- if state_key != user_id:
- possibly_changed.add(state_key)
- break
-
- if possibly_changed or possibly_left:
- users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id
- )
-
- # Take the intersection of the users whose devices may have changed
- # and those that actually still share a room with the user
- possibly_joined = possibly_changed & users_who_share_room
- possibly_left = (possibly_changed | possibly_left) - users_who_share_room
- else:
- possibly_joined = []
- possibly_left = []
-
- defer.returnValue({
- "changed": list(possibly_joined),
- "left": list(possibly_left),
- })
-
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
@@ -586,11 +592,21 @@ class DeviceListEduUpdater(object):
)
devices = []
+ for device in devices:
+ logger.debug(
+ "Handling resync update %r/%r, ID: %r",
+ user_id, device["device_id"], stream_id,
+ )
+
yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id,
)
device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids)
+
+ # We clobber the seen updates since we've re-synced from a given
+ # point.
+ self._seen_updates[user_id] = set([stream_id])
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
@@ -603,9 +619,9 @@ class DeviceListEduUpdater(object):
user_id, [device_id for device_id, _, _, _ in pending_updates]
)
- self._seen_updates.setdefault(user_id, set()).update(
- stream_id for _, stream_id, _, _ in pending_updates
- )
+ self._seen_updates.setdefault(user_id, set()).update(
+ stream_id for _, stream_id, _, _ in pending_updates
+ )
@defer.inlineCallbacks
def _need_to_do_resync(self, user_id, updates):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8b113307d2..fe128d9c88 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -44,6 +44,7 @@ class DirectoryHandler(BaseHandler):
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.config = hs.config
+ self.enable_room_list_search = hs.config.enable_room_list_search
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -411,6 +412,13 @@ class DirectoryHandler(BaseHandler):
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
+ if visibility == "public" and not self.enable_room_list_search:
+ # The room list has been disabled.
+ raise AuthError(
+ 403,
+ "This user is not permitted to publish rooms to the room list"
+ )
+
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Unknown room")
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a222d67190..1f799a04c1 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.crypto.event_signing import compute_event_signature
+from synapse.event_auth import auth_types_for_event
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@@ -858,6 +859,52 @@ class FederationHandler(BaseHandler):
logger.debug("Not backfilling as no extremeties found.")
return
+ # We only want to paginate if we can actually see the events we'll get,
+ # as otherwise we'll just spend a lot of resources to get redacted
+ # events.
+ #
+ # We do this by filtering all the backwards extremities and seeing if
+ # any remain. Given we don't have the extremity events themselves, we
+ # need to actually check the events that reference them.
+ #
+ # *Note*: the spec wants us to keep backfilling until we reach the start
+ # of the room in case we are allowed to see some of the history. However
+ # in practice that causes more issues than its worth, as a) its
+ # relatively rare for there to be any visible history and b) even when
+ # there is its often sufficiently long ago that clients would stop
+ # attempting to paginate before backfill reached the visible history.
+ #
+ # TODO: If we do do a backfill then we should filter the backwards
+ # extremities to only include those that point to visible portions of
+ # history.
+ #
+ # TODO: Correctly handle the case where we are allowed to see the
+ # forward event but not the backward extremity, e.g. in the case of
+ # initial join of the server where we are allowed to see the join
+ # event but not anything before it. This would require looking at the
+ # state *before* the event, ignoring the special casing certain event
+ # types have.
+
+ forward_events = yield self.store.get_successor_events(
+ list(extremities),
+ )
+
+ extremities_events = yield self.store.get_events(
+ forward_events,
+ check_redacted=False,
+ get_prev_content=False,
+ )
+
+ # We set `check_history_visibility_only` as we might otherwise get false
+ # positives from users having been erased.
+ filtered_extremities = yield filter_events_for_server(
+ self.store, self.server_name, list(extremities_events.values()),
+ redact=False, check_history_visibility_only=True,
+ )
+
+ if not filtered_extremities:
+ defer.returnValue(False)
+
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(
extremities.items(),
@@ -1582,6 +1629,7 @@ class FederationHandler(BaseHandler):
origin, event,
state=state,
auth_events=auth_events,
+ backfilled=backfilled,
)
# reraise does not allow inlineCallbacks to preserve the stacktrace, so we
@@ -1626,6 +1674,7 @@ class FederationHandler(BaseHandler):
event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
+ backfilled=backfilled,
)
defer.returnValue(res)
@@ -1748,7 +1797,7 @@ class FederationHandler(BaseHandler):
)
@defer.inlineCallbacks
- def _prep_event(self, origin, event, state=None, auth_events=None):
+ def _prep_event(self, origin, event, state, auth_events, backfilled):
"""
Args:
@@ -1756,6 +1805,7 @@ class FederationHandler(BaseHandler):
event:
state:
auth_events:
+ backfilled (bool)
Returns:
Deferred, which resolves to synapse.events.snapshot.EventContext
@@ -1797,12 +1847,100 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
+ if not context.rejected:
+ yield self._check_for_soft_fail(event, state, backfilled)
+
if event.type == EventTypes.GuestAccess and not context.rejected:
yield self.maybe_kick_guest_users(event)
defer.returnValue(context)
@defer.inlineCallbacks
+ def _check_for_soft_fail(self, event, state, backfilled):
+ """Checks if we should soft fail the event, if so marks the event as
+ such.
+
+ Args:
+ event (FrozenEvent)
+ state (dict|None): The state at the event if we don't have all the
+ event's prev events
+ backfilled (bool): Whether the event is from backfill
+
+ Returns:
+ Deferred
+ """
+ # For new (non-backfilled and non-outlier) events we check if the event
+ # passes auth based on the current state. If it doesn't then we
+ # "soft-fail" the event.
+ do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier()
+ if do_soft_fail_check:
+ extrem_ids = yield self.store.get_latest_event_ids_in_room(
+ event.room_id,
+ )
+
+ extrem_ids = set(extrem_ids)
+ prev_event_ids = set(event.prev_event_ids())
+
+ if extrem_ids == prev_event_ids:
+ # If they're the same then the current state is the same as the
+ # state at the event, so no point rechecking auth for soft fail.
+ do_soft_fail_check = False
+
+ if do_soft_fail_check:
+ room_version = yield self.store.get_room_version(event.room_id)
+
+ # Calculate the "current state".
+ if state is not None:
+ # If we're explicitly given the state then we won't have all the
+ # prev events, and so we have a gap in the graph. In this case
+ # we want to be a little careful as we might have been down for
+ # a while and have an incorrect view of the current state,
+ # however we still want to do checks as gaps are easy to
+ # maliciously manufacture.
+ #
+ # So we use a "current state" that is actually a state
+ # resolution across the current forward extremities and the
+ # given state at the event. This should correctly handle cases
+ # like bans, especially with state res v2.
+
+ state_sets = yield self.store.get_state_groups(
+ event.room_id, extrem_ids,
+ )
+ state_sets = list(state_sets.values())
+ state_sets.append(state)
+ current_state_ids = yield self.state_handler.resolve_events(
+ room_version, state_sets, event,
+ )
+ current_state_ids = {
+ k: e.event_id for k, e in iteritems(current_state_ids)
+ }
+ else:
+ current_state_ids = yield self.state_handler.get_current_state_ids(
+ event.room_id, latest_event_ids=extrem_ids,
+ )
+
+ # Now check if event pass auth against said current state
+ auth_types = auth_types_for_event(event)
+ current_state_ids = [
+ e for k, e in iteritems(current_state_ids)
+ if k in auth_types
+ ]
+
+ current_auth_events = yield self.store.get_events(current_state_ids)
+ current_auth_events = {
+ (e.type, e.state_key): e for e in current_auth_events.values()
+ }
+
+ try:
+ self.auth.check(room_version, event, auth_events=current_auth_events)
+ except AuthError as e:
+ logger.warn(
+ "Failed current state auth resolution for %r because %s",
+ event, e,
+ )
+ event.internal_metadata.soft_failed = True
+
+ @defer.inlineCallbacks
def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
missing):
in_room = yield self.auth.check_host_in_room(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3981fe69ce..9b41c7b205 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,12 +243,19 @@ class EventCreationHandler(object):
self.spam_checker = hs.get_spam_checker()
- if self.config.block_events_without_consent_error is not None:
+ self._block_events_without_consent_error = (
+ self.config.block_events_without_consent_error
+ )
+
+ # we need to construct a ConsentURIBuilder here, as it checks that the necessary
+ # config options, but *only* if we have a configuration for which we are
+ # going to need it.
+ if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
- prev_events_and_hashes=None):
+ prev_events_and_hashes=None, require_consent=True):
"""
Given a dict from a client, create a new event.
@@ -269,6 +276,9 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
+
+ require_consent (bool): Whether to check if the requester has
+ consented to privacy policy.
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
@@ -310,7 +320,7 @@ class EventCreationHandler(object):
)
is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
- if not is_exempt:
+ if require_consent and not is_exempt:
yield self.assert_accepted_privacy_policy(requester)
if token_id is not None:
@@ -378,7 +388,7 @@ class EventCreationHandler(object):
Raises:
ConsentNotGivenError: if the user has not given consent yet
"""
- if self.config.block_events_without_consent_error is None:
+ if self._block_events_without_consent_error is None:
return
# exempt AS users from needing consent
@@ -405,7 +415,7 @@ class EventCreationHandler(object):
consent_uri = self._consent_uri_builder.build_user_consent_uri(
requester.user.localpart,
)
- msg = self.config.block_events_without_consent_error % {
+ msg = self._block_events_without_consent_error % {
'consent_uri': consent_uri,
}
raise ConsentNotGivenError(
@@ -436,10 +446,11 @@ class EventCreationHandler(object):
if event.is_state():
prev_state = yield self.deduplicate_state_event(event, context)
- logger.info(
- "Not bothering to persist duplicate state event %s", event.event_id,
- )
if prev_state is not None:
+ logger.info(
+ "Not bothering to persist state event %s duplicated by %s",
+ event.event_id, prev_state.event_id,
+ )
defer.returnValue(prev_state)
yield self.handle_new_client_event(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index ba3856674d..37e87fc054 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -816,7 +816,7 @@ class PresenceHandler(object):
if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
- yield self.federation.send_edu(
+ yield self.federation.build_and_send_edu(
destination=observed_user.domain,
edu_type="m.presence_invite",
content={
@@ -836,7 +836,7 @@ class PresenceHandler(object):
if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
- self.federation.send_edu(
+ self.federation.build_and_send_edu(
destination=observer_user.domain,
edu_type="m.presence_accept",
content={
@@ -848,7 +848,7 @@ class PresenceHandler(object):
state_dict = yield self.get_state(observed_user, as_event=False)
state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
- self.federation.send_edu(
+ self.federation.build_and_send_edu(
destination=observer_user.domain,
edu_type="m.presence",
content={
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 696469732c..274d2946ad 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,10 +16,8 @@ import logging
from twisted.internet import defer
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
logger = logging.getLogger(__name__)
@@ -39,42 +37,17 @@ class ReceiptsHandler(BaseHandler):
self.state = hs.get_state_handler()
@defer.inlineCallbacks
- def received_client_receipt(self, room_id, receipt_type, user_id,
- event_id):
- """Called when a client tells us a local user has read up to the given
- event_id in the room.
- """
- receipt = {
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- "event_ids": [event_id],
- "data": {
- "ts": int(self.clock.time_msec()),
- }
- }
-
- is_new = yield self._handle_new_receipts([receipt])
-
- if is_new:
- # fire off a process in the background to send the receipt to
- # remote servers
- run_as_background_process(
- 'push_receipts_to_remotes', self._push_remotes, receipt
- )
-
- @defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
"""Called when we receive an EDU of type m.receipt from a remote HS.
"""
receipts = [
- {
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- "event_ids": user_values["event_ids"],
- "data": user_values.get("data", {}),
- }
+ ReadReceipt(
+ room_id=room_id,
+ receipt_type=receipt_type,
+ user_id=user_id,
+ event_ids=user_values["event_ids"],
+ data=user_values.get("data", {}),
+ )
for room_id, room_values in content.items()
for receipt_type, users in room_values.items()
for user_id, user_values in users.items()
@@ -90,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
max_batch_id = None
for receipt in receipts:
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
-
res = yield self.store.insert_receipt(
- room_id, receipt_type, user_id, event_ids, data
+ receipt.room_id,
+ receipt.receipt_type,
+ receipt.user_id,
+ receipt.event_ids,
+ receipt.data,
)
if not res:
@@ -115,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
# no new receipts
defer.returnValue(False)
- affected_room_ids = list(set([r["room_id"] for r in receipts]))
+ affected_room_ids = list(set([r.room_id for r in receipts]))
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
@@ -128,43 +99,26 @@ class ReceiptsHandler(BaseHandler):
defer.returnValue(True)
@defer.inlineCallbacks
- def _push_remotes(self, receipt):
- """Given a receipt, works out which remote servers should be
- poked and pokes them.
+ def received_client_receipt(self, room_id, receipt_type, user_id,
+ event_id):
+ """Called when a client tells us a local user has read up to the given
+ event_id in the room.
"""
- try:
- # TODO: optimise this to move some of the work to the workers.
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
-
- users = yield self.state.get_current_user_in_room(room_id)
- remotedomains = set(get_domain_from_id(u) for u in users)
- remotedomains = remotedomains.copy()
- remotedomains.discard(self.server_name)
-
- logger.debug("Sending receipt to: %r", remotedomains)
-
- for domain in remotedomains:
- self.federation.send_edu(
- destination=domain,
- edu_type="m.receipt",
- content={
- room_id: {
- receipt_type: {
- user_id: {
- "event_ids": event_ids,
- "data": data,
- }
- }
- },
- },
- key=(room_id, receipt_type, user_id),
- )
- except Exception:
- logger.exception("Error pushing receipts to remote servers")
+ receipt = ReadReceipt(
+ room_id=room_id,
+ receipt_type=receipt_type,
+ user_id=user_id,
+ event_ids=[event_id],
+ data={
+ "ts": int(self.clock.time_msec()),
+ },
+ )
+
+ is_new = yield self._handle_new_receipts([receipt])
+ if not is_new:
+ return
+
+ yield self.federation.send_read_receipt(receipt)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7223af9014..b0468cd3d5 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,7 +23,9 @@ from synapse.api.constants import LoginType
from synapse.api.errors import (
AuthError,
Codes,
+ ConsentNotGivenError,
InvalidCaptchaError,
+ LimitExceededError,
RegistrationError,
SynapseError,
)
@@ -61,6 +63,7 @@ class RegistrationHandler(BaseHandler):
self.captcha_client = CaptchaServerHttpClient(hs)
self.http_client = hs.get_simple_http_client()
self.identity_handler = self.hs.get_handlers().identity_handler
+ self.ratelimiter = hs.get_registration_ratelimiter()
self._next_generated_user_id = None
@@ -150,6 +153,7 @@ class RegistrationHandler(BaseHandler):
threepid=None,
user_type=None,
default_display_name=None,
+ address=None,
):
"""Registers a new client on the server.
@@ -168,6 +172,7 @@ class RegistrationHandler(BaseHandler):
api.constants.UserTypes, or None for a normal user.
default_display_name (unicode|None): if set, the new user's displayname
will be set to this. Defaults to 'localpart'.
+ address (str|None): the IP address used to perform the regitration.
Returns:
A tuple of (user_id, access_token).
Raises:
@@ -207,7 +212,7 @@ class RegistrationHandler(BaseHandler):
token = None
if generate_token:
token = self.macaroon_gen.generate_access_token(user_id)
- yield self._register_with_store(
+ yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
@@ -216,6 +221,7 @@ class RegistrationHandler(BaseHandler):
create_profile_with_displayname=default_display_name,
admin=admin,
user_type=user_type,
+ address=address,
)
if default_display_name:
@@ -244,12 +250,13 @@ class RegistrationHandler(BaseHandler):
if default_display_name is None:
default_display_name = localpart
try:
- yield self._register_with_store(
+ yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
make_guest=make_guest,
create_profile_with_displayname=default_display_name,
+ address=address,
)
yield self.profile_handler.set_displayname(
@@ -316,6 +323,10 @@ class RegistrationHandler(BaseHandler):
)
else:
yield self._join_user_to_room(fake_requester, r)
+ except ConsentNotGivenError as e:
+ # Technically not necessary to pull out this error though
+ # moving away from bare excepts is a good thing to do.
+ logger.error("Failed to join new user to %r: %r", r, e)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
@@ -356,7 +367,7 @@ class RegistrationHandler(BaseHandler):
display_name = display_name or user.localpart
- yield self._register_with_store(
+ yield self.register_with_store(
user_id=user_id,
password_hash=password_hash,
appservice_id=service_id,
@@ -415,7 +426,7 @@ class RegistrationHandler(BaseHandler):
yield self.check_user_id_not_appservice_exclusive(user_id)
token = self.macaroon_gen.generate_access_token(user_id)
try:
- yield self._register_with_store(
+ yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=None,
@@ -611,7 +622,7 @@ class RegistrationHandler(BaseHandler):
token = self.macaroon_gen.generate_access_token(user_id)
if need_register:
- yield self._register_with_store(
+ yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
@@ -686,10 +697,10 @@ class RegistrationHandler(BaseHandler):
ratelimit=False,
)
- def _register_with_store(self, user_id, token=None, password_hash=None,
- was_guest=False, make_guest=False, appservice_id=None,
- create_profile_with_displayname=None, admin=False,
- user_type=None):
+ def register_with_store(self, user_id, token=None, password_hash=None,
+ was_guest=False, make_guest=False, appservice_id=None,
+ create_profile_with_displayname=None, admin=False,
+ user_type=None, address=None):
"""Register user in the datastore.
Args:
@@ -708,10 +719,26 @@ class RegistrationHandler(BaseHandler):
admin (boolean): is an admin user?
user_type (str|None): type of user. One of the values from
api.constants.UserTypes, or None for a normal user.
+ address (str|None): the IP address used to perform the regitration.
Returns:
Deferred
"""
+ # Don't rate limit for app services
+ if appservice_id is None and address is not None:
+ time_now = self.clock.time()
+
+ allowed, time_allowed = self.ratelimiter.can_do_action(
+ address, time_now_s=time_now,
+ rate_hz=self.hs.config.rc_registration.per_second,
+ burst_count=self.hs.config.rc_registration.burst_count,
+ )
+
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now)),
+ )
+
if self.hs.config.worker_app:
return self._register_client(
user_id=user_id,
@@ -723,6 +750,7 @@ class RegistrationHandler(BaseHandler):
create_profile_with_displayname=create_profile_with_displayname,
admin=admin,
user_type=user_type,
+ address=address,
)
else:
return self.store.register(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index afa508d729..d6c9d56007 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,6 +44,7 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
+ self.enable_room_list_search = hs.config.enable_room_list_search
self.response_cache = ResponseCache(hs, "room_list")
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
timeout_ms=30 * 1000)
@@ -66,10 +67,17 @@ class RoomListHandler(BaseHandler):
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
"""
+ if not self.enable_room_list_search:
+ return defer.succeed({
+ "chunk": [],
+ "total_room_count_estimate": 0,
+ })
+
logger.info(
"Getting public room list: limit=%r, since=%r, search=%r, network=%r",
limit, since_token, bool(search_filter), network_tuple,
)
+
if search_filter:
# We explicitly don't bother caching searches or requests for
# appservice specific lists.
@@ -441,6 +449,12 @@ class RoomListHandler(BaseHandler):
def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
search_filter=None, include_all_networks=False,
third_party_instance_id=None,):
+ if not self.enable_room_list_search:
+ defer.returnValue({
+ "chunk": [],
+ "total_room_count_estimate": 0,
+ })
+
if search_filter:
# We currently don't support searching across federation, so we have
# to do it manually without pagination
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index dcf30327cd..645f615d74 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -160,6 +160,7 @@ class RoomMemberHandler(object):
txn_id=None,
ratelimit=True,
content=None,
+ require_consent=True,
):
user_id = target.to_string()
@@ -185,6 +186,7 @@ class RoomMemberHandler(object):
token_id=requester.access_token_id,
txn_id=txn_id,
prev_events_and_hashes=prev_events_and_hashes,
+ require_consent=require_consent,
)
# Check if this event matches the previous membership event for the user.
@@ -232,6 +234,10 @@ class RoomMemberHandler(object):
self.copy_room_tags_and_direct_to_room(
predecessor["room_id"], room_id, user_id,
)
+ # Move over old push rules
+ self.store.move_push_rules_from_room_to_room_for_user(
+ predecessor["room_id"], room_id, user_id,
+ )
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -302,6 +308,7 @@ class RoomMemberHandler(object):
ratelimit=True,
content=None,
new_room=False,
+ require_consent=True,
):
"""Update a users membership in a room
@@ -339,6 +346,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
content=content,
new_room=new_room,
+ require_consent=require_consent,
)
defer.returnValue(result)
@@ -356,6 +364,7 @@ class RoomMemberHandler(object):
ratelimit=True,
content=None,
new_room=False,
+ require_consent=True,
):
content_specified = bool(content)
if content is None:
@@ -559,6 +568,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
prev_events_and_hashes=prev_events_and_hashes,
content=content,
+ require_consent=require_consent,
)
defer.returnValue(res)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 26e3f11ba1..36880c560e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+# Debug logger for https://github.com/matrix-org/synapse/issues/4422
+issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -962,6 +965,15 @@ class SyncHandler(object):
yield self._generate_sync_entry_for_groups(sync_result_builder)
+ # debug for https://github.com/matrix-org/synapse/issues/4422
+ for joined_room in sync_result_builder.joined:
+ room_id = joined_room.room_id
+ if room_id in newly_joined_rooms:
+ issue4422_logger.debug(
+ "Sync result for newly joined room %s: %r",
+ room_id, joined_room,
+ )
+
defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
@@ -1431,6 +1443,17 @@ class SyncHandler(object):
old_mem_ev = yield self.store.get_event(
old_mem_ev_id, allow_none=True
)
+
+ # debug for #4422
+ if has_join:
+ prev_membership = None
+ if old_mem_ev:
+ prev_membership = old_mem_ev.membership
+ issue4422_logger.debug(
+ "Previous membership for room %s with join: %s (event %s)",
+ room_id, prev_membership, old_mem_ev_id,
+ )
+
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
newly_joined_rooms.append(room_id)
@@ -1525,30 +1548,39 @@ class SyncHandler(object):
for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None)
+ newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
- room_entries.append(RoomSyncResultBuilder(
+ entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=events,
- newly_joined=room_id in newly_joined_rooms,
+ newly_joined=newly_joined,
full_state=False,
- since_token=None if room_id in newly_joined_rooms else since_token,
+ since_token=None if newly_joined else since_token,
upto_token=prev_batch_token,
- ))
+ )
else:
- room_entries.append(RoomSyncResultBuilder(
+ entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=[],
- newly_joined=room_id in newly_joined_rooms,
+ newly_joined=newly_joined,
full_state=False,
since_token=since_token,
upto_token=since_token,
- ))
+ )
+
+ if newly_joined:
+ # debugging for https://github.com/matrix-org/synapse/issues/4422
+ issue4422_logger.debug(
+ "RoomSyncResultBuilder events for newly joined room %s: %r",
+ room_id, entry.events,
+ )
+ room_entries.append(entry)
defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
@@ -1669,6 +1701,13 @@ class SyncHandler(object):
newly_joined_room=newly_joined,
)
+ if newly_joined:
+ # debug for https://github.com/matrix-org/synapse/issues/4422
+ issue4422_logger.debug(
+ "Timeline events after filtering in newly-joined room %s: %r",
+ room_id, batch,
+ )
+
# When we join the room (or the client requests full_state), we should
# send down any existing tags. Usually the user won't have tags in a
# newly joined room, unless either a) they've joined before or b) the
@@ -1900,15 +1939,34 @@ def _calculate_state(
class SyncResultBuilder(object):
- "Used to help build up a new SyncResult for a user"
+ """Used to help build up a new SyncResult for a user
+
+ Attributes:
+ sync_config (SyncConfig)
+ full_state (bool)
+ since_token (StreamToken)
+ now_token (StreamToken)
+ joined_room_ids (list[str])
+
+ # The following mirror the fields in a sync response
+ presence (list)
+ account_data (list)
+ joined (list[JoinedSyncResult])
+ invited (list[InvitedSyncResult])
+ archived (list[ArchivedSyncResult])
+ device (list)
+ groups (GroupsSyncResult|None)
+ to_device (list)
+ """
def __init__(self, sync_config, full_state, since_token, now_token,
joined_room_ids):
"""
Args:
- sync_config(SyncConfig)
- full_state(bool): The full_state flag as specified by user
- since_token(StreamToken): The token supplied by user, or None.
- now_token(StreamToken): The token to sync up to.
+ sync_config (SyncConfig)
+ full_state (bool): The full_state flag as specified by user
+ since_token (StreamToken): The token supplied by user, or None.
+ now_token (StreamToken): The token to sync up to.
+ joined_room_ids (list[str]): List of rooms the user is joined to
"""
self.sync_config = sync_config
self.full_state = full_state
@@ -1936,8 +1994,8 @@ class RoomSyncResultBuilder(object):
Args:
room_id(str)
rtype(str): One of `"joined"` or `"archived"`
- events(list): List of events to include in the room, (more events
- may be added when generating result).
+ events(list[FrozenEvent]): List of events to include in the room
+ (more events may be added when generating result).
newly_joined(bool): If the user has newly joined the room
full_state(bool): Whether the full state should be sent in result
since_token(StreamToken): Earliest point to return events from, or None
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a61bbf9392..39df960c31 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -231,7 +231,7 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
- self.federation.send_edu(
+ self.federation.build_and_send_edu(
destination=domain,
edu_type="m.typing",
content={
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 283c6c1b81..7dc0e236e7 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,7 +15,7 @@
import logging
-from six import iteritems
+from six import iteritems, iterkeys
from twisted.internet import defer
@@ -38,18 +38,8 @@ class UserDirectoryHandler(object):
world_readable or publically joinable room. We keep a database table up to date
by streaming changes of the current state and recalculating whether users should
be in the directory or not when necessary.
-
- For each user in the directory we also store a room_id which is public and that the
- user is joined to. This allows us to ignore history_visibility and join_rules changes
- for that user in all other public rooms, as we know they'll still be in at least
- one public room.
"""
- INITIAL_ROOM_SLEEP_MS = 50
- INITIAL_ROOM_SLEEP_COUNT = 100
- INITIAL_ROOM_BATCH_SIZE = 100
- INITIAL_USER_SLEEP_MS = 10
-
def __init__(self, hs):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@@ -59,15 +49,6 @@ class UserDirectoryHandler(object):
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
-
- # When start up for the first time we need to populate the user_directory.
- # This is a set of user_id's we've inserted already
- self.initially_handled_users = set()
- self.initially_handled_users_in_public = set()
-
- self.initially_handled_users_share = set()
- self.initially_handled_users_share_private_room = set()
-
# The current position in the current_state_delta stream
self.pos = None
@@ -130,7 +111,7 @@ class UserDirectoryHandler(object):
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
- user_id, profile.display_name, profile.avatar_url, None
+ user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
@@ -140,7 +121,6 @@ class UserDirectoryHandler(object):
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.remove_from_user_dir(user_id)
- yield self.store.remove_from_user_in_public_room(user_id)
@defer.inlineCallbacks
def _unsafe_process(self):
@@ -148,10 +128,9 @@ class UserDirectoryHandler(object):
if self.pos is None:
self.pos = yield self.store.get_user_directory_stream_pos()
- # If still None then we need to do the initial fill of directory
+ # If still None then the initial background update hasn't happened yet
if self.pos is None:
- yield self._do_initial_spam()
- self.pos = yield self.store.get_user_directory_stream_pos()
+ defer.returnValue(None)
# Loop round handling deltas until we're up to date
while True:
@@ -173,149 +152,6 @@ class UserDirectoryHandler(object):
yield self.store.update_user_directory_stream_pos(self.pos)
@defer.inlineCallbacks
- def _do_initial_spam(self):
- """Populates the user_directory from the current state of the DB, used
- when synapse first starts with user_directory support
- """
- new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
- # Delete any existing entries just in case there are any
- yield self.store.delete_all_from_user_dir()
-
- # We process by going through each existing room at a time.
- room_ids = yield self.store.get_all_rooms()
-
- logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
- num_processed_rooms = 0
-
- for room_id in room_ids:
- logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
- yield self._handle_initial_room(room_id)
- num_processed_rooms += 1
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- logger.info("Processed all rooms.")
-
- if self.search_all_users:
- num_processed_users = 0
- user_ids = yield self.store.get_all_local_users()
- logger.info(
- "Doing initial update of user directory. %d users", len(user_ids)
- )
- for user_id in user_ids:
- # We add profiles for all users even if they don't match the
- # include pattern, just in case we want to change it in future
- logger.info(
- "Handling user %d/%d", num_processed_users + 1, len(user_ids)
- )
- yield self._handle_local_user(user_id)
- num_processed_users += 1
- yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
- logger.info("Processed all users")
-
- self.initially_handled_users = None
- self.initially_handled_users_in_public = None
- self.initially_handled_users_share = None
- self.initially_handled_users_share_private_room = None
-
- yield self.store.update_user_directory_stream_pos(new_pos)
-
- @defer.inlineCallbacks
- def _handle_initial_room(self, room_id):
- """Called when we initially fill out user_directory one room at a time
- """
- is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
- if not is_in_room:
- return
-
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- room_id
- )
-
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
- user_ids = set(users_with_profile)
- unhandled_users = user_ids - self.initially_handled_users
-
- yield self.store.add_profiles_to_user_dir(
- room_id,
- {user_id: users_with_profile[user_id] for user_id in unhandled_users},
- )
-
- self.initially_handled_users |= unhandled_users
-
- if is_public:
- yield self.store.add_users_to_public_room(
- room_id, user_ids=user_ids - self.initially_handled_users_in_public
- )
- self.initially_handled_users_in_public |= user_ids
-
- # We now go and figure out the new users who share rooms with user entries
- # We sleep aggressively here as otherwise it can starve resources.
- # We also batch up inserts/updates, but try to avoid too many at once.
- to_insert = set()
- to_update = set()
- count = 0
- for user_id in user_ids:
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- if not self.is_mine_id(user_id):
- count += 1
- continue
-
- if self.store.get_if_app_services_interested_in_user(user_id):
- count += 1
- continue
-
- for other_user_id in user_ids:
- if user_id == other_user_id:
- continue
-
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
- count += 1
-
- user_set = (user_id, other_user_id)
-
- if user_set in self.initially_handled_users_share_private_room:
- continue
-
- if user_set in self.initially_handled_users_share:
- if is_public:
- continue
- to_update.add(user_set)
- else:
- to_insert.add(user_set)
-
- if is_public:
- self.initially_handled_users_share.add(user_set)
- else:
- self.initially_handled_users_share_private_room.add(user_set)
-
- if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
- yield self.store.add_users_who_share_room(
- room_id, not is_public, to_insert
- )
- to_insert.clear()
-
- if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
- )
- to_update.clear()
-
- if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
- to_insert.clear()
-
- if to_update:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
- )
- to_update.clear()
-
- @defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
"""
@@ -356,6 +192,7 @@ class UserDirectoryHandler(object):
user_ids = yield self.store.get_users_in_dir_due_to_room(
room_id
)
+
for user_id in user_ids:
yield self._handle_remove_user(room_id, user_id)
return
@@ -436,14 +273,20 @@ class UserDirectoryHandler(object):
# ignore the change
return
- if change:
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
- for user_id, profile in iteritems(users_with_profile):
- yield self._handle_new_user(room_id, user_id, profile)
- else:
- users = yield self.store.get_users_in_public_due_to_room(room_id)
- for user_id in users:
- yield self._handle_remove_user(room_id, user_id)
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+ # Remove every user from the sharing tables for that room.
+ for user_id in iterkeys(users_with_profile):
+ yield self.store.remove_user_who_share_room(user_id, room_id)
+
+ # Then, re-add them to the tables.
+ # NOTE: this is not the most efficient method, as handle_new_user sets
+ # up local_user -> other_user and other_user_whos_local -> local_user,
+ # which when ran over an entire room, will result in the same values
+ # being added multiple times. The batching upserts shouldn't make this
+ # too bad, though.
+ for user_id, profile in iteritems(users_with_profile):
+ yield self._handle_new_user(room_id, user_id, profile)
@defer.inlineCallbacks
def _handle_local_user(self, user_id):
@@ -457,7 +300,9 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id)
if not row:
- yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
@@ -469,90 +314,49 @@ class UserDirectoryHandler(object):
"""
logger.debug("Adding new user to dir, %r", user_id)
- row = yield self.store.get_user_in_directory(user_id)
- if not row:
- yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
)
+ # Now we update users who share rooms with users.
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
if is_public:
- row = yield self.store.get_user_in_public_room(user_id)
- if not row:
- yield self.store.add_users_to_public_room(room_id, [user_id])
+ yield self.store.add_users_in_public_rooms(room_id, (user_id,))
else:
- logger.debug("Not adding new user to public dir, %r", user_id)
-
- # Now we update users who share rooms with users. We do this by getting
- # all the current users in the room and seeing which aren't already
- # marked in the database as sharing with `user_id`
+ to_insert = set()
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
+ # First, if they're our user then we need to update for every user
+ if self.is_mine_id(user_id):
- to_insert = set()
- to_update = set()
+ is_appservice = self.store.get_if_app_services_interested_in_user(
+ user_id
+ )
- is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+ # We don't care about appservice users.
+ if not is_appservice:
+ for other_user_id in users_with_profile:
+ if user_id == other_user_id:
+ continue
- # First, if they're our user then we need to update for every user
- if self.is_mine_id(user_id) and not is_appservice:
- # Returns a map of other_user_id -> shared_private. We only need
- # to update mappings if for users that either don't share a room
- # already (aren't in the map) or, if the room is private, those that
- # only share a public room.
- user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
- user_id
- )
+ to_insert.add((user_id, other_user_id))
+ # Next we need to update for every local user in the room
for other_user_id in users_with_profile:
if user_id == other_user_id:
continue
- shared_is_private = user_ids_shared.get(other_user_id)
- if shared_is_private is True:
- # We've already marked in the database they share a private room
- continue
- elif shared_is_private is False:
- # They already share a public room, so only update if this is
- # a private room
- if not is_public:
- to_update.add((user_id, other_user_id))
- elif shared_is_private is None:
- # This is the first time they both share a room
- to_insert.add((user_id, other_user_id))
-
- # Next we need to update for every local user in the room
- for other_user_id in users_with_profile:
- if user_id == other_user_id:
- continue
-
- is_appservice = self.store.get_if_app_services_interested_in_user(
- other_user_id
- )
- if self.is_mine_id(other_user_id) and not is_appservice:
- shared_is_private = yield self.store.get_if_users_share_a_room(
- other_user_id, user_id
+ is_appservice = self.store.get_if_app_services_interested_in_user(
+ other_user_id
)
- if shared_is_private is True:
- # We've already marked in the database they share a private room
- continue
- elif shared_is_private is False:
- # They already share a public room, so only update if this is
- # a private room
- if not is_public:
- to_update.add((other_user_id, user_id))
- elif shared_is_private is None:
- # This is the first time they both share a room
+ if self.is_mine_id(other_user_id) and not is_appservice:
to_insert.add((other_user_id, user_id))
- if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
-
- if to_update:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
- )
+ if to_insert:
+ yield self.store.add_users_who_share_private_room(room_id, to_insert)
@defer.inlineCallbacks
def _handle_remove_user(self, room_id, user_id):
@@ -562,84 +366,16 @@ class UserDirectoryHandler(object):
room_id (str): room_id that user left or stopped being public that
user_id (str)
"""
- logger.debug("Maybe removing user %r", user_id)
+ logger.debug("Removing user %r", user_id)
- row = yield self.store.get_user_in_directory(user_id)
- update_user_dir = row and row["room_id"] == room_id
+ # Remove user from sharing tables
+ yield self.store.remove_user_who_share_room(user_id, room_id)
- row = yield self.store.get_user_in_public_room(user_id)
- update_user_in_public = row and row["room_id"] == room_id
+ # Are they still in any rooms? If not, remove them entirely.
+ rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
- if update_user_in_public or update_user_dir:
- # XXX: Make this faster?
- rooms = yield self.store.get_rooms_for_user(user_id)
- for j_room_id in rooms:
- if not update_user_in_public and not update_user_dir:
- break
-
- is_in_room = yield self.store.is_host_joined(
- j_room_id, self.server_name
- )
-
- if not is_in_room:
- continue
-
- if update_user_dir:
- update_user_dir = False
- yield self.store.update_user_in_user_dir(user_id, j_room_id)
-
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- j_room_id
- )
-
- if update_user_in_public and is_public:
- yield self.store.update_user_in_public_user_list(user_id, j_room_id)
- update_user_in_public = False
-
- if update_user_dir:
+ if len(rooms_user_is_in) == 0:
yield self.store.remove_from_user_dir(user_id)
- elif update_user_in_public:
- yield self.store.remove_from_user_in_public_room(user_id)
-
- # Now handle users_who_share_rooms.
-
- # Get a list of user tuples that were in the DB due to this room and
- # users (this includes tuples where the other user matches `user_id`)
- user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
- user_id, room_id
- )
-
- for user_id, other_user_id in user_tuples:
- # For each user tuple get a list of rooms that they still share,
- # trying to find a private room, and update the entry in the DB
- rooms = yield self.store.get_rooms_in_common_for_users(
- user_id, other_user_id
- )
-
- # If they dont share a room anymore, remove the mapping
- if not rooms:
- yield self.store.remove_user_who_share_room(user_id, other_user_id)
- continue
-
- found_public_share = None
- for j_room_id in rooms:
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- j_room_id
- )
-
- if is_public:
- found_public_share = j_room_id
- else:
- found_public_share = None
- yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)]
- )
- break
-
- if found_public_share:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)]
- )
@defer.inlineCallbacks
def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
@@ -665,9 +401,7 @@ class UserDirectoryHandler(object):
new_avatar = event.content.get("avatar_url")
if prev_name != new_name or prev_avatar != new_avatar:
- yield self.store.update_profile_in_user_dir(
- user_id, new_name, new_avatar, room_id
- )
+ yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
@defer.inlineCallbacks
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
|