diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 594754cfd8..ac09d03ba9 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -93,9 +93,9 @@ class BaseHandler(object):
messages_per_second = self.hs.config.rc_messages_per_second
burst_count = self.hs.config.rc_message_burst_count
- allowed, time_allowed = self.ratelimiter.send_message(
+ allowed, time_allowed = self.ratelimiter.can_do_action(
user_id, time_now,
- msg_rate_hz=messages_per_second,
+ rate_hz=messages_per_second,
burst_count=burst_count,
update=update,
)
@@ -165,6 +165,7 @@ class BaseHandler(object):
member_event.room_id,
"leave",
ratelimit=False,
+ require_consent=False,
)
except Exception as e:
logger.exception("Error kicking guest user: %s" % (e,))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2abd9af94f..4544de821d 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
+from synapse.api.ratelimiting import Ratelimiter
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util import logcontext
@@ -99,6 +100,11 @@ class AuthHandler(BaseHandler):
login_types.append(t)
self._supported_login_types = login_types
+ self._account_ratelimiter = Ratelimiter()
+ self._failed_attempts_ratelimiter = Ratelimiter()
+
+ self._clock = self.hs.get_clock()
+
@defer.inlineCallbacks
def validate_user_via_ui_auth(self, requester, request_body, clientip):
"""
@@ -568,7 +574,12 @@ class AuthHandler(BaseHandler):
Returns:
defer.Deferred: (unicode) canonical_user_id, or None if zero or
multiple matches
+
+ Raises:
+ LimitExceededError if the ratelimiter's login requests count for this
+ user is too high too proceed.
"""
+ self.ratelimit_login_per_account(user_id)
res = yield self._find_user_id_and_pwd_hash(user_id)
if res is not None:
defer.returnValue(res[0])
@@ -634,6 +645,8 @@ class AuthHandler(BaseHandler):
StoreError if there was a problem accessing the database
SynapseError if there was a problem with the request
LoginError if there was an authentication problem.
+ LimitExceededError if the ratelimiter's login requests count for this
+ user is too high too proceed.
"""
if username.startswith('@'):
@@ -643,6 +656,8 @@ class AuthHandler(BaseHandler):
username, self.hs.hostname
).to_string()
+ self.ratelimit_login_per_account(qualified_user_id)
+
login_type = login_submission.get("type")
known_login_type = False
@@ -715,15 +730,58 @@ class AuthHandler(BaseHandler):
if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type)
- # unknown username or invalid password. We raise a 403 here, but note
- # that if we're doing user-interactive login, it turns all LoginErrors
- # into a 401 anyway.
+ # unknown username or invalid password.
+ self._failed_attempts_ratelimiter.ratelimit(
+ qualified_user_id.lower(), time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+ burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+ update=True,
+ )
+
+ # We raise a 403 here, but note that if we're doing user-interactive
+ # login, it turns all LoginErrors into a 401 anyway.
raise LoginError(
403, "Invalid password",
errcode=Codes.FORBIDDEN
)
@defer.inlineCallbacks
+ def check_password_provider_3pid(self, medium, address, password):
+ """Check if a password provider is able to validate a thirdparty login
+
+ Args:
+ medium (str): The medium of the 3pid (ex. email).
+ address (str): The address of the 3pid (ex. jdoe@example.com).
+ password (str): The password of the user.
+
+ Returns:
+ Deferred[(str|None, func|None)]: A tuple of `(user_id,
+ callback)`. If authentication is successful, `user_id` is a `str`
+ containing the authenticated, canonical user ID. `callback` is
+ then either a function to be later run after the server has
+ completed login/registration, or `None`. If authentication was
+ unsuccessful, `user_id` and `callback` are both `None`.
+ """
+ for provider in self.password_providers:
+ if hasattr(provider, "check_3pid_auth"):
+ # This function is able to return a deferred that either
+ # resolves None, meaning authentication failure, or upon
+ # success, to a str (which is the user_id) or a tuple of
+ # (user_id, callback_func), where callback_func should be run
+ # after we've finished everything else
+ result = yield provider.check_3pid_auth(
+ medium, address, password,
+ )
+ if result:
+ # Check if the return value is a str or a tuple
+ if isinstance(result, str):
+ # If it's a str, set callback function to None
+ result = (result, None)
+ defer.returnValue(result)
+
+ defer.returnValue((None, None))
+
+ @defer.inlineCallbacks
def _check_local_password(self, user_id, password):
"""Authenticate a user against the local password database.
@@ -734,7 +792,12 @@ class AuthHandler(BaseHandler):
user_id (unicode): complete @user:id
password (unicode): the provided password
Returns:
- (unicode) the canonical_user_id, or None if unknown user / bad password
+ Deferred[unicode] the canonical_user_id, or Deferred[None] if
+ unknown user/bad password
+
+ Raises:
+ LimitExceededError if the ratelimiter's login requests count for this
+ user is too high too proceed.
"""
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
if not lookupres:
@@ -763,6 +826,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+ self.ratelimit_login_per_account(user_id)
yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@@ -934,6 +998,33 @@ class AuthHandler(BaseHandler):
else:
return defer.succeed(False)
+ def ratelimit_login_per_account(self, user_id):
+ """Checks whether the process must be stopped because of ratelimiting.
+
+ Checks against two ratelimiters: the generic one for login attempts per
+ account and the one specific to failed attempts.
+
+ Args:
+ user_id (unicode): complete @user:id
+
+ Raises:
+ LimitExceededError if one of the ratelimiters' login requests count
+ for this user is too high too proceed.
+ """
+ self._failed_attempts_ratelimiter.ratelimit(
+ user_id.lower(), time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+ burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+ update=False,
+ )
+
+ self._account_ratelimiter.ratelimit(
+ user_id.lower(), time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_account.per_second,
+ burst_count=self.hs.config.rc_login_account.burst_count,
+ update=True,
+ )
+
@attr.s
class MacaroonGenerator(object):
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 75fe50c42c..97d3f31d98 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -164,6 +164,7 @@ class DeactivateAccountHandler(BaseHandler):
room_id,
"leave",
ratelimit=False,
+ require_consent=False,
)
except Exception:
logger.exception(
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c708c35d4d..b398848079 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -37,13 +37,185 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
-class DeviceHandler(BaseHandler):
+class DeviceWorkerHandler(BaseHandler):
def __init__(self, hs):
- super(DeviceHandler, self).__init__(hs)
+ super(DeviceWorkerHandler, self).__init__(hs)
self.hs = hs
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
+
+ @defer.inlineCallbacks
+ def get_devices_by_user(self, user_id):
+ """
+ Retrieve the given user's devices
+
+ Args:
+ user_id (str):
+ Returns:
+ defer.Deferred: list[dict[str, X]]: info on each device
+ """
+
+ device_map = yield self.store.get_devices_by_user(user_id)
+
+ ips = yield self.store.get_last_client_ip_by_device(
+ user_id, device_id=None
+ )
+
+ devices = list(device_map.values())
+ for device in devices:
+ _update_device_from_client_ips(device, ips)
+
+ defer.returnValue(devices)
+
+ @defer.inlineCallbacks
+ def get_device(self, user_id, device_id):
+ """ Retrieve the given device
+
+ Args:
+ user_id (str):
+ device_id (str):
+
+ Returns:
+ defer.Deferred: dict[str, X]: info on the device
+ Raises:
+ errors.NotFoundError: if the device was not found
+ """
+ try:
+ device = yield self.store.get_device(user_id, device_id)
+ except errors.StoreError:
+ raise errors.NotFoundError
+ ips = yield self.store.get_last_client_ip_by_device(
+ user_id, device_id,
+ )
+ _update_device_from_client_ips(device, ips)
+ defer.returnValue(device)
+
+ @measure_func("device.get_user_ids_changed")
+ @defer.inlineCallbacks
+ def get_user_ids_changed(self, user_id, from_token):
+ """Get list of users that have had the devices updated, or have newly
+ joined a room, that `user_id` may be interested in.
+
+ Args:
+ user_id (str)
+ from_token (StreamToken)
+ """
+ now_room_key = yield self.store.get_room_events_max_id()
+
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+
+ # First we check if any devices have changed
+ changed = yield self.store.get_user_whose_devices_changed(
+ from_token.device_list_key
+ )
+
+ # Then work out if any users have since joined
+ rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+
+ member_events = yield self.store.get_membership_changes_for_user(
+ user_id, from_token.room_key, now_room_key,
+ )
+ rooms_changed.update(event.room_id for event in member_events)
+
+ stream_ordering = RoomStreamToken.parse_stream_token(
+ from_token.room_key
+ ).stream
+
+ possibly_changed = set(changed)
+ possibly_left = set()
+ for room_id in rooms_changed:
+ current_state_ids = yield self.store.get_current_state_ids(room_id)
+
+ # The user may have left the room
+ # TODO: Check if they actually did or if we were just invited.
+ if room_id not in room_ids:
+ for key, event_id in iteritems(current_state_ids):
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_left.add(state_key)
+ continue
+
+ # Fetch the current state at the time.
+ try:
+ event_ids = yield self.store.get_forward_extremeties_for_room(
+ room_id, stream_ordering=stream_ordering
+ )
+ except errors.StoreError:
+ # we have purged the stream_ordering index since the stream
+ # ordering: treat it the same as a new room
+ event_ids = []
+
+ # special-case for an empty prev state: include all members
+ # in the changed list
+ if not event_ids:
+ for key, event_id in iteritems(current_state_ids):
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_changed.add(state_key)
+ continue
+
+ current_member_id = current_state_ids.get((EventTypes.Member, user_id))
+ if not current_member_id:
+ continue
+
+ # mapping from event_id -> state_dict
+ prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+
+ # Check if we've joined the room? If so we just blindly add all the users to
+ # the "possibly changed" users.
+ for state_dict in itervalues(prev_state_ids):
+ member_event = state_dict.get((EventTypes.Member, user_id), None)
+ if not member_event or member_event != current_member_id:
+ for key, event_id in iteritems(current_state_ids):
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_changed.add(state_key)
+ break
+
+ # If there has been any change in membership, include them in the
+ # possibly changed list. We'll check if they are joined below,
+ # and we're not toooo worried about spuriously adding users.
+ for key, event_id in iteritems(current_state_ids):
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+
+ # check if this member has changed since any of the extremities
+ # at the stream_ordering, and add them to the list if so.
+ for state_dict in itervalues(prev_state_ids):
+ prev_event_id = state_dict.get(key, None)
+ if not prev_event_id or prev_event_id != event_id:
+ if state_key != user_id:
+ possibly_changed.add(state_key)
+ break
+
+ if possibly_changed or possibly_left:
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
+
+ # Take the intersection of the users whose devices may have changed
+ # and those that actually still share a room with the user
+ possibly_joined = possibly_changed & users_who_share_room
+ possibly_left = (possibly_changed | possibly_left) - users_who_share_room
+ else:
+ possibly_joined = []
+ possibly_left = []
+
+ defer.returnValue({
+ "changed": list(possibly_joined),
+ "left": list(possibly_left),
+ })
+
+
+class DeviceHandler(DeviceWorkerHandler):
+ def __init__(self, hs):
+ super(DeviceHandler, self).__init__(hs)
+
self.federation_sender = hs.get_federation_sender()
self._edu_updater = DeviceListEduUpdater(hs, self)
@@ -104,52 +276,6 @@ class DeviceHandler(BaseHandler):
raise errors.StoreError(500, "Couldn't generate a device ID.")
@defer.inlineCallbacks
- def get_devices_by_user(self, user_id):
- """
- Retrieve the given user's devices
-
- Args:
- user_id (str):
- Returns:
- defer.Deferred: list[dict[str, X]]: info on each device
- """
-
- device_map = yield self.store.get_devices_by_user(user_id)
-
- ips = yield self.store.get_last_client_ip_by_device(
- user_id, device_id=None
- )
-
- devices = list(device_map.values())
- for device in devices:
- _update_device_from_client_ips(device, ips)
-
- defer.returnValue(devices)
-
- @defer.inlineCallbacks
- def get_device(self, user_id, device_id):
- """ Retrieve the given device
-
- Args:
- user_id (str):
- device_id (str):
-
- Returns:
- defer.Deferred: dict[str, X]: info on the device
- Raises:
- errors.NotFoundError: if the device was not found
- """
- try:
- device = yield self.store.get_device(user_id, device_id)
- except errors.StoreError:
- raise errors.NotFoundError
- ips = yield self.store.get_last_client_ip_by_device(
- user_id, device_id,
- )
- _update_device_from_client_ips(device, ips)
- defer.returnValue(device)
-
- @defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
@@ -276,6 +402,12 @@ class DeviceHandler(BaseHandler):
user_id, device_ids, list(hosts)
)
+ for device_id in device_ids:
+ logger.debug(
+ "Notifying about update %r/%r, ID: %r", user_id, device_id,
+ position,
+ )
+
room_ids = yield self.store.get_rooms_for_user(user_id)
yield self.notifier.on_new_event(
@@ -283,130 +415,10 @@ class DeviceHandler(BaseHandler):
)
if hosts:
- logger.info("Sending device list update notif to: %r", hosts)
+ logger.info("Sending device list update notif for %r to: %r", user_id, hosts)
for host in hosts:
self.federation_sender.send_device_messages(host)
- @measure_func("device.get_user_ids_changed")
- @defer.inlineCallbacks
- def get_user_ids_changed(self, user_id, from_token):
- """Get list of users that have had the devices updated, or have newly
- joined a room, that `user_id` may be interested in.
-
- Args:
- user_id (str)
- from_token (StreamToken)
- """
- now_token = yield self.hs.get_event_sources().get_current_token()
-
- room_ids = yield self.store.get_rooms_for_user(user_id)
-
- # First we check if any devices have changed
- changed = yield self.store.get_user_whose_devices_changed(
- from_token.device_list_key
- )
-
- # Then work out if any users have since joined
- rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
-
- member_events = yield self.store.get_membership_changes_for_user(
- user_id, from_token.room_key, now_token.room_key
- )
- rooms_changed.update(event.room_id for event in member_events)
-
- stream_ordering = RoomStreamToken.parse_stream_token(
- from_token.room_key
- ).stream
-
- possibly_changed = set(changed)
- possibly_left = set()
- for room_id in rooms_changed:
- current_state_ids = yield self.store.get_current_state_ids(room_id)
-
- # The user may have left the room
- # TODO: Check if they actually did or if we were just invited.
- if room_id not in room_ids:
- for key, event_id in iteritems(current_state_ids):
- etype, state_key = key
- if etype != EventTypes.Member:
- continue
- possibly_left.add(state_key)
- continue
-
- # Fetch the current state at the time.
- try:
- event_ids = yield self.store.get_forward_extremeties_for_room(
- room_id, stream_ordering=stream_ordering
- )
- except errors.StoreError:
- # we have purged the stream_ordering index since the stream
- # ordering: treat it the same as a new room
- event_ids = []
-
- # special-case for an empty prev state: include all members
- # in the changed list
- if not event_ids:
- for key, event_id in iteritems(current_state_ids):
- etype, state_key = key
- if etype != EventTypes.Member:
- continue
- possibly_changed.add(state_key)
- continue
-
- current_member_id = current_state_ids.get((EventTypes.Member, user_id))
- if not current_member_id:
- continue
-
- # mapping from event_id -> state_dict
- prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
-
- # Check if we've joined the room? If so we just blindly add all the users to
- # the "possibly changed" users.
- for state_dict in itervalues(prev_state_ids):
- member_event = state_dict.get((EventTypes.Member, user_id), None)
- if not member_event or member_event != current_member_id:
- for key, event_id in iteritems(current_state_ids):
- etype, state_key = key
- if etype != EventTypes.Member:
- continue
- possibly_changed.add(state_key)
- break
-
- # If there has been any change in membership, include them in the
- # possibly changed list. We'll check if they are joined below,
- # and we're not toooo worried about spuriously adding users.
- for key, event_id in iteritems(current_state_ids):
- etype, state_key = key
- if etype != EventTypes.Member:
- continue
-
- # check if this member has changed since any of the extremities
- # at the stream_ordering, and add them to the list if so.
- for state_dict in itervalues(prev_state_ids):
- prev_event_id = state_dict.get(key, None)
- if not prev_event_id or prev_event_id != event_id:
- if state_key != user_id:
- possibly_changed.add(state_key)
- break
-
- if possibly_changed or possibly_left:
- users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id
- )
-
- # Take the intersection of the users whose devices may have changed
- # and those that actually still share a room with the user
- possibly_joined = possibly_changed & users_who_share_room
- possibly_left = (possibly_changed | possibly_left) - users_who_share_room
- else:
- possibly_joined = []
- possibly_left = []
-
- defer.returnValue({
- "changed": list(possibly_joined),
- "left": list(possibly_left),
- })
-
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
@@ -473,15 +485,26 @@ class DeviceListEduUpdater(object):
if get_domain_from_id(user_id) != origin:
# TODO: Raise?
- logger.warning("Got device list update edu for %r from %r", user_id, origin)
+ logger.warning(
+ "Got device list update edu for %r/%r from %r",
+ user_id, device_id, origin,
+ )
return
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
+ logger.warning(
+ "Got device list update edu for %r/%r, but don't share a room",
+ user_id, device_id,
+ )
return
+ logger.debug(
+ "Received device list update for %r/%r", user_id, device_id,
+ )
+
self._pending_updates.setdefault(user_id, []).append(
(device_id, stream_id, prev_ids, edu_content)
)
@@ -499,10 +522,18 @@ class DeviceListEduUpdater(object):
# This can happen since we batch updates
return
+ for device_id, stream_id, prev_ids, content in pending_updates:
+ logger.debug(
+ "Handling update %r/%r, ID: %r, prev: %r ",
+ user_id, device_id, stream_id, prev_ids,
+ )
+
# Given a list of updates we check if we need to resync. This
# happens if we've missed updates.
resync = yield self._need_to_do_resync(user_id, pending_updates)
+ logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
+
if resync:
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
@@ -555,11 +586,21 @@ class DeviceListEduUpdater(object):
)
devices = []
+ for device in devices:
+ logger.debug(
+ "Handling resync update %r/%r, ID: %r",
+ user_id, device["device_id"], stream_id,
+ )
+
yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id,
)
device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids)
+
+ # We clobber the seen updates since we've re-synced from a given
+ # point.
+ self._seen_updates[user_id] = set([stream_id])
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
@@ -572,9 +613,9 @@ class DeviceListEduUpdater(object):
user_id, [device_id for device_id, _, _, _ in pending_updates]
)
- self._seen_updates.setdefault(user_id, set()).update(
- stream_id for _, stream_id, _, _ in pending_updates
- )
+ self._seen_updates.setdefault(user_id, set()).update(
+ stream_id for _, stream_id, _, _ in pending_updates
+ )
@defer.inlineCallbacks
def _need_to_do_resync(self, user_id, updates):
@@ -587,6 +628,11 @@ class DeviceListEduUpdater(object):
user_id
)
+ logger.debug(
+ "Current extremity for %r: %r",
+ user_id, extremity,
+ )
+
stream_id_in_updates = set() # stream_ids in updates list
for _, stream_id, prev_ids, _ in updates:
if not prev_ids:
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8b113307d2..fe128d9c88 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -44,6 +44,7 @@ class DirectoryHandler(BaseHandler):
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.config = hs.config
+ self.enable_room_list_search = hs.config.enable_room_list_search
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -411,6 +412,13 @@ class DirectoryHandler(BaseHandler):
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
+ if visibility == "public" and not self.enable_room_list_search:
+ # The room list has been disabled.
+ raise AuthError(
+ 403,
+ "This user is not permitted to publish rooms to the room list"
+ )
+
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Unknown room")
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f772e62c28..d883e98381 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -19,7 +19,7 @@ import random
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError
+from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.types import UserID
@@ -61,6 +61,11 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down.
"""
+ if room_id:
+ blocked = yield self.store.is_room_blocked(room_id)
+ if blocked:
+ raise SynapseError(403, "This room has been blocked on this server")
+
# send any outstanding server notices to the user.
yield self._server_notices_sender.on_user_syncing(auth_user_id)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f80486102a..9eaf2d3e18 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.crypto.event_signing import compute_event_signature
+from synapse.event_auth import auth_types_for_event
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@@ -858,6 +859,52 @@ class FederationHandler(BaseHandler):
logger.debug("Not backfilling as no extremeties found.")
return
+ # We only want to paginate if we can actually see the events we'll get,
+ # as otherwise we'll just spend a lot of resources to get redacted
+ # events.
+ #
+ # We do this by filtering all the backwards extremities and seeing if
+ # any remain. Given we don't have the extremity events themselves, we
+ # need to actually check the events that reference them.
+ #
+ # *Note*: the spec wants us to keep backfilling until we reach the start
+ # of the room in case we are allowed to see some of the history. However
+ # in practice that causes more issues than its worth, as a) its
+ # relatively rare for there to be any visible history and b) even when
+ # there is its often sufficiently long ago that clients would stop
+ # attempting to paginate before backfill reached the visible history.
+ #
+ # TODO: If we do do a backfill then we should filter the backwards
+ # extremities to only include those that point to visible portions of
+ # history.
+ #
+ # TODO: Correctly handle the case where we are allowed to see the
+ # forward event but not the backward extremity, e.g. in the case of
+ # initial join of the server where we are allowed to see the join
+ # event but not anything before it. This would require looking at the
+ # state *before* the event, ignoring the special casing certain event
+ # types have.
+
+ forward_events = yield self.store.get_successor_events(
+ list(extremities),
+ )
+
+ extremities_events = yield self.store.get_events(
+ forward_events,
+ check_redacted=False,
+ get_prev_content=False,
+ )
+
+ # We set `check_history_visibility_only` as we might otherwise get false
+ # positives from users having been erased.
+ filtered_extremities = yield filter_events_for_server(
+ self.store, self.server_name, list(extremities_events.values()),
+ redact=False, check_history_visibility_only=True,
+ )
+
+ if not filtered_extremities:
+ defer.returnValue(False)
+
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(
extremities.items(),
@@ -1582,6 +1629,7 @@ class FederationHandler(BaseHandler):
origin, event,
state=state,
auth_events=auth_events,
+ backfilled=backfilled,
)
# reraise does not allow inlineCallbacks to preserve the stacktrace, so we
@@ -1626,6 +1674,7 @@ class FederationHandler(BaseHandler):
event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
+ backfilled=backfilled,
)
defer.returnValue(res)
@@ -1748,7 +1797,7 @@ class FederationHandler(BaseHandler):
)
@defer.inlineCallbacks
- def _prep_event(self, origin, event, state=None, auth_events=None):
+ def _prep_event(self, origin, event, state, auth_events, backfilled):
"""
Args:
@@ -1756,6 +1805,7 @@ class FederationHandler(BaseHandler):
event:
state:
auth_events:
+ backfilled (bool)
Returns:
Deferred, which resolves to synapse.events.snapshot.EventContext
@@ -1797,12 +1847,100 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
+ if not context.rejected:
+ yield self._check_for_soft_fail(event, state, backfilled)
+
if event.type == EventTypes.GuestAccess and not context.rejected:
yield self.maybe_kick_guest_users(event)
defer.returnValue(context)
@defer.inlineCallbacks
+ def _check_for_soft_fail(self, event, state, backfilled):
+ """Checks if we should soft fail the event, if so marks the event as
+ such.
+
+ Args:
+ event (FrozenEvent)
+ state (dict|None): The state at the event if we don't have all the
+ event's prev events
+ backfilled (bool): Whether the event is from backfill
+
+ Returns:
+ Deferred
+ """
+ # For new (non-backfilled and non-outlier) events we check if the event
+ # passes auth based on the current state. If it doesn't then we
+ # "soft-fail" the event.
+ do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier()
+ if do_soft_fail_check:
+ extrem_ids = yield self.store.get_latest_event_ids_in_room(
+ event.room_id,
+ )
+
+ extrem_ids = set(extrem_ids)
+ prev_event_ids = set(event.prev_event_ids())
+
+ if extrem_ids == prev_event_ids:
+ # If they're the same then the current state is the same as the
+ # state at the event, so no point rechecking auth for soft fail.
+ do_soft_fail_check = False
+
+ if do_soft_fail_check:
+ room_version = yield self.store.get_room_version(event.room_id)
+
+ # Calculate the "current state".
+ if state is not None:
+ # If we're explicitly given the state then we won't have all the
+ # prev events, and so we have a gap in the graph. In this case
+ # we want to be a little careful as we might have been down for
+ # a while and have an incorrect view of the current state,
+ # however we still want to do checks as gaps are easy to
+ # maliciously manufacture.
+ #
+ # So we use a "current state" that is actually a state
+ # resolution across the current forward extremities and the
+ # given state at the event. This should correctly handle cases
+ # like bans, especially with state res v2.
+
+ state_sets = yield self.store.get_state_groups(
+ event.room_id, extrem_ids,
+ )
+ state_sets = list(state_sets.values())
+ state_sets.append(state)
+ current_state_ids = yield self.state_handler.resolve_events(
+ room_version, state_sets, event,
+ )
+ current_state_ids = {
+ k: e.event_id for k, e in iteritems(current_state_ids)
+ }
+ else:
+ current_state_ids = yield self.state_handler.get_current_state_ids(
+ event.room_id, latest_event_ids=extrem_ids,
+ )
+
+ # Now check if event pass auth against said current state
+ auth_types = auth_types_for_event(event)
+ current_state_ids = [
+ e for k, e in iteritems(current_state_ids)
+ if k in auth_types
+ ]
+
+ current_auth_events = yield self.store.get_events(current_state_ids)
+ current_auth_events = {
+ (e.type, e.state_key): e for e in current_auth_events.values()
+ }
+
+ try:
+ self.auth.check(room_version, event, auth_events=current_auth_events)
+ except AuthError as e:
+ logger.warn(
+ "Failed current state auth resolution for %r because %s",
+ event, e,
+ )
+ event.internal_metadata.soft_failed = True
+
+ @defer.inlineCallbacks
def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
missing):
in_room = yield self.auth.check_host_in_room(
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 563bb3cea3..7dfae78db0 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -18,7 +18,7 @@ import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes
+from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
@@ -262,6 +262,10 @@ class InitialSyncHandler(BaseHandler):
A JSON serialisable dict with the snapshot of the room.
"""
+ blocked = yield self.store.is_room_blocked(room_id)
+ if blocked:
+ raise SynapseError(403, "This room has been blocked on this server")
+
user_id = requester.user.to_string()
membership, member_event_id = yield self._check_in_room_or_world_readable(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3981fe69ce..9b41c7b205 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,12 +243,19 @@ class EventCreationHandler(object):
self.spam_checker = hs.get_spam_checker()
- if self.config.block_events_without_consent_error is not None:
+ self._block_events_without_consent_error = (
+ self.config.block_events_without_consent_error
+ )
+
+ # we need to construct a ConsentURIBuilder here, as it checks that the necessary
+ # config options, but *only* if we have a configuration for which we are
+ # going to need it.
+ if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
- prev_events_and_hashes=None):
+ prev_events_and_hashes=None, require_consent=True):
"""
Given a dict from a client, create a new event.
@@ -269,6 +276,9 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
+
+ require_consent (bool): Whether to check if the requester has
+ consented to privacy policy.
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
@@ -310,7 +320,7 @@ class EventCreationHandler(object):
)
is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
- if not is_exempt:
+ if require_consent and not is_exempt:
yield self.assert_accepted_privacy_policy(requester)
if token_id is not None:
@@ -378,7 +388,7 @@ class EventCreationHandler(object):
Raises:
ConsentNotGivenError: if the user has not given consent yet
"""
- if self.config.block_events_without_consent_error is None:
+ if self._block_events_without_consent_error is None:
return
# exempt AS users from needing consent
@@ -405,7 +415,7 @@ class EventCreationHandler(object):
consent_uri = self._consent_uri_builder.build_user_consent_uri(
requester.user.localpart,
)
- msg = self.config.block_events_without_consent_error % {
+ msg = self._block_events_without_consent_error % {
'consent_uri': consent_uri,
}
raise ConsentNotGivenError(
@@ -436,10 +446,11 @@ class EventCreationHandler(object):
if event.is_state():
prev_state = yield self.deduplicate_state_event(event, context)
- logger.info(
- "Not bothering to persist duplicate state event %s", event.event_id,
- )
if prev_state is not None:
+ logger.info(
+ "Not bothering to persist state event %s duplicated by %s",
+ event.event_id, prev_state.event_id,
+ )
defer.returnValue(prev_state)
yield self.handle_new_client_event(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index ba3856674d..37e87fc054 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -816,7 +816,7 @@ class PresenceHandler(object):
if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
- yield self.federation.send_edu(
+ yield self.federation.build_and_send_edu(
destination=observed_user.domain,
edu_type="m.presence_invite",
content={
@@ -836,7 +836,7 @@ class PresenceHandler(object):
if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
- self.federation.send_edu(
+ self.federation.build_and_send_edu(
destination=observer_user.domain,
edu_type="m.presence_accept",
content={
@@ -848,7 +848,7 @@ class PresenceHandler(object):
state_dict = yield self.get_state(observed_user, as_event=False)
state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
- self.federation.send_edu(
+ self.federation.build_and_send_edu(
destination=observer_user.domain,
edu_type="m.presence",
content={
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 1dfbde84fd..a65c98ff5c 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -147,8 +147,14 @@ class BaseProfileHandler(BaseHandler):
@defer.inlineCallbacks
def set_displayname(self, target_user, requester, new_displayname, by_admin=False):
- """target_user is the user whose displayname is to be changed;
- auth_user is the user attempting to make this change."""
+ """Set the displayname of a user
+
+ Args:
+ target_user (UserID): the user whose displayname is to be changed.
+ requester (Requester): The user attempting to make this change.
+ new_displayname (str): The displayname to give this user.
+ by_admin (bool): Whether this change was made by an administrator.
+ """
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server")
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 696469732c..274d2946ad 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,10 +16,8 @@ import logging
from twisted.internet import defer
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
logger = logging.getLogger(__name__)
@@ -39,42 +37,17 @@ class ReceiptsHandler(BaseHandler):
self.state = hs.get_state_handler()
@defer.inlineCallbacks
- def received_client_receipt(self, room_id, receipt_type, user_id,
- event_id):
- """Called when a client tells us a local user has read up to the given
- event_id in the room.
- """
- receipt = {
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- "event_ids": [event_id],
- "data": {
- "ts": int(self.clock.time_msec()),
- }
- }
-
- is_new = yield self._handle_new_receipts([receipt])
-
- if is_new:
- # fire off a process in the background to send the receipt to
- # remote servers
- run_as_background_process(
- 'push_receipts_to_remotes', self._push_remotes, receipt
- )
-
- @defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
"""Called when we receive an EDU of type m.receipt from a remote HS.
"""
receipts = [
- {
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- "event_ids": user_values["event_ids"],
- "data": user_values.get("data", {}),
- }
+ ReadReceipt(
+ room_id=room_id,
+ receipt_type=receipt_type,
+ user_id=user_id,
+ event_ids=user_values["event_ids"],
+ data=user_values.get("data", {}),
+ )
for room_id, room_values in content.items()
for receipt_type, users in room_values.items()
for user_id, user_values in users.items()
@@ -90,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
max_batch_id = None
for receipt in receipts:
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
-
res = yield self.store.insert_receipt(
- room_id, receipt_type, user_id, event_ids, data
+ receipt.room_id,
+ receipt.receipt_type,
+ receipt.user_id,
+ receipt.event_ids,
+ receipt.data,
)
if not res:
@@ -115,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
# no new receipts
defer.returnValue(False)
- affected_room_ids = list(set([r["room_id"] for r in receipts]))
+ affected_room_ids = list(set([r.room_id for r in receipts]))
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
@@ -128,43 +99,26 @@ class ReceiptsHandler(BaseHandler):
defer.returnValue(True)
@defer.inlineCallbacks
- def _push_remotes(self, receipt):
- """Given a receipt, works out which remote servers should be
- poked and pokes them.
+ def received_client_receipt(self, room_id, receipt_type, user_id,
+ event_id):
+ """Called when a client tells us a local user has read up to the given
+ event_id in the room.
"""
- try:
- # TODO: optimise this to move some of the work to the workers.
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
-
- users = yield self.state.get_current_user_in_room(room_id)
- remotedomains = set(get_domain_from_id(u) for u in users)
- remotedomains = remotedomains.copy()
- remotedomains.discard(self.server_name)
-
- logger.debug("Sending receipt to: %r", remotedomains)
-
- for domain in remotedomains:
- self.federation.send_edu(
- destination=domain,
- edu_type="m.receipt",
- content={
- room_id: {
- receipt_type: {
- user_id: {
- "event_ids": event_ids,
- "data": data,
- }
- }
- },
- },
- key=(room_id, receipt_type, user_id),
- )
- except Exception:
- logger.exception("Error pushing receipts to remote servers")
+ receipt = ReadReceipt(
+ room_id=room_id,
+ receipt_type=receipt_type,
+ user_id=user_id,
+ event_ids=[event_id],
+ data={
+ "ts": int(self.clock.time_msec()),
+ },
+ )
+
+ is_new = yield self._handle_new_receipts([receipt])
+ if not is_new:
+ return
+
+ yield self.federation.send_read_receipt(receipt)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c0e06929bd..58940e0320 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,7 +23,9 @@ from synapse.api.constants import LoginType
from synapse.api.errors import (
AuthError,
Codes,
+ ConsentNotGivenError,
InvalidCaptchaError,
+ LimitExceededError,
RegistrationError,
SynapseError,
)
@@ -60,6 +62,7 @@ class RegistrationHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
self.captcha_client = CaptchaServerHttpClient(hs)
self.identity_handler = self.hs.get_handlers().identity_handler
+ self.ratelimiter = hs.get_registration_ratelimiter()
self._next_generated_user_id = None
@@ -149,6 +152,7 @@ class RegistrationHandler(BaseHandler):
threepid=None,
user_type=None,
default_display_name=None,
+ address=None,
):
"""Registers a new client on the server.
@@ -167,6 +171,7 @@ class RegistrationHandler(BaseHandler):
api.constants.UserTypes, or None for a normal user.
default_display_name (unicode|None): if set, the new user's displayname
will be set to this. Defaults to 'localpart'.
+ address (str|None): the IP address used to perform the registration.
Returns:
A tuple of (user_id, access_token).
Raises:
@@ -206,7 +211,7 @@ class RegistrationHandler(BaseHandler):
token = None
if generate_token:
token = self.macaroon_gen.generate_access_token(user_id)
- yield self._register_with_store(
+ yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
@@ -215,6 +220,7 @@ class RegistrationHandler(BaseHandler):
create_profile_with_displayname=default_display_name,
admin=admin,
user_type=user_type,
+ address=address,
)
if self.hs.config.user_directory_search_all_users:
@@ -238,12 +244,13 @@ class RegistrationHandler(BaseHandler):
if default_display_name is None:
default_display_name = localpart
try:
- yield self._register_with_store(
+ yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
make_guest=make_guest,
create_profile_with_displayname=default_display_name,
+ address=address,
)
except SynapseError:
# if user id is taken, just generate another
@@ -305,6 +312,10 @@ class RegistrationHandler(BaseHandler):
)
else:
yield self._join_user_to_room(fake_requester, r)
+ except ConsentNotGivenError as e:
+ # Technically not necessary to pull out this error though
+ # moving away from bare excepts is a good thing to do.
+ logger.error("Failed to join new user to %r: %r", r, e)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
@@ -337,7 +348,7 @@ class RegistrationHandler(BaseHandler):
user_id, allowed_appservice=service
)
- yield self._register_with_store(
+ yield self.register_with_store(
user_id=user_id,
password_hash="",
appservice_id=service_id,
@@ -513,7 +524,7 @@ class RegistrationHandler(BaseHandler):
token = self.macaroon_gen.generate_access_token(user_id)
if need_register:
- yield self._register_with_store(
+ yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
@@ -590,10 +601,10 @@ class RegistrationHandler(BaseHandler):
ratelimit=False,
)
- def _register_with_store(self, user_id, token=None, password_hash=None,
- was_guest=False, make_guest=False, appservice_id=None,
- create_profile_with_displayname=None, admin=False,
- user_type=None):
+ def register_with_store(self, user_id, token=None, password_hash=None,
+ was_guest=False, make_guest=False, appservice_id=None,
+ create_profile_with_displayname=None, admin=False,
+ user_type=None, address=None):
"""Register user in the datastore.
Args:
@@ -612,10 +623,26 @@ class RegistrationHandler(BaseHandler):
admin (boolean): is an admin user?
user_type (str|None): type of user. One of the values from
api.constants.UserTypes, or None for a normal user.
+ address (str|None): the IP address used to perform the registration.
Returns:
Deferred
"""
+ # Don't rate limit for app services
+ if appservice_id is None and address is not None:
+ time_now = self.clock.time()
+
+ allowed, time_allowed = self.ratelimiter.can_do_action(
+ address, time_now_s=time_now,
+ rate_hz=self.hs.config.rc_registration.per_second,
+ burst_count=self.hs.config.rc_registration.burst_count,
+ )
+
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now)),
+ )
+
if self.hs.config.worker_app:
return self._register_client(
user_id=user_id,
@@ -627,6 +654,7 @@ class RegistrationHandler(BaseHandler):
create_profile_with_displayname=create_profile_with_displayname,
admin=admin,
user_type=user_type,
+ address=address,
)
else:
return self.store.register(
@@ -693,9 +721,9 @@ class RegistrationHandler(BaseHandler):
access_token (str|None): The access token of the newly logged in
device, or None if `inhibit_login` enabled.
bind_email (bool): Whether to bind the email with the identity
- server
+ server.
bind_msisdn (bool): Whether to bind the msisdn with the identity
- server
+ server.
"""
if self.hs.config.worker_app:
yield self._post_registration_client(
@@ -737,7 +765,7 @@ class RegistrationHandler(BaseHandler):
"""A user consented to the terms on registration
Args:
- user_id (str): The user ID that consented
+ user_id (str): The user ID that consented.
consent_version (str): version of the policy the user has
consented to.
"""
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index afa508d729..d6c9d56007 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,6 +44,7 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
+ self.enable_room_list_search = hs.config.enable_room_list_search
self.response_cache = ResponseCache(hs, "room_list")
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
timeout_ms=30 * 1000)
@@ -66,10 +67,17 @@ class RoomListHandler(BaseHandler):
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
"""
+ if not self.enable_room_list_search:
+ return defer.succeed({
+ "chunk": [],
+ "total_room_count_estimate": 0,
+ })
+
logger.info(
"Getting public room list: limit=%r, since=%r, search=%r, network=%r",
limit, since_token, bool(search_filter), network_tuple,
)
+
if search_filter:
# We explicitly don't bother caching searches or requests for
# appservice specific lists.
@@ -441,6 +449,12 @@ class RoomListHandler(BaseHandler):
def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
search_filter=None, include_all_networks=False,
third_party_instance_id=None,):
+ if not self.enable_room_list_search:
+ defer.returnValue({
+ "chunk": [],
+ "total_room_count_estimate": 0,
+ })
+
if search_filter:
# We currently don't support searching across federation, so we have
# to do it manually without pagination
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 190ea2c7b1..71ce5b54e5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -160,6 +160,7 @@ class RoomMemberHandler(object):
txn_id=None,
ratelimit=True,
content=None,
+ require_consent=True,
):
user_id = target.to_string()
@@ -185,6 +186,7 @@ class RoomMemberHandler(object):
token_id=requester.access_token_id,
txn_id=txn_id,
prev_events_and_hashes=prev_events_and_hashes,
+ require_consent=require_consent,
)
# Check if this event matches the previous membership event for the user.
@@ -232,6 +234,10 @@ class RoomMemberHandler(object):
self.copy_room_tags_and_direct_to_room(
predecessor["room_id"], room_id, user_id,
)
+ # Move over old push rules
+ self.store.move_push_rules_from_room_to_room_for_user(
+ predecessor["room_id"], room_id, user_id,
+ )
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -301,6 +307,7 @@ class RoomMemberHandler(object):
third_party_signed=None,
ratelimit=True,
content=None,
+ require_consent=True,
):
key = (room_id,)
@@ -315,6 +322,7 @@ class RoomMemberHandler(object):
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
+ require_consent=require_consent,
)
defer.returnValue(result)
@@ -331,6 +339,7 @@ class RoomMemberHandler(object):
third_party_signed=None,
ratelimit=True,
content=None,
+ require_consent=True,
):
content_specified = bool(content)
if content is None:
@@ -512,6 +521,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
prev_events_and_hashes=prev_events_and_hashes,
content=content,
+ require_consent=require_consent,
)
defer.returnValue(res)
diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py
new file mode 100644
index 0000000000..b268bbcb2c
--- /dev/null
+++ b/synapse/handlers/state_deltas.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+logger = logging.getLogger(__name__)
+
+
+class StateDeltasHandler(object):
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
+ """Given two events check if the `key_name` field in content changed
+ from not matching `public_value` to doing so.
+
+ For example, check if `history_visibility` (`key_name`) changed from
+ `shared` to `world_readable` (`public_value`).
+
+ Returns:
+ None if the field in the events either both match `public_value`
+ or if neither do, i.e. there has been no change.
+ True if it didnt match `public_value` but now does
+ False if it did match `public_value` but now doesn't
+ """
+ prev_event = None
+ event = None
+ if prev_event_id:
+ prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+
+ if event_id:
+ event = yield self.store.get_event(event_id, allow_none=True)
+
+ if not event and not prev_event:
+ logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
+ defer.returnValue(None)
+
+ prev_value = None
+ value = None
+
+ if prev_event:
+ prev_value = prev_event.content.get(key_name)
+
+ if event:
+ value = event.content.get(key_name)
+
+ logger.debug("prev_value: %r -> value: %r", prev_value, value)
+
+ if value == public_value and prev_value != public_value:
+ defer.returnValue(True)
+ elif value != public_value and prev_value == public_value:
+ defer.returnValue(False)
+ else:
+ defer.returnValue(None)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bd97241ab4..57bb996245 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+# Debug logger for https://github.com/matrix-org/synapse/issues/4422
+issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -962,6 +965,15 @@ class SyncHandler(object):
yield self._generate_sync_entry_for_groups(sync_result_builder)
+ # debug for https://github.com/matrix-org/synapse/issues/4422
+ for joined_room in sync_result_builder.joined:
+ room_id = joined_room.room_id
+ if room_id in newly_joined_rooms:
+ issue4422_logger.debug(
+ "Sync result for newly joined room %s: %r",
+ room_id, joined_room,
+ )
+
defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
@@ -1425,6 +1437,17 @@ class SyncHandler(object):
old_mem_ev = yield self.store.get_event(
old_mem_ev_id, allow_none=True
)
+
+ # debug for #4422
+ if has_join:
+ prev_membership = None
+ if old_mem_ev:
+ prev_membership = old_mem_ev.membership
+ issue4422_logger.debug(
+ "Previous membership for room %s with join: %s (event %s)",
+ room_id, prev_membership, old_mem_ev_id,
+ )
+
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
newly_joined_rooms.append(room_id)
@@ -1519,30 +1542,39 @@ class SyncHandler(object):
for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None)
+ newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
- room_entries.append(RoomSyncResultBuilder(
+ entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=events,
- newly_joined=room_id in newly_joined_rooms,
+ newly_joined=newly_joined,
full_state=False,
- since_token=None if room_id in newly_joined_rooms else since_token,
+ since_token=None if newly_joined else since_token,
upto_token=prev_batch_token,
- ))
+ )
else:
- room_entries.append(RoomSyncResultBuilder(
+ entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=[],
- newly_joined=room_id in newly_joined_rooms,
+ newly_joined=newly_joined,
full_state=False,
since_token=since_token,
upto_token=since_token,
- ))
+ )
+
+ if newly_joined:
+ # debugging for https://github.com/matrix-org/synapse/issues/4422
+ issue4422_logger.debug(
+ "RoomSyncResultBuilder events for newly joined room %s: %r",
+ room_id, entry.events,
+ )
+ room_entries.append(entry)
defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
@@ -1663,6 +1695,13 @@ class SyncHandler(object):
newly_joined_room=newly_joined,
)
+ if newly_joined:
+ # debug for https://github.com/matrix-org/synapse/issues/4422
+ issue4422_logger.debug(
+ "Timeline events after filtering in newly-joined room %s: %r",
+ room_id, batch,
+ )
+
# When we join the room (or the client requests full_state), we should
# send down any existing tags. Usually the user won't have tags in a
# newly joined room, unless either a) they've joined before or b) the
@@ -1894,15 +1933,34 @@ def _calculate_state(
class SyncResultBuilder(object):
- "Used to help build up a new SyncResult for a user"
+ """Used to help build up a new SyncResult for a user
+
+ Attributes:
+ sync_config (SyncConfig)
+ full_state (bool)
+ since_token (StreamToken)
+ now_token (StreamToken)
+ joined_room_ids (list[str])
+
+ # The following mirror the fields in a sync response
+ presence (list)
+ account_data (list)
+ joined (list[JoinedSyncResult])
+ invited (list[InvitedSyncResult])
+ archived (list[ArchivedSyncResult])
+ device (list)
+ groups (GroupsSyncResult|None)
+ to_device (list)
+ """
def __init__(self, sync_config, full_state, since_token, now_token,
joined_room_ids):
"""
Args:
- sync_config(SyncConfig)
- full_state(bool): The full_state flag as specified by user
- since_token(StreamToken): The token supplied by user, or None.
- now_token(StreamToken): The token to sync up to.
+ sync_config (SyncConfig)
+ full_state (bool): The full_state flag as specified by user
+ since_token (StreamToken): The token supplied by user, or None.
+ now_token (StreamToken): The token to sync up to.
+ joined_room_ids (list[str]): List of rooms the user is joined to
"""
self.sync_config = sync_config
self.full_state = full_state
@@ -1930,8 +1988,8 @@ class RoomSyncResultBuilder(object):
Args:
room_id(str)
rtype(str): One of `"joined"` or `"archived"`
- events(list): List of events to include in the room, (more events
- may be added when generating result).
+ events(list[FrozenEvent]): List of events to include in the room
+ (more events may be added when generating result).
newly_joined(bool): If the user has newly joined the room
full_state(bool): Whether the full state should be sent in result
since_token(StreamToken): Earliest point to return events from, or None
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a61bbf9392..39df960c31 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -231,7 +231,7 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
- self.federation.send_edu(
+ self.federation.build_and_send_edu(
destination=domain,
edu_type="m.typing",
content={
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 283c6c1b81..b689979b4b 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,12 +15,13 @@
import logging
-from six import iteritems
+from six import iteritems, iterkeys
from twisted.internet import defer
import synapse.metrics
from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
@@ -29,7 +30,7 @@ from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
-class UserDirectoryHandler(object):
+class UserDirectoryHandler(StateDeltasHandler):
"""Handles querying of and keeping updated the user_directory.
N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
@@ -38,19 +39,11 @@ class UserDirectoryHandler(object):
world_readable or publically joinable room. We keep a database table up to date
by streaming changes of the current state and recalculating whether users should
be in the directory or not when necessary.
-
- For each user in the directory we also store a room_id which is public and that the
- user is joined to. This allows us to ignore history_visibility and join_rules changes
- for that user in all other public rooms, as we know they'll still be in at least
- one public room.
"""
- INITIAL_ROOM_SLEEP_MS = 50
- INITIAL_ROOM_SLEEP_COUNT = 100
- INITIAL_ROOM_BATCH_SIZE = 100
- INITIAL_USER_SLEEP_MS = 10
-
def __init__(self, hs):
+ super(UserDirectoryHandler, self).__init__(hs)
+
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
@@ -59,15 +52,6 @@ class UserDirectoryHandler(object):
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
-
- # When start up for the first time we need to populate the user_directory.
- # This is a set of user_id's we've inserted already
- self.initially_handled_users = set()
- self.initially_handled_users_in_public = set()
-
- self.initially_handled_users_share = set()
- self.initially_handled_users_share_private_room = set()
-
# The current position in the current_state_delta stream
self.pos = None
@@ -130,7 +114,7 @@ class UserDirectoryHandler(object):
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
- user_id, profile.display_name, profile.avatar_url, None
+ user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
@@ -140,7 +124,6 @@ class UserDirectoryHandler(object):
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.remove_from_user_dir(user_id)
- yield self.store.remove_from_user_in_public_room(user_id)
@defer.inlineCallbacks
def _unsafe_process(self):
@@ -148,10 +131,9 @@ class UserDirectoryHandler(object):
if self.pos is None:
self.pos = yield self.store.get_user_directory_stream_pos()
- # If still None then we need to do the initial fill of directory
+ # If still None then the initial background update hasn't happened yet
if self.pos is None:
- yield self._do_initial_spam()
- self.pos = yield self.store.get_user_directory_stream_pos()
+ defer.returnValue(None)
# Loop round handling deltas until we're up to date
while True:
@@ -173,149 +155,6 @@ class UserDirectoryHandler(object):
yield self.store.update_user_directory_stream_pos(self.pos)
@defer.inlineCallbacks
- def _do_initial_spam(self):
- """Populates the user_directory from the current state of the DB, used
- when synapse first starts with user_directory support
- """
- new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
- # Delete any existing entries just in case there are any
- yield self.store.delete_all_from_user_dir()
-
- # We process by going through each existing room at a time.
- room_ids = yield self.store.get_all_rooms()
-
- logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
- num_processed_rooms = 0
-
- for room_id in room_ids:
- logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
- yield self._handle_initial_room(room_id)
- num_processed_rooms += 1
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- logger.info("Processed all rooms.")
-
- if self.search_all_users:
- num_processed_users = 0
- user_ids = yield self.store.get_all_local_users()
- logger.info(
- "Doing initial update of user directory. %d users", len(user_ids)
- )
- for user_id in user_ids:
- # We add profiles for all users even if they don't match the
- # include pattern, just in case we want to change it in future
- logger.info(
- "Handling user %d/%d", num_processed_users + 1, len(user_ids)
- )
- yield self._handle_local_user(user_id)
- num_processed_users += 1
- yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
- logger.info("Processed all users")
-
- self.initially_handled_users = None
- self.initially_handled_users_in_public = None
- self.initially_handled_users_share = None
- self.initially_handled_users_share_private_room = None
-
- yield self.store.update_user_directory_stream_pos(new_pos)
-
- @defer.inlineCallbacks
- def _handle_initial_room(self, room_id):
- """Called when we initially fill out user_directory one room at a time
- """
- is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
- if not is_in_room:
- return
-
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- room_id
- )
-
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
- user_ids = set(users_with_profile)
- unhandled_users = user_ids - self.initially_handled_users
-
- yield self.store.add_profiles_to_user_dir(
- room_id,
- {user_id: users_with_profile[user_id] for user_id in unhandled_users},
- )
-
- self.initially_handled_users |= unhandled_users
-
- if is_public:
- yield self.store.add_users_to_public_room(
- room_id, user_ids=user_ids - self.initially_handled_users_in_public
- )
- self.initially_handled_users_in_public |= user_ids
-
- # We now go and figure out the new users who share rooms with user entries
- # We sleep aggressively here as otherwise it can starve resources.
- # We also batch up inserts/updates, but try to avoid too many at once.
- to_insert = set()
- to_update = set()
- count = 0
- for user_id in user_ids:
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- if not self.is_mine_id(user_id):
- count += 1
- continue
-
- if self.store.get_if_app_services_interested_in_user(user_id):
- count += 1
- continue
-
- for other_user_id in user_ids:
- if user_id == other_user_id:
- continue
-
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
- count += 1
-
- user_set = (user_id, other_user_id)
-
- if user_set in self.initially_handled_users_share_private_room:
- continue
-
- if user_set in self.initially_handled_users_share:
- if is_public:
- continue
- to_update.add(user_set)
- else:
- to_insert.add(user_set)
-
- if is_public:
- self.initially_handled_users_share.add(user_set)
- else:
- self.initially_handled_users_share_private_room.add(user_set)
-
- if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
- yield self.store.add_users_who_share_room(
- room_id, not is_public, to_insert
- )
- to_insert.clear()
-
- if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
- )
- to_update.clear()
-
- if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
- to_insert.clear()
-
- if to_update:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
- )
- to_update.clear()
-
- @defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
"""
@@ -356,6 +195,7 @@ class UserDirectoryHandler(object):
user_ids = yield self.store.get_users_in_dir_due_to_room(
room_id
)
+
for user_id in user_ids:
yield self._handle_remove_user(room_id, user_id)
return
@@ -436,14 +276,20 @@ class UserDirectoryHandler(object):
# ignore the change
return
- if change:
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
- for user_id, profile in iteritems(users_with_profile):
- yield self._handle_new_user(room_id, user_id, profile)
- else:
- users = yield self.store.get_users_in_public_due_to_room(room_id)
- for user_id in users:
- yield self._handle_remove_user(room_id, user_id)
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+ # Remove every user from the sharing tables for that room.
+ for user_id in iterkeys(users_with_profile):
+ yield self.store.remove_user_who_share_room(user_id, room_id)
+
+ # Then, re-add them to the tables.
+ # NOTE: this is not the most efficient method, as handle_new_user sets
+ # up local_user -> other_user and other_user_whos_local -> local_user,
+ # which when ran over an entire room, will result in the same values
+ # being added multiple times. The batching upserts shouldn't make this
+ # too bad, though.
+ for user_id, profile in iteritems(users_with_profile):
+ yield self._handle_new_user(room_id, user_id, profile)
@defer.inlineCallbacks
def _handle_local_user(self, user_id):
@@ -457,7 +303,9 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id)
if not row:
- yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
@@ -469,177 +317,68 @@ class UserDirectoryHandler(object):
"""
logger.debug("Adding new user to dir, %r", user_id)
- row = yield self.store.get_user_in_directory(user_id)
- if not row:
- yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
)
+ # Now we update users who share rooms with users.
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
if is_public:
- row = yield self.store.get_user_in_public_room(user_id)
- if not row:
- yield self.store.add_users_to_public_room(room_id, [user_id])
+ yield self.store.add_users_in_public_rooms(room_id, (user_id,))
else:
- logger.debug("Not adding new user to public dir, %r", user_id)
+ to_insert = set()
- # Now we update users who share rooms with users. We do this by getting
- # all the current users in the room and seeing which aren't already
- # marked in the database as sharing with `user_id`
+ # First, if they're our user then we need to update for every user
+ if self.is_mine_id(user_id):
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
-
- to_insert = set()
- to_update = set()
+ is_appservice = self.store.get_if_app_services_interested_in_user(
+ user_id
+ )
- is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+ # We don't care about appservice users.
+ if not is_appservice:
+ for other_user_id in users_with_profile:
+ if user_id == other_user_id:
+ continue
- # First, if they're our user then we need to update for every user
- if self.is_mine_id(user_id) and not is_appservice:
- # Returns a map of other_user_id -> shared_private. We only need
- # to update mappings if for users that either don't share a room
- # already (aren't in the map) or, if the room is private, those that
- # only share a public room.
- user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
- user_id
- )
+ to_insert.add((user_id, other_user_id))
+ # Next we need to update for every local user in the room
for other_user_id in users_with_profile:
if user_id == other_user_id:
continue
- shared_is_private = user_ids_shared.get(other_user_id)
- if shared_is_private is True:
- # We've already marked in the database they share a private room
- continue
- elif shared_is_private is False:
- # They already share a public room, so only update if this is
- # a private room
- if not is_public:
- to_update.add((user_id, other_user_id))
- elif shared_is_private is None:
- # This is the first time they both share a room
- to_insert.add((user_id, other_user_id))
-
- # Next we need to update for every local user in the room
- for other_user_id in users_with_profile:
- if user_id == other_user_id:
- continue
-
- is_appservice = self.store.get_if_app_services_interested_in_user(
- other_user_id
- )
- if self.is_mine_id(other_user_id) and not is_appservice:
- shared_is_private = yield self.store.get_if_users_share_a_room(
- other_user_id, user_id
+ is_appservice = self.store.get_if_app_services_interested_in_user(
+ other_user_id
)
- if shared_is_private is True:
- # We've already marked in the database they share a private room
- continue
- elif shared_is_private is False:
- # They already share a public room, so only update if this is
- # a private room
- if not is_public:
- to_update.add((other_user_id, user_id))
- elif shared_is_private is None:
- # This is the first time they both share a room
+ if self.is_mine_id(other_user_id) and not is_appservice:
to_insert.add((other_user_id, user_id))
- if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
-
- if to_update:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
- )
+ if to_insert:
+ yield self.store.add_users_who_share_private_room(room_id, to_insert)
@defer.inlineCallbacks
def _handle_remove_user(self, room_id, user_id):
- """Called when we might need to remove user to directory
+ """Called when we might need to remove user from directory
Args:
room_id (str): room_id that user left or stopped being public that
user_id (str)
"""
- logger.debug("Maybe removing user %r", user_id)
-
- row = yield self.store.get_user_in_directory(user_id)
- update_user_dir = row and row["room_id"] == room_id
-
- row = yield self.store.get_user_in_public_room(user_id)
- update_user_in_public = row and row["room_id"] == room_id
-
- if update_user_in_public or update_user_dir:
- # XXX: Make this faster?
- rooms = yield self.store.get_rooms_for_user(user_id)
- for j_room_id in rooms:
- if not update_user_in_public and not update_user_dir:
- break
-
- is_in_room = yield self.store.is_host_joined(
- j_room_id, self.server_name
- )
-
- if not is_in_room:
- continue
+ logger.debug("Removing user %r", user_id)
- if update_user_dir:
- update_user_dir = False
- yield self.store.update_user_in_user_dir(user_id, j_room_id)
+ # Remove user from sharing tables
+ yield self.store.remove_user_who_share_room(user_id, room_id)
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- j_room_id
- )
+ # Are they still in any rooms? If not, remove them entirely.
+ rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
- if update_user_in_public and is_public:
- yield self.store.update_user_in_public_user_list(user_id, j_room_id)
- update_user_in_public = False
-
- if update_user_dir:
+ if len(rooms_user_is_in) == 0:
yield self.store.remove_from_user_dir(user_id)
- elif update_user_in_public:
- yield self.store.remove_from_user_in_public_room(user_id)
-
- # Now handle users_who_share_rooms.
-
- # Get a list of user tuples that were in the DB due to this room and
- # users (this includes tuples where the other user matches `user_id`)
- user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
- user_id, room_id
- )
-
- for user_id, other_user_id in user_tuples:
- # For each user tuple get a list of rooms that they still share,
- # trying to find a private room, and update the entry in the DB
- rooms = yield self.store.get_rooms_in_common_for_users(
- user_id, other_user_id
- )
-
- # If they dont share a room anymore, remove the mapping
- if not rooms:
- yield self.store.remove_user_who_share_room(user_id, other_user_id)
- continue
-
- found_public_share = None
- for j_room_id in rooms:
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- j_room_id
- )
-
- if is_public:
- found_public_share = j_room_id
- else:
- found_public_share = None
- yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)]
- )
- break
-
- if found_public_share:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)]
- )
@defer.inlineCallbacks
def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
@@ -665,50 +404,4 @@ class UserDirectoryHandler(object):
new_avatar = event.content.get("avatar_url")
if prev_name != new_name or prev_avatar != new_avatar:
- yield self.store.update_profile_in_user_dir(
- user_id, new_name, new_avatar, room_id
- )
-
- @defer.inlineCallbacks
- def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
- """Given two events check if the `key_name` field in content changed
- from not matching `public_value` to doing so.
-
- For example, check if `history_visibility` (`key_name`) changed from
- `shared` to `world_readable` (`public_value`).
-
- Returns:
- None if the field in the events either both match `public_value`
- or if neither do, i.e. there has been no change.
- True if it didnt match `public_value` but now does
- False if it did match `public_value` but now doesn't
- """
- prev_event = None
- event = None
- if prev_event_id:
- prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
-
- if event_id:
- event = yield self.store.get_event(event_id, allow_none=True)
-
- if not event and not prev_event:
- logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
- defer.returnValue(None)
-
- prev_value = None
- value = None
-
- if prev_event:
- prev_value = prev_event.content.get(key_name)
-
- if event:
- value = event.content.get(key_name)
-
- logger.debug("prev_value: %r -> value: %r", prev_value, value)
-
- if value == public_value and prev_value != public_value:
- defer.returnValue(True)
- elif value != public_value and prev_value == public_value:
- defer.returnValue(False)
- else:
- defer.returnValue(None)
+ yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
|