Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/auth.py            |  21
-rw-r--r--  synapse/handlers/device.py          |   4
-rw-r--r--  synapse/handlers/federation.py      |  28
-rw-r--r--  synapse/handlers/message.py         |  13
-rw-r--r--  synapse/handlers/room.py            |  21
-rw-r--r--  synapse/handlers/room_member.py     |  10
-rw-r--r--  synapse/handlers/sync.py            |  15
-rw-r--r--  synapse/handlers/user_directory.py  |  30
8 files changed, 103 insertions(+), 39 deletions(-)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py

index e7a1bb7246..b00446bec0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -21,6 +21,7 @@ from synapse.api.constants import LoginType
 from synapse.types import UserID
 from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
 from synapse.util.async import run_on_reactor
+from synapse.util.caches.expiringcache import ExpiringCache
 
 from twisted.web.client import PartialDownloadError
 
@@ -52,7 +53,15 @@ class AuthHandler(BaseHandler):
             LoginType.DUMMY: self._check_dummy_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
-        self.sessions = {}
+
+        # This is not a cache per se, but a store of all current sessions that
+        # expire after N hours
+        self.sessions = ExpiringCache(
+            cache_name="register_sessions",
+            clock=hs.get_clock(),
+            expiry_ms=self.SESSION_EXPIRE_MS,
+            reset_expiry_on_get=True,
+        )
 
         account_handler = _AccountHandler(
             hs, check_user_exists=self.check_user_exists
@@ -617,16 +626,6 @@ class AuthHandler(BaseHandler):
         logger.debug("Saving session %s", session)
         session["last_used"] = self.hs.get_clock().time_msec()
         self.sessions[session["id"]] = session
-        self._prune_sessions()
-
-    def _prune_sessions(self):
-        for sid, sess in self.sessions.items():
-            last_used = 0
-            if 'last_used' in sess:
-                last_used = sess['last_used']
-            now = self.hs.get_clock().time_msec()
-            if last_used < now - AuthHandler.SESSION_EXPIRE_MS:
-                del self.sessions[sid]
 
     def hash(self, password):
         """Computes a secure hash of password.
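
The ExpiringCache above replaces the removed _prune_sessions, which only ran when a session was saved and so let stale sessions pile up on an idle process. A minimal sketch of the expiring-store idea, assuming only an object with a time_msec() method; the ExpiringDict class and its internals here are illustrative, not synapse's actual implementation:

    import time

    class Clock(object):
        """Stand-in for synapse's clock abstraction."""
        def time_msec(self):
            return int(time.time() * 1000)

    class ExpiringDict(object):
        """Dict whose entries expire expiry_ms after their last write
        (or last read, when reset_expiry_on_get is set)."""

        def __init__(self, clock, expiry_ms, reset_expiry_on_get=False):
            self._clock = clock
            self._expiry_ms = expiry_ms
            self._reset_on_get = reset_expiry_on_get
            self._entries = {}  # key -> (value, last_touched_ms)

        def __setitem__(self, key, value):
            self._entries[key] = (value, self._clock.time_msec())

        def __getitem__(self, key):
            value, _ = self._entries[key]
            if self._reset_on_get:
                # Touching a session keeps it alive, which the removed
                # _prune_sessions logic never did.
                self._entries[key] = (value, self._clock.time_msec())
            return value

        def prune(self):
            # Iterate over a copy: deleting from a dict while iterating it
            # (as _prune_sessions did) breaks on Python 3.
            now = self._clock.time_msec()
            for key, (_, touched) in list(self._entries.items()):
                if touched < now - self._expiry_ms:
                    del self._entries[key]

reset_expiry_on_get=True matters for registration sessions: a client mid-flow keeps its session alive simply by using it.
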
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 982cda3edf..ed60d494ff 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -106,7 +106,7 @@ class DeviceHandler(BaseHandler):
         device_map = yield self.store.get_devices_by_user(user_id)
 
         ips = yield self.store.get_last_client_ip_by_device(
-            devices=((user_id, device_id) for device_id in device_map.keys())
+            user_id, device_id=None
         )
 
         devices = device_map.values()
@@ -133,7 +133,7 @@ class DeviceHandler(BaseHandler):
         except errors.StoreError:
             raise errors.NotFoundError
         ips = yield self.store.get_last_client_ip_by_device(
-            devices=((user_id, device_id),)
+            user_id, device_id,
         )
         _update_device_from_client_ips(device, ips)
         defer.returnValue(device)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 39d2bee8da..b790a7c2ef 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -75,6 +75,8 @@ class FederationHandler(BaseHandler):
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
         self.action_generator = hs.get_action_generator()
+        self.is_mine_id = hs.is_mine_id
+        self.pusher_pool = hs.get_pusherpool()
 
         self.replication_layer.set_handler(self)
 
@@ -1068,6 +1070,24 @@ class FederationHandler(BaseHandler):
         """
         event = pdu
 
+        is_blocked = yield self.store.is_room_blocked(event.room_id)
+        if is_blocked:
+            raise SynapseError(403, "This room has been blocked on this server")
+
+        membership = event.content.get("membership")
+        if event.type != EventTypes.Member or membership != Membership.INVITE:
+            raise SynapseError(400, "The event was not an m.room.member invite event")
+
+        sender_domain = get_domain_from_id(event.sender)
+        if sender_domain != origin:
+            raise SynapseError(400, "The invite event was not from the server sending it")
+
+        if event.state_key is None:
+            raise SynapseError(400, "The invite event did not have a state key")
+
+        if not self.is_mine_id(event.state_key):
+            raise SynapseError(400, "The invite event must be for this server")
+
         event.internal_metadata.outlier = True
         event.internal_metadata.invite_from_remote = True
 
@@ -1276,7 +1296,7 @@ class FederationHandler(BaseHandler):
         for event in res:
             # We sign these again because there was a bug where we
             # incorrectly signed things the first time round
-            if self.hs.is_mine_id(event.event_id):
+            if self.is_mine_id(event.event_id):
                 event.signatures.update(
                     compute_event_signature(
                         event,
@@ -1349,7 +1369,7 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if self.hs.is_mine_id(event.event_id):
+            if self.is_mine_id(event.event_id):
                 # FIXME: This is a temporary work around where we occasionally
                 # return events slightly differently than when they were
                 # originally signed
@@ -1393,7 +1413,7 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
-        if not event.internal_metadata.is_outlier():
+        if not event.internal_metadata.is_outlier() and not backfilled:
             yield self.action_generator.handle_push_actions_for_event(
                 event, context
             )
@@ -1407,7 +1427,7 @@ class FederationHandler(BaseHandler):
         if not backfilled:
             # this intentionally does not yield: we don't care about the result
             # and don't need to wait for it.
-            preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+            preserve_fn(self.pusher_pool.on_new_notifications)(
                 event_stream_id, max_stream_id
            )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a04f634c5c..be4f123c54 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,6 +34,7 @@ from canonicaljson import encode_canonical_json
 
 import logging
 import random
+import ujson
 
 logger = logging.getLogger(__name__)
 
@@ -49,6 +50,8 @@ class MessageHandler(BaseHandler):
 
         self.pagination_lock = ReadWriteLock()
 
+        self.pusher_pool = hs.get_pusherpool()
+
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
         self.limiter = Limiter(max_count=5)
@@ -498,6 +501,14 @@ class MessageHandler(BaseHandler):
             logger.warn("Denying new event %r because %s", event, err)
             raise err
 
+        # Ensure that we can round trip before trying to persist in db
+        try:
+            dump = ujson.dumps(event.content)
+            ujson.loads(dump)
+        except:
+            logger.exception("Failed to encode content: %r", event.content)
+            raise
+
         yield self.maybe_kick_guest_users(event, context)
 
         if event.type == EventTypes.CanonicalAlias:
@@ -601,7 +612,7 @@ class MessageHandler(BaseHandler):
 
         # this intentionally does not yield: we don't care about the result
         # and don't need to wait for it.
-        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+        preserve_fn(self.pusher_pool.on_new_notifications)(
            event_stream_id, max_stream_id
        )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d2a0d6520a..5698d28088 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -61,7 +61,7 @@ class RoomCreationHandler(BaseHandler):
     }
 
     @defer.inlineCallbacks
-    def create_room(self, requester, config):
+    def create_room(self, requester, config, ratelimit=True):
         """ Creates a new room.
 
         Args:
@@ -75,7 +75,8 @@ class RoomCreationHandler(BaseHandler):
         """
         user_id = requester.user.to_string()
 
-        yield self.ratelimit(requester)
+        if ratelimit:
+            yield self.ratelimit(requester)
 
         if "room_alias_name" in config:
             for wchar in string.whitespace:
@@ -167,6 +168,7 @@ class RoomCreationHandler(BaseHandler):
             initial_state=initial_state,
             creation_content=creation_content,
             room_alias=room_alias,
+            power_level_content_override=config.get("power_level_content_override", {})
         )
 
         if "name" in config:
@@ -245,7 +247,8 @@ class RoomCreationHandler(BaseHandler):
             invite_list,
             initial_state,
             creation_content,
-            room_alias
+            room_alias,
+            power_level_content_override,
     ):
         def create(etype, content, **kwargs):
             e = {
@@ -291,7 +294,15 @@ class RoomCreationHandler(BaseHandler):
             ratelimit=False,
         )
 
-        if (EventTypes.PowerLevels, '') not in initial_state:
+        # We treat the power levels override specially as this needs to be one
+        # of the first events that get sent into a room.
+        pl_content = initial_state.pop((EventTypes.PowerLevels, ''), None)
+        if pl_content is not None:
+            yield send(
+                etype=EventTypes.PowerLevels,
+                content=pl_content,
+            )
+        else:
             power_level_content = {
                 "users": {
                     creator_id: 100,
@@ -316,6 +327,8 @@ class RoomCreationHandler(BaseHandler):
                 for invitee in invite_list:
                     power_level_content["users"][invitee] = 100
 
+            power_level_content.update(power_level_content_override)
+
             yield send(
                 etype=EventTypes.PowerLevels,
                 content=power_level_content,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 1ca88517a2..b3f979b246 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -203,6 +203,11 @@ class RoomMemberHandler(BaseHandler):
         if not remote_room_hosts:
             remote_room_hosts = []
 
+        if effective_membership_state not in ("leave", "ban",):
+            is_blocked = yield self.store.is_room_blocked(room_id)
+            if is_blocked:
+                raise SynapseError(403, "This room has been blocked on this server")
+
         latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
@@ -369,6 +374,11 @@ class RoomMemberHandler(BaseHandler):
             # so don't really fit into the general auth process.
             raise AuthError(403, "Guest access not allowed")
 
+        if event.membership not in (Membership.LEAVE, Membership.BAN):
+            is_blocked = yield self.store.is_room_blocked(room_id)
+            if is_blocked:
+                raise SynapseError(403, "This room has been blocked on this server")
+
         yield message_handler.handle_new_client_event(
             requester,
             event,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 91c6c6be3c..e6df1819b9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -579,18 +579,17 @@ class SyncHandler(object):
         since_token = sync_result_builder.since_token
 
         if since_token and since_token.device_list_key:
-            room_ids = yield self.store.get_rooms_for_user(user_id)
-
-            user_ids_changed = set()
             changed = yield self.store.get_user_whose_devices_changed(
                 since_token.device_list_key
             )
-            for other_user_id in changed:
-                other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
-                if room_ids.intersection(other_room_ids):
-                    user_ids_changed.add(other_user_id)
+            if not changed:
+                defer.returnValue([])
+
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
 
-            defer.returnValue(user_ids_changed)
+            defer.returnValue(users_who_share_room & changed)
         else:
             defer.returnValue([])
 
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d33a20a1f2..2a49456bfc 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -42,6 +42,8 @@ class UserDirectoyHandler(object):
     """
 
     INITIAL_SLEEP_MS = 50
+    INITIAL_SLEEP_COUNT = 100
+    INITIAL_BATCH_SIZE = 100
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
@@ -126,6 +128,7 @@ class UserDirectoyHandler(object):
             if not deltas:
                 return
 
+            logger.info("Handling %d state deltas", len(deltas))
             yield self._handle_deltas(deltas)
 
             self.pos = deltas[-1]["stream_id"]
@@ -187,9 +190,9 @@ class UserDirectoyHandler(object):
         if is_public:
             yield self.store.add_users_to_public_room(
                 room_id,
-                user_ids=unhandled_users - self.initially_handled_users_in_public
+                user_ids=user_ids - self.initially_handled_users_in_public
             )
-            self.initially_handled_users_in_public != unhandled_users
+            self.initially_handled_users_in_public |= user_ids
 
         # We now go and figure out the new users who share rooms with user entries
         # We sleep aggressively here as otherwise it can starve resources.
@@ -198,18 +201,22 @@ class UserDirectoyHandler(object):
         to_update = set()
         count = 0
         for user_id in user_ids:
-            if count % 100 == 0:
+            if count % self.INITIAL_SLEEP_COUNT == 0:
                 yield sleep(self.INITIAL_SLEEP_MS / 1000.)
 
             if not self.is_mine_id(user_id):
                 count += 1
                 continue
 
+            if self.store.get_if_app_services_interested_in_user(user_id):
+                count += 1
+                continue
+
             for other_user_id in user_ids:
                 if user_id == other_user_id:
                     continue
 
-                if count % 100 == 0:
+                if count % self.INITIAL_SLEEP_COUNT == 0:
                     yield sleep(self.INITIAL_SLEEP_MS / 1000.)
                 count += 1
 
@@ -230,13 +237,13 @@ class UserDirectoyHandler(object):
                 else:
                     self.initially_handled_users_share_private_room.add(user_set)
 
-                if len(to_insert) > 100:
+                if len(to_insert) > self.INITIAL_BATCH_SIZE:
                     yield self.store.add_users_who_share_room(
                         room_id, not is_public, to_insert,
                     )
                     to_insert.clear()
 
-                if len(to_update) > 100:
+                if len(to_update) > self.INITIAL_BATCH_SIZE:
                     yield self.store.update_users_who_share_room(
                         room_id, not is_public, to_update,
                     )
@@ -294,7 +301,7 @@ class UserDirectoyHandler(object):
                 room_id, self.server_name,
             )
             if not is_in_room:
-                logger.debug("Server left room: %r", room_id)
+                logger.info("Server left room: %r", room_id)
                 # Fetch all the users that we marked as being in user
                 # directory due to being in the room and then check if
                 # need to remove those users or not
@@ -411,8 +418,10 @@ class UserDirectoyHandler(object):
         to_insert = set()
         to_update = set()
 
+        is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+
         # First, if they're our user then we need to update for every user
-        if self.is_mine_id(user_id):
+        if self.is_mine_id(user_id) and not is_appservice:
             # Returns a map of other_user_id -> shared_private. We only need
             # to update mappings if for users that either don't share a room
             # already (aren't in the map) or, if the room is private, those that
@@ -443,7 +452,10 @@ class UserDirectoyHandler(object):
             if user_id == other_user_id:
                 continue
 
-            if self.is_mine_id(other_user_id):
+            is_appservice = self.store.get_if_app_services_interested_in_user(
+                other_user_id
+            )
+            if self.is_mine_id(other_user_id) and not is_appservice:
                 shared_is_private = yield self.store.get_if_users_share_a_room(
                     other_user_id, user_id,
                 )