author     Erik Johnston <erik@matrix.org>   2017-07-06 10:36:25 +0100
committer  Erik Johnston <erik@matrix.org>   2017-07-06 10:36:25 +0100
commit     42b50483be2b022735f8ae2107314d51e92e8471 (patch)
tree       6b41c2096b105f04244ce80e840d0f998dbe226f /synapse/handlers
parent     Merge pull request #2282 from matrix-org/release-v0.21.1 (diff)
parent     Bump version and changelog (diff)
download   synapse-42b50483be2b022735f8ae2107314d51e92e8471.tar.xz
Merge branch 'release-v0.22.0' of github.com:matrix-org/synapse (tag: v0.22.0)
Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/auth.py             21
-rw-r--r--  synapse/handlers/device.py            4
-rw-r--r--  synapse/handlers/federation.py       51
-rw-r--r--  synapse/handlers/message.py          15
-rw-r--r--  synapse/handlers/room.py             21
-rw-r--r--  synapse/handlers/room_member.py      10
-rw-r--r--  synapse/handlers/sync.py             11
-rw-r--r--  synapse/handlers/typing.py           12
-rw-r--r--  synapse/handlers/user_directory.py  641
9 files changed, 748 insertions, 38 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index e7a1bb7246..b00446bec0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -21,6 +21,7 @@ from synapse.api.constants import LoginType
 from synapse.types import UserID
 from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
 from synapse.util.async import run_on_reactor
+from synapse.util.caches.expiringcache import ExpiringCache
 
 from twisted.web.client import PartialDownloadError
 
@@ -52,7 +53,15 @@ class AuthHandler(BaseHandler):
             LoginType.DUMMY: self._check_dummy_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
-        self.sessions = {}
+
+        # This is not a cache per se, but a store of all current sessions that
+        # expire after N hours
+        self.sessions = ExpiringCache(
+            cache_name="register_sessions",
+            clock=hs.get_clock(),
+            expiry_ms=self.SESSION_EXPIRE_MS,
+            reset_expiry_on_get=True,
+        )
 
         account_handler = _AccountHandler(
             hs, check_user_exists=self.check_user_exists
@@ -617,16 +626,6 @@ class AuthHandler(BaseHandler):
         logger.debug("Saving session %s", session)
         session["last_used"] = self.hs.get_clock().time_msec()
         self.sessions[session["id"]] = session
-        self._prune_sessions()
-
-    def _prune_sessions(self):
-        for sid, sess in self.sessions.items():
-            last_used = 0
-            if 'last_used' in sess:
-                last_used = sess['last_used']
-            now = self.hs.get_clock().time_msec()
-            if last_used < now - AuthHandler.SESSION_EXPIRE_MS:
-                del self.sessions[sid]
 
     def hash(self, password):
         """Computes a secure hash of password.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 982cda3edf..ed60d494ff 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -106,7 +106,7 @@ class DeviceHandler(BaseHandler):
         device_map = yield self.store.get_devices_by_user(user_id)
 
         ips = yield self.store.get_last_client_ip_by_device(
-            devices=((user_id, device_id) for device_id in device_map.keys())
+            user_id, device_id=None
         )
 
         devices = device_map.values()
@@ -133,7 +133,7 @@ class DeviceHandler(BaseHandler):
         except errors.StoreError:
             raise errors.NotFoundError
         ips = yield self.store.get_last_client_ip_by_device(
-            devices=((user_id, device_id),)
+            user_id, device_id,
         )
         _update_device_from_client_ips(device, ips)
         defer.returnValue(device)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 52d97dfbf3..483cb8eac6 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -43,7 +43,6 @@ from synapse.events.utils import prune_event
 
 from synapse.util.retryutils import NotRetryingDestination
 
-from synapse.push.action_generator import ActionGenerator
 from synapse.util.distributor import user_joined_room
 
 from twisted.internet import defer
@@ -75,6 +74,8 @@ class FederationHandler(BaseHandler):
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
+        self.action_generator = hs.get_action_generator()
+        self.is_mine_id = hs.is_mine_id
 
         self.replication_layer.set_handler(self)
@@ -832,7 +833,11 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def on_event_auth(self, event_id):
-        auth = yield self.store.get_auth_chain([event_id])
+        event = yield self.store.get_event(event_id)
+        auth = yield self.store.get_auth_chain(
+            [auth_id for auth_id, _ in event.auth_events],
+            include_given=True
+        )
 
         for event in auth:
             event.signatures.update(
@@ -1047,9 +1052,7 @@ class FederationHandler(BaseHandler):
             yield user_joined_room(self.distributor, user, event.room_id)
 
         state_ids = context.prev_state_ids.values()
-        auth_chain = yield self.store.get_auth_chain(set(
-            [event.event_id] + state_ids
-        ))
+        auth_chain = yield self.store.get_auth_chain(state_ids)
 
         state = yield self.store.get_events(context.prev_state_ids.values())
@@ -1066,6 +1069,24 @@ class FederationHandler(BaseHandler):
         """
         event = pdu
 
+        is_blocked = yield self.store.is_room_blocked(event.room_id)
+        if is_blocked:
+            raise SynapseError(403, "This room has been blocked on this server")
+
+        membership = event.content.get("membership")
+        if event.type != EventTypes.Member or membership != Membership.INVITE:
+            raise SynapseError(400, "The event was not an m.room.member invite event")
+
+        sender_domain = get_domain_from_id(event.sender)
+        if sender_domain != origin:
+            raise SynapseError(400, "The invite event was not from the server sending it")
+
+        if event.state_key is None:
+            raise SynapseError(400, "The invite event did not have a state key")
+
+        if not self.is_mine_id(event.state_key):
+            raise SynapseError(400, "The invite event must be for this server")
+
         event.internal_metadata.outlier = True
         event.internal_metadata.invite_from_remote = True
@@ -1100,6 +1121,9 @@ class FederationHandler(BaseHandler):
             user_id,
             "leave"
         )
+        # Mark as outlier as we don't have any state for this event; we're not
+        # even in the room.
+        event.internal_metadata.outlier = True
         event = self._sign_event(event)
 
         # Try the host that we successfully called /make_leave/ on first for
@@ -1271,7 +1295,7 @@ class FederationHandler(BaseHandler):
         for event in res:
             # We sign these again because there was a bug where we
             # incorrectly signed things the first time round
-            if self.hs.is_mine_id(event.event_id):
+            if self.is_mine_id(event.event_id):
                 event.signatures.update(
                     compute_event_signature(
                         event,
@@ -1344,7 +1368,7 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if self.hs.is_mine_id(event.event_id):
+            if self.is_mine_id(event.event_id):
                 # FIXME: This is a temporary work around where we occasionally
                 # return events slightly differently than when they were
                 # originally signed
@@ -1389,8 +1413,7 @@ class FederationHandler(BaseHandler):
         )
 
         if not event.internal_metadata.is_outlier():
-            action_generator = ActionGenerator(self.hs)
-            yield action_generator.handle_push_actions_for_event(
+            yield self.action_generator.handle_push_actions_for_event(
                 event, context
             )
@@ -1599,7 +1622,11 @@ class FederationHandler(BaseHandler):
             pass
 
         # Now get the current auth_chain for the event.
-        local_auth_chain = yield self.store.get_auth_chain([event_id])
+        event = yield self.store.get_event(event_id)
+        local_auth_chain = yield self.store.get_auth_chain(
+            [auth_id for auth_id, _ in event.auth_events],
+            include_given=True
+        )
 
         # TODO: Check if we would now reject event_id. If so we need to tell
         # everyone.
@@ -1792,7 +1819,9 @@ class FederationHandler(BaseHandler):
         auth_ids = yield self.auth.compute_auth_events(
             event, context.prev_state_ids
         )
-        local_auth_chain = yield self.store.get_auth_chain(auth_ids)
+        local_auth_chain = yield self.store.get_auth_chain(
+            auth_ids, include_given=True
+        )
 
         try:
             # 2. Get remote difference.
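The auth.py change above swaps the hand-rolled _prune_sessions() sweep for an ExpiringCache, so stale registration sessions lapse on a timer instead of being pruned on every save. Below is a minimal, self-contained sketch of that behaviour; it is an illustration, not Synapse's actual ExpiringCache implementation (the parameter names mirror the diff, the 48-hour expiry is illustrative, and the internals are simplified):

    import time

    class ExpiringDict(object):
        """Dict-like store whose entries lapse expiry_ms after last touch."""

        def __init__(self, expiry_ms, reset_expiry_on_get=False):
            self._expiry_ms = expiry_ms
            self._reset_expiry_on_get = reset_expiry_on_get
            self._entries = {}  # key -> (value, last_touched_ms)

        @staticmethod
        def _now_ms():
            return int(time.time() * 1000)

        def __setitem__(self, key, value):
            self._entries[key] = (value, self._now_ms())

        def __getitem__(self, key):
            value, touched = self._entries[key]
            if self._now_ms() - touched > self._expiry_ms:
                # Entry outlived its window: drop it and report a miss
                del self._entries[key]
                raise KeyError(key)
            if self._reset_expiry_on_get:
                # Reading keeps a live session alive, as reset_expiry_on_get
                # does in the diff above
                self._entries[key] = (value, self._now_ms())
            return value

    # Usage mirroring AuthHandler: idle sessions quietly become unavailable
    sessions = ExpiringDict(expiry_ms=48 * 60 * 60 * 1000, reset_expiry_on_get=True)
    sessions["sid1"] = {"id": "sid1"}

The design win is that expiry is enforced lazily at access time (and, in Synapse's case, by a background job), so the save path no longer pays an O(sessions) scan.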
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 196925edad..24c9ffdb20 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,7 +20,6 @@ from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.push.action_generator import ActionGenerator
 from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
@@ -35,6 +34,7 @@ from canonicaljson import encode_canonical_json
 
 import logging
 import random
+import ujson
 
 logger = logging.getLogger(__name__)
 
@@ -54,6 +54,8 @@ class MessageHandler(BaseHandler):
         # This is to stop us from diverging history *too* much.
         self.limiter = Limiter(max_count=5)
 
+        self.action_generator = hs.get_action_generator()
+
     @defer.inlineCallbacks
     def purge_history(self, room_id, event_id):
         event = yield self.store.get_event(event_id)
@@ -497,6 +499,14 @@ class MessageHandler(BaseHandler):
             logger.warn("Denying new event %r because %s", event, err)
             raise err
 
+        # Ensure that we can round trip before trying to persist in db
+        try:
+            dump = ujson.dumps(event.content)
+            ujson.loads(dump)
+        except:
+            logger.exception("Failed to encode content: %r", event.content)
+            raise
+
         yield self.maybe_kick_guest_users(event, context)
 
         if event.type == EventTypes.CanonicalAlias:
@@ -590,8 +600,7 @@ class MessageHandler(BaseHandler):
                 "Changing the room create event is forbidden",
             )
 
-        action_generator = ActionGenerator(self.hs)
-        yield action_generator.handle_push_actions_for_event(
+        yield self.action_generator.handle_push_actions_for_event(
            event, context
        )
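The message.py hunk above adds a serialisation round-trip check so that event content which cannot survive JSON encode/decode is rejected before it reaches the database. A sketch of the same guard using the stdlib json module in place of ujson (illustrative only; Synapse's version logs the offending content with logger.exception and re-raises):

    import json

    def check_content_round_trips(content):
        """Raise early if `content` won't survive a JSON round trip."""
        try:
            dump = json.dumps(content, allow_nan=False)
            json.loads(dump)
        except (TypeError, ValueError):
            # e.g. a non-serialisable object, or NaN under a strict encoder;
            # real code would log the offending content before re-raising
            raise

    check_content_round_trips({"msgtype": "m.text", "body": "hello"})  # fine
    # check_content_round_trips({"body": object()})  # raises TypeError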
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d2a0d6520a..5698d28088 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -61,7 +61,7 @@ class RoomCreationHandler(BaseHandler):
     }
 
     @defer.inlineCallbacks
-    def create_room(self, requester, config):
+    def create_room(self, requester, config, ratelimit=True):
         """ Creates a new room.
 
         Args:
@@ -75,7 +75,8 @@ class RoomCreationHandler(BaseHandler):
         """
         user_id = requester.user.to_string()
 
-        yield self.ratelimit(requester)
+        if ratelimit:
+            yield self.ratelimit(requester)
 
         if "room_alias_name" in config:
             for wchar in string.whitespace:
@@ -167,6 +168,7 @@ class RoomCreationHandler(BaseHandler):
             initial_state=initial_state,
             creation_content=creation_content,
             room_alias=room_alias,
+            power_level_content_override=config.get("power_level_content_override", {})
         )
 
         if "name" in config:
@@ -245,7 +247,8 @@ class RoomCreationHandler(BaseHandler):
             invite_list,
             initial_state,
             creation_content,
-            room_alias
+            room_alias,
+            power_level_content_override,
     ):
         def create(etype, content, **kwargs):
             e = {
@@ -291,7 +294,15 @@ class RoomCreationHandler(BaseHandler):
             ratelimit=False,
         )
 
-        if (EventTypes.PowerLevels, '') not in initial_state:
+        # We treat the power levels override specially as this needs to be one
+        # of the first events that get sent into a room.
+        pl_content = initial_state.pop((EventTypes.PowerLevels, ''), None)
+        if pl_content is not None:
+            yield send(
+                etype=EventTypes.PowerLevels,
+                content=pl_content,
+            )
+        else:
             power_level_content = {
                 "users": {
                     creator_id: 100,
@@ -316,6 +327,8 @@ class RoomCreationHandler(BaseHandler):
             for invitee in invite_list:
                 power_level_content["users"][invitee] = 100
 
+            power_level_content.update(power_level_content_override)
+
             yield send(
                 etype=EventTypes.PowerLevels,
                 content=power_level_content,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 1ca88517a2..b3f979b246 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -203,6 +203,11 @@ class RoomMemberHandler(BaseHandler):
         if not remote_room_hosts:
             remote_room_hosts = []
 
+        if effective_membership_state not in ("leave", "ban",):
+            is_blocked = yield self.store.is_room_blocked(room_id)
+            if is_blocked:
+                raise SynapseError(403, "This room has been blocked on this server")
+
         latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
@@ -369,6 +374,11 @@ class RoomMemberHandler(BaseHandler):
             # so don't really fit into the general auth process.
             raise AuthError(403, "Guest access not allowed")
 
+        if event.membership not in (Membership.LEAVE, Membership.BAN):
+            is_blocked = yield self.store.is_room_blocked(room_id)
+            if is_blocked:
+                raise SynapseError(403, "This room has been blocked on this server")
+
         yield message_handler.handle_new_client_event(
             requester,
             event,
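The room.py changes above let create_room callers pass a power_level_content_override, which is update()-merged over the generated defaults just before the power-levels event is sent. Roughly as follows; the default values here are abbreviated examples, not Synapse's full generated set, and the override is a hypothetical caller input:

    # Generated defaults (abbreviated)
    power_level_content = {
        "users": {"@creator:example.org": 100},
        "events_default": 0,
        "state_default": 50,
        "ban": 50,
    }

    # Hypothetical value of config.get("power_level_content_override", {})
    override = {"events_default": 50, "invite": 100}

    power_level_content.update(override)
    # "events_default" is now 50, "invite" is added; untouched keys keep
    # their generated defaults.

Because update() is a shallow key-by-key merge, a caller can pin individual power-level keys while inheriting the rest of the defaults.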
"device_lists", # List of user_ids whose devices have chanegd + "device_one_time_keys_count", # Dict of algorithm to count for one time keys + # for this device ])): __slots__ = [] @@ -550,6 +552,14 @@ class SyncHandler(object): sync_result_builder ) + device_id = sync_config.device_id + one_time_key_counts = {} + if device_id: + user_id = sync_config.user.to_string() + one_time_key_counts = yield self.store.count_e2e_one_time_keys( + user_id, device_id + ) + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -558,6 +568,7 @@ class SyncHandler(object): archived=sync_result_builder.archived, to_device=sync_result_builder.to_device, device_lists=device_lists, + device_one_time_keys_count=one_time_key_counts, next_batch=sync_result_builder.now_token, )) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b7818af5c..82dedbbc99 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -89,7 +89,7 @@ class TypingHandler(object): until = self._member_typing_until.get(member, None) if not until or until <= now: logger.info("Timing out typing for: %s", member.user_id) - preserve_fn(self._stopped_typing)(member) + self._stopped_typing(member) continue # Check if we need to resend a keep alive over federation for this @@ -147,7 +147,7 @@ class TypingHandler(object): # No point sending another notification defer.returnValue(None) - yield self._push_update( + self._push_update( member=member, typing=True, ) @@ -171,7 +171,7 @@ class TypingHandler(object): member = RoomMember(room_id=room_id, user_id=target_user_id) - yield self._stopped_typing(member) + self._stopped_typing(member) @defer.inlineCallbacks def user_left_room(self, user, room_id): @@ -180,7 +180,6 @@ class TypingHandler(object): member = RoomMember(room_id=room_id, user_id=user_id) yield self._stopped_typing(member) - @defer.inlineCallbacks def _stopped_typing(self, member): if member.user_id not in self._room_typing.get(member.room_id, set()): # No point @@ -189,16 +188,15 @@ class TypingHandler(object): self._member_typing_until.pop(member, None) self._member_last_federation_poke.pop(member, None) - yield self._push_update( + self._push_update( member=member, typing=False, ) - @defer.inlineCallbacks def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - yield self._push_remote(member, typing) + preserve_fn(self._push_remote)(member, typing) self._push_update_local( member=member, diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py new file mode 100644 index 0000000000..2a49456bfc --- /dev/null +++ b/synapse/handlers/user_directory.py @@ -0,0 +1,641 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
new file mode 100644
index 0000000000..2a49456bfc
--- /dev/null
+++ b/synapse/handlers/user_directory.py
@@ -0,0 +1,641 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.storage.roommember import ProfileInfo
+from synapse.util.metrics import Measure
+from synapse.util.async import sleep
+
+
+logger = logging.getLogger(__name__)
+
+
+class UserDirectoyHandler(object):
+    """Handles querying of and keeping updated the user_directory.
+
+    N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
+
+    The user directory is filled with users who this server can see are joined to a
+    world_readable or publicly joinable room. We keep a database table up to date
+    by streaming changes of the current state and recalculating whether users should
+    be in the directory or not when necessary.
+
+    For each user in the directory we also store a room_id which is public and that the
+    user is joined to. This allows us to ignore history_visibility and join_rules changes
+    for that user in all other public rooms, as we know they'll still be in at least
+    one public room.
+    """
+
+    INITIAL_SLEEP_MS = 50
+    INITIAL_SLEEP_COUNT = 100
+    INITIAL_BATCH_SIZE = 100
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self.server_name = hs.hostname
+        self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
+        self.update_user_directory = hs.config.update_user_directory
+
+        # When starting up for the first time we need to populate the
+        # user_directory. This is a set of user_ids we've already inserted.
+        self.initially_handled_users = set()
+        self.initially_handled_users_in_public = set()
+
+        self.initially_handled_users_share = set()
+        self.initially_handled_users_share_private_room = set()
+
+        # The current position in the current_state_delta stream
+        self.pos = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        if self.update_user_directory:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+            # We kick this off so that we don't have to wait for a change before
+            # we start populating the user directory
+            self.clock.call_later(0, self.notify_new_event)
+
+    def search_users(self, user_id, search_term, limit):
+        """Searches for users in directory
+
+        Returns:
+            dict of the form::
+
+                {
+                    "limited": <bool>,  # whether there were more results or not
+                    "results": [  # Ordered by best match first
+                        {
+                            "user_id": <user_id>,
+                            "display_name": <display_name>,
+                            "avatar_url": <avatar_url>
+                        }
+                    ]
+                }
+        """
+        return self.store.search_user_dir(user_id, search_term, limit)
+
+    @defer.inlineCallbacks
+    def notify_new_event(self):
+        """Called when there may be more deltas to process
+        """
+        if not self.update_user_directory:
+            return
+
+        if self._is_processing:
+            return
+
+        self._is_processing = True
+        try:
+            yield self._unsafe_process()
+        finally:
+            self._is_processing = False
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # If self.pos is None then it means we haven't fetched it from the DB yet
+        if self.pos is None:
+            self.pos = yield self.store.get_user_directory_stream_pos()
+
+        # If still None then we need to do the initial fill of the directory
+        if self.pos is None:
+            yield self._do_initial_spam()
+            self.pos = yield self.store.get_user_directory_stream_pos()
+
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "user_dir_delta"):
+                deltas = yield self.store.get_current_state_deltas(self.pos)
+                if not deltas:
+                    return
+
+                logger.info("Handling %d state deltas", len(deltas))
+                yield self._handle_deltas(deltas)
+
+                self.pos = deltas[-1]["stream_id"]
+                yield self.store.update_user_directory_stream_pos(self.pos)
+
+    @defer.inlineCallbacks
+    def _do_initial_spam(self):
+        """Populates the user_directory from the current state of the DB, used
+        when synapse first starts with user_directory support
+        """
+        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
+
+        # Delete any existing entries just in case there are any
+        yield self.store.delete_all_from_user_dir()
+
+        # We process by going through each existing room at a time.
+        room_ids = yield self.store.get_all_rooms()
+
+        logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
+        num_processed_rooms = 1
+
+        for room_id in room_ids:
+            logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
+            yield self._handle_intial_room(room_id)
+            num_processed_rooms += 1
+            yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+
+        logger.info("Processed all rooms.")
+
+        self.initially_handled_users = None
+        self.initially_handled_users_in_public = None
+        self.initially_handled_users_share = None
+        self.initially_handled_users_share_private_room = None
+
+        yield self.store.update_user_directory_stream_pos(new_pos)
+
+    @defer.inlineCallbacks
+    def _handle_intial_room(self, room_id):
+        """Called when we initially fill out user_directory one room at a time
+        """
+        is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
+        if not is_in_room:
+            return
+
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
+
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        user_ids = set(users_with_profile)
+        unhandled_users = user_ids - self.initially_handled_users
+
+        yield self.store.add_profiles_to_user_dir(
+            room_id, {
+                user_id: users_with_profile[user_id] for user_id in unhandled_users
+            }
+        )
+
+        self.initially_handled_users |= unhandled_users
+
+        if is_public:
+            yield self.store.add_users_to_public_room(
+                room_id,
+                user_ids=user_ids - self.initially_handled_users_in_public
+            )
+            self.initially_handled_users_in_public |= user_ids
+
+        # We now go and figure out the new users who share rooms with user entries.
+        # We sleep aggressively here as otherwise it can starve resources.
+        # We also batch up inserts/updates, but try to avoid too many at once.
+        to_insert = set()
+        to_update = set()
+        count = 0
+        for user_id in user_ids:
+            if count % self.INITIAL_SLEEP_COUNT == 0:
+                yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+
+            if not self.is_mine_id(user_id):
+                count += 1
+                continue
+
+            if self.store.get_if_app_services_interested_in_user(user_id):
+                count += 1
+                continue
+
+            for other_user_id in user_ids:
+                if user_id == other_user_id:
+                    continue
+
+                if count % self.INITIAL_SLEEP_COUNT == 0:
+                    yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+                count += 1
+
+                user_set = (user_id, other_user_id)
+
+                if user_set in self.initially_handled_users_share_private_room:
+                    continue
+
+                if user_set in self.initially_handled_users_share:
+                    if is_public:
+                        continue
+                    to_update.add(user_set)
+                else:
+                    to_insert.add(user_set)
+
+                if is_public:
+                    self.initially_handled_users_share.add(user_set)
+                else:
+                    self.initially_handled_users_share_private_room.add(user_set)
+
+                if len(to_insert) > self.INITIAL_BATCH_SIZE:
+                    yield self.store.add_users_who_share_room(
+                        room_id, not is_public, to_insert,
+                    )
+                    to_insert.clear()
+
+                if len(to_update) > self.INITIAL_BATCH_SIZE:
+                    yield self.store.update_users_who_share_room(
+                        room_id, not is_public, to_update,
+                    )
+                    to_update.clear()
+
+        if to_insert:
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
+            to_insert.clear()
+
+        if to_update:
+            yield self.store.update_users_who_share_room(
+                room_id, not is_public, to_update,
+            )
+            to_update.clear()
+
+    @defer.inlineCallbacks
+    def _handle_deltas(self, deltas):
+        """Called with the state deltas to process
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            # For join rule and visibility changes we need to check if the room
+            # may have become public or not and add/remove the users in said room
+            if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
+                yield self._handle_room_publicity_change(
+                    room_id, prev_event_id, event_id, typ,
+                )
+            elif typ == EventTypes.Member:
+                change = yield self._get_key_change(
+                    prev_event_id, event_id,
+                    key_name="membership",
+                    public_value=Membership.JOIN,
+                )
+
+                if change is None:
+                    # Handle any profile changes
+                    yield self._handle_profile_change(
+                        state_key, room_id, prev_event_id, event_id,
+                    )
+                    continue
+
+                if not change:
+                    # Need to check if the server left the room entirely, if so
+                    # we might need to remove all the users in that room
+                    is_in_room = yield self.store.is_host_joined(
+                        room_id, self.server_name,
+                    )
+                    if not is_in_room:
+                        logger.info("Server left room: %r", room_id)
+                        # Fetch all the users that we marked as being in the user
+                        # directory due to being in the room and then check if we
+                        # need to remove those users or not
+                        user_ids = yield self.store.get_users_in_dir_due_to_room(room_id)
+                        for user_id in user_ids:
+                            yield self._handle_remove_user(room_id, user_id)
+                        return
+                    else:
+                        logger.debug("Server is still in room: %r", room_id)
+
+                if change:  # The user joined
+                    event = yield self.store.get_event(event_id, allow_none=True)
+                    profile = ProfileInfo(
+                        avatar_url=event.content.get("avatar_url"),
+                        display_name=event.content.get("displayname"),
+                    )
+
+                    yield self._handle_new_user(room_id, state_key, profile)
+                else:  # The user left
+                    yield self._handle_remove_user(room_id, state_key)
+            else:
+                logger.debug("Ignoring irrelevant type: %r", typ)
+
+    @defer.inlineCallbacks
+    def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ):
+        """Handle a room having potentially changed from/to world_readable/publicly
+        joinable.
+
+        Args:
+            room_id (str)
+            prev_event_id (str|None): The previous event before the state change
+            event_id (str|None): The new event after the state change
+            typ (str): Type of the event
+        """
+        logger.debug("Handling change for %s: %s", typ, room_id)
+
+        if typ == EventTypes.RoomHistoryVisibility:
+            change = yield self._get_key_change(
+                prev_event_id, event_id,
+                key_name="history_visibility",
+                public_value="world_readable",
+            )
+        elif typ == EventTypes.JoinRules:
+            change = yield self._get_key_change(
+                prev_event_id, event_id,
+                key_name="join_rule",
+                public_value=JoinRules.PUBLIC,
+            )
+        else:
+            raise Exception("Invalid event type")
+
+        # If change is None there was no change. True => became
+        # world_readable/public, False => was world_readable/public but no
+        # longer is.
+        if change is None:
+            logger.debug("No change")
+            return
+
+        # There's been a change to or from being world readable.
+
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        logger.debug("Change: %r, is_public: %r", change, is_public)
+
+        if change and not is_public:
+            # If we became world readable but the room isn't currently public
+            # then we ignore the change
+            return
+        elif not change and is_public:
+            # If we stopped being world readable but are still public,
+            # ignore the change
+            return
+
+        if change:
+            users_with_profile = yield self.state.get_current_user_in_room(room_id)
+            for user_id, profile in users_with_profile.iteritems():
+                yield self._handle_new_user(room_id, user_id, profile)
+        else:
+            users = yield self.store.get_users_in_public_due_to_room(room_id)
+            for user_id in users:
+                yield self._handle_remove_user(room_id, user_id)
+
+    @defer.inlineCallbacks
+    def _handle_new_user(self, room_id, user_id, profile):
+        """Called when we might need to add a user to the directory
+
+        Args:
+            room_id (str): The room the user joined, or which started being
+                public
+            user_id (str)
+        """
+        logger.debug("Adding user to dir, %r", user_id)
+
+        row = yield self.store.get_user_in_directory(user_id)
+        if not row:
+            yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        if is_public:
+            row = yield self.store.get_user_in_public_room(user_id)
+            if not row:
+                yield self.store.add_users_to_public_room(room_id, [user_id])
+        else:
+            logger.debug("Not adding user to public dir, %r", user_id)
+
+        # Now we update users who share rooms with users. We do this by getting
+        # all the current users in the room and seeing which aren't already
+        # marked in the database as sharing with `user_id`
+
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+        to_insert = set()
+        to_update = set()
+
+        is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+
+        # First, if they're our user then we need to update for every user
+        if self.is_mine_id(user_id) and not is_appservice:
+            # Returns a map of other_user_id -> shared_private. We only need
+            # to update mappings for users that either don't share a room
+            # already (aren't in the map) or, if this room is private, those
+            # that so far only share a public room.
+            user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
+                user_id
+            )
+
+            for other_user_id in users_with_profile:
+                if user_id == other_user_id:
+                    continue
+
+                shared_is_private = user_ids_shared.get(other_user_id)
+                if shared_is_private is True:
+                    # We've already marked in the database that they share a
+                    # private room
+                    continue
+                elif shared_is_private is False:
+                    # They already share a public room, so only update if this
+                    # is a private room
+                    if not is_public:
+                        to_update.add((user_id, other_user_id))
+                elif shared_is_private is None:
+                    # This is the first time they both share a room
+                    to_insert.add((user_id, other_user_id))
+
+        # Next we need to update for every local user in the room
+        for other_user_id in users_with_profile:
+            if user_id == other_user_id:
+                continue
+
+            is_appservice = self.store.get_if_app_services_interested_in_user(
+                other_user_id
+            )
+            if self.is_mine_id(other_user_id) and not is_appservice:
+                shared_is_private = yield self.store.get_if_users_share_a_room(
+                    other_user_id, user_id,
+                )
+                if shared_is_private is True:
+                    # We've already marked in the database that they share a
+                    # private room
+                    continue
+                elif shared_is_private is False:
+                    # They already share a public room, so only update if this
+                    # is a private room
+                    if not is_public:
+                        to_update.add((other_user_id, user_id))
+                elif shared_is_private is None:
+                    # This is the first time they both share a room
+                    to_insert.add((other_user_id, user_id))
+
+        if to_insert:
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
+
+        if to_update:
+            yield self.store.update_users_who_share_room(
+                room_id, not is_public, to_update,
+            )
+
+    @defer.inlineCallbacks
+    def _handle_remove_user(self, room_id, user_id):
+        """Called when we might need to remove a user from the directory
+
+        Args:
+            room_id (str): The room the user left, or which stopped being
+                public
+            user_id (str)
+        """
+        logger.debug("Maybe removing user %r", user_id)
+
+        row = yield self.store.get_user_in_directory(user_id)
+        update_user_dir = row and row["room_id"] == room_id
+
+        row = yield self.store.get_user_in_public_room(user_id)
+        update_user_in_public = row and row["room_id"] == room_id
+
+        if (update_user_in_public or update_user_dir):
+            # XXX: Make this faster?
+            rooms = yield self.store.get_rooms_for_user(user_id)
+            for j_room_id in rooms:
+                if (not update_user_in_public and not update_user_dir):
+                    break
+
+                is_in_room = yield self.store.is_host_joined(
+                    j_room_id, self.server_name,
+                )
+
+                if not is_in_room:
+                    continue
+
+                if update_user_dir:
+                    update_user_dir = False
+                    yield self.store.update_user_in_user_dir(user_id, j_room_id)
+
+                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+                    j_room_id
+                )
+
+                if update_user_in_public and is_public:
+                    yield self.store.update_user_in_public_user_list(user_id, j_room_id)
+                    update_user_in_public = False
+
+        if update_user_dir:
+            yield self.store.remove_from_user_dir(user_id)
+        elif update_user_in_public:
+            yield self.store.remove_from_user_in_public_room(user_id)
+
+        # Now handle users_who_share_rooms.
+
+        # Get a list of user tuples that were in the DB due to this room and
+        # users (this includes tuples where the other user matches `user_id`)
+        user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
+            user_id, room_id,
+        )
+
+        for user_id, other_user_id in user_tuples:
+            # For each user tuple get a list of rooms that they still share,
+            # trying to find a private room, and update the entry in the DB
+            rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id)
+
+            # If they don't share a room any more, remove the mapping
+            if not rooms:
+                yield self.store.remove_user_who_share_room(
+                    user_id, other_user_id,
+                )
+                continue
+
+            found_public_share = None
+            for j_room_id in rooms:
+                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+                    j_room_id
+                )
+
+                if is_public:
+                    found_public_share = j_room_id
+                else:
+                    found_public_share = None
+                    yield self.store.update_users_who_share_room(
+                        room_id, not is_public, [(user_id, other_user_id)],
+                    )
+                    break
+
+            if found_public_share:
+                yield self.store.update_users_who_share_room(
+                    room_id, not is_public, [(user_id, other_user_id)],
+                )
+
+    @defer.inlineCallbacks
+    def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
+        """Check member event changes for any profile changes and update the
+        database if there are.
+        """
+        if not prev_event_id or not event_id:
+            return
+
+        prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+        event = yield self.store.get_event(event_id, allow_none=True)
+
+        if not prev_event or not event:
+            return
+
+        if event.membership != Membership.JOIN:
+            return
+
+        prev_name = prev_event.content.get("displayname")
+        new_name = event.content.get("displayname")
+
+        prev_avatar = prev_event.content.get("avatar_url")
+        new_avatar = event.content.get("avatar_url")
+
+        if prev_name != new_name or prev_avatar != new_avatar:
+            yield self.store.update_profile_in_user_dir(
+                user_id, new_name, new_avatar, room_id,
+            )
+
+    @defer.inlineCallbacks
+    def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
+        """Given two events, check if the `key_name` field in content changed
+        from not matching `public_value` to doing so.
+
+        For example, check if `history_visibility` (`key_name`) changed from
+        `shared` to `world_readable` (`public_value`).
+
+        Returns:
+            None if the field in the events either both match `public_value`
+            or neither does, i.e. there has been no change.
+            True if it didn't match `public_value` but now does
+            False if it did match `public_value` but now doesn't
+        """
+        prev_event = None
+        event = None
+        if prev_event_id:
+            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+
+        if event_id:
+            event = yield self.store.get_event(event_id, allow_none=True)
+
+        if not event and not prev_event:
+            logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
+            defer.returnValue(None)
+
+        prev_value = None
+        value = None
+
+        if prev_event:
+            prev_value = prev_event.content.get(key_name)
+
+        if event:
+            value = event.content.get(key_name)
+
+        logger.debug("prev_value: %r -> value: %r", prev_value, value)
+
+        if value == public_value and prev_value != public_value:
+            defer.returnValue(True)
+        elif value != public_value and prev_value == public_value:
+            defer.returnValue(False)
+        else:
+            defer.returnValue(None)
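The tri-state returned by _get_key_change above is the linchpin of the delta handling: True/False drive adding or removing directory entries, while None means "no change relative to the public value, check for a profile change instead". A pure-function restatement of that logic, with event contents passed in directly instead of being fetched by event ID:

    # True  -> field newly matches public_value
    # False -> field no longer matches public_value
    # None  -> no change relative to public_value
    def get_key_change(prev_content, content, key_name, public_value):
        prev_value = prev_content.get(key_name) if prev_content else None
        value = content.get(key_name) if content else None

        if value == public_value and prev_value != public_value:
            return True
        if value != public_value and prev_value == public_value:
            return False
        return None

    assert get_key_change({"join_rule": "invite"}, {"join_rule": "public"},
                          "join_rule", "public") is True
    assert get_key_change({"join_rule": "public"}, {"join_rule": "invite"},
                          "join_rule", "public") is False
    assert get_key_change({"join_rule": "invite"}, {"join_rule": "knock"},
                          "join_rule", "public") is None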