Diffstat (limited to 'synapse')
 synapse/app/federation_reader.py                |   6
 synapse/app/frontend_proxy.py                   |   6
 synapse/config/room_directory.py                | 170
 synapse/federation/federation_server.py         |   3
 synapse/handlers/directory.py                   |  29
 synapse/handlers/message.py                     |   2
 synapse/handlers/room_list.py                   |   9
 synapse/handlers/room_member.py                 |  44
 synapse/handlers/sync.py                        |   5
 synapse/handlers/user_directory.py              | 109
 synapse/push/httppusher.py                      |   7
 synapse/replication/http/__init__.py            |   4
 synapse/replication/http/device.py (new)        |  64
 synapse/replication/http/registration.py (new)  |  65
 synapse/replication/slave/storage/client_ips.py |   2
 synapse/replication/tcp/streams.py              |   6
 synapse/rest/client/v1/admin.py                 |  22
 synapse/rest/client/v2_alpha/register.py        |  45
 synapse/storage/client_ips.py                   |   2
 synapse/storage/registration.py                 |  11
 synapse/storage/roommember.py                   |  11
 synapse/storage/search.py                       |   2
 synapse/storage/state.py                        |  25
 23 files changed, 506 insertions(+), 143 deletions(-)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 6ee2b76dcd..27c73f9b50 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
 
 import synapse
 from synapse import events
-from synapse.api.urls import FEDERATION_PREFIX
+from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX
 from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
@@ -43,6 +43,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
@@ -97,6 +98,9 @@ class FederationReaderServer(HomeServer):
                 ),
             })
 
+            if name in ["keys", "federation"]:
+                resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
+
         root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index d5b954361d..5d450718c6 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -39,8 +39,12 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
 from synapse.rest.client.v2_alpha._base import client_v2_patterns
+from synapse.rest.client.v2_alpha.register import (
+    register_servlets as register_registration_servlets,
+)
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.storage.registration import RegistrationStore
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
@@ -141,6 +145,7 @@ class FrontendProxySlavedStore(
     SlavedClientIpStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
+    RegistrationStore,
     BaseSlavedStore,
 ):
     pass
@@ -161,6 +166,7 @@ class FrontendProxyServer(HomeServer):
         elif name == "client":
             resource = JsonResource(self, canonical_json=False)
             KeyUploadServlet(self).register(resource)
+            register_registration_servlets(self, resource)
 
             # If presence is disabled, use the stub servlet that does
             # not allow sending presence
diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py
index 9da13ab11b..c8e0abbae7 100644
--- a/synapse/config/room_directory.py
+++ b/synapse/config/room_directory.py
@@ -20,12 +20,37 @@ from ._base import Config, ConfigError
 
 class RoomDirectoryConfig(Config):
     def read_config(self, config):
-        alias_creation_rules = config["alias_creation_rules"]
+        alias_creation_rules = config.get("alias_creation_rules")
 
-        self._alias_creation_rules = [
-            _AliasRule(rule)
-            for rule in alias_creation_rules
-        ]
+        if alias_creation_rules is not None:
+            self._alias_creation_rules = [
+                _RoomDirectoryRule("alias_creation_rules", rule)
+                for rule in alias_creation_rules
+            ]
+        else:
+            self._alias_creation_rules = [
+                _RoomDirectoryRule(
+                    "alias_creation_rules", {
+                        "action": "allow",
+                    }
+                )
+            ]
+
+        room_list_publication_rules = config.get("room_list_publication_rules")
+
+        if room_list_publication_rules is not None:
+            self._room_list_publication_rules = [
+                _RoomDirectoryRule("room_list_publication_rules", rule)
+                for rule in room_list_publication_rules
+            ]
+        else:
+            self._room_list_publication_rules = [
+                _RoomDirectoryRule(
+                    "room_list_publication_rules", {
+                        "action": "allow",
+                    }
+                )
+            ]
 
     def default_config(self, config_dir_path, server_name, **kwargs):
         return """
@@ -33,60 +58,138 @@ class RoomDirectoryConfig(Config):
         # on this server.
         #
         # The format of this option is a list of rules that contain globs that
-        # match against user_id and the new alias (fully qualified with server
-        # name). The action in the first rule that matches is taken, which can
-        # currently either be "allow" or "deny".
+        # match against user_id, room_id and the new alias (fully qualified with
+        # server name). The action in the first rule that matches is taken,
+        # which can currently either be "allow" or "deny".
+        #
+        # Missing user_id/room_id/alias fields default to "*".
+        #
+        # If no rules match the request is denied. An empty list means no one
+        # can create aliases.
+        #
+        # Options for the rules include:
+        #
+        #     user_id: Matches against the creator of the alias
+        #     alias: Matches against the alias being created
+        #     room_id: Matches against the room ID the alias is being pointed at
+        #     action: Whether to "allow" or "deny" the request if the rule matches
+        #
+        # The default is:
+        #
+        # alias_creation_rules:
+        #     - user_id: "*"
+        #       alias: "*"
+        #       room_id: "*"
+        #       action: allow
+
+        # The `room_list_publication_rules` option controls who can publish and
+        # which rooms can be published in the public room list.
+        #
+        # The format of this option is the same as that for
+        # `alias_creation_rules`.
         #
-        # If no rules match the request is denied.
-        alias_creation_rules:
-            - user_id: "*"
-              alias: "*"
-              action: allow
+        # If the room has one or more aliases associated with it, only one of
+        # the aliases needs to match the alias rule. If there are no aliases
+        # then only rules with `alias: *` match.
+        #
+        # If no rules match the request is denied. An empty list means no one
+        # can publish rooms.
+        #
+        # Options for the rules include:
+        #
+        #     user_id: Matches against the creator of the alias
+        #     room_id: Matches against the room ID being published
+        #     alias: Matches against any current local or canonical aliases
+        #            associated with the room
+        #     action: Whether to "allow" or "deny" the request if the rule matches
+        #
+        # The default is:
+        #
+        # room_list_publication_rules:
+        #     - user_id: "*"
+        #       alias: "*"
+        #       room_id: "*"
+        #       action: allow
         """
 
-    def is_alias_creation_allowed(self, user_id, alias):
+    def is_alias_creation_allowed(self, user_id, room_id, alias):
         """Checks if the given user is allowed to create the given alias
 
         Args:
             user_id (str)
+            room_id (str)
             alias (str)
 
         Returns:
             boolean: True if user is allowed to create the alias
         """
         for rule in self._alias_creation_rules:
-            if rule.matches(user_id, alias):
+            if rule.matches(user_id, room_id, [alias]):
+                return rule.action == "allow"
+
+        return False
+
+    def is_publishing_room_allowed(self, user_id, room_id, aliases):
+        """Checks if the given user is allowed to publish the room
+
+        Args:
+            user_id (str)
+            room_id (str)
+            aliases (list[str]): any local aliases associated with the room
+
+        Returns:
+            boolean: True if user can publish room
+        """
+        for rule in self._room_list_publication_rules:
+            if rule.matches(user_id, room_id, aliases):
                 return rule.action == "allow"
 
         return False
 
 
-class _AliasRule(object):
-    def __init__(self, rule):
+class _RoomDirectoryRule(object):
+    """Helper class to test whether a room directory action is allowed, like
+    creating an alias or publishing a room.
+    """
+
+    def __init__(self, option_name, rule):
+        """
+        Args:
+            option_name (str): Name of the config option this rule belongs to
+            rule (dict): The rule as specified in the config
+        """
+
         action = rule["action"]
-        user_id = rule["user_id"]
-        alias = rule["alias"]
+        user_id = rule.get("user_id", "*")
+        room_id = rule.get("room_id", "*")
+        alias = rule.get("alias", "*")
 
         if action in ("allow", "deny"):
             self.action = action
         else:
             raise ConfigError(
-                "alias_creation_rules rules can only have action of 'allow'"
-                " or 'deny'"
+                "%s rules can only have action of 'allow'"
+                " or 'deny'" % (option_name,)
             )
 
+        self._alias_matches_all = alias == "*"
+
         try:
             self._user_id_regex = glob_to_regex(user_id)
             self._alias_regex = glob_to_regex(alias)
+            self._room_id_regex = glob_to_regex(room_id)
         except Exception as e:
             raise ConfigError("Failed to parse glob into regex: %s", e)
 
-    def matches(self, user_id, alias):
-        """Tests if this rule matches the given user_id and alias.
+    def matches(self, user_id, room_id, aliases):
+        """Tests if this rule matches the given user_id, room_id and aliases.
 
         Args:
             user_id (str)
-            alias (str)
+            room_id (str)
+            aliases (list[str]): The associated aliases to the room. Will be a
+                single element for testing alias creation, and can be empty for
+                testing room publishing.
 
         Returns:
             boolean
@@ -96,7 +199,22 @@ class _AliasRule(object):
         if not self._user_id_regex.match(user_id):
             return False
 
-        if not self._alias_regex.match(alias):
+        if not self._room_id_regex.match(room_id):
             return False
 
-        return True
+        # We only have alias checks left, so we can short circuit if the alias
+        # rule matches everything.
+        if self._alias_matches_all:
+            return True
+
+        # If we are not given any aliases then this rule only matches if the
+        # alias glob matches all aliases, which we checked above.
+        if not aliases:
+            return False
+
+        # Otherwise, we just need one alias to match
+        for alias in aliases:
+            if self._alias_regex.match(alias):
+                return True
+
+        return False
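
To make the matching semantics of the new `_RoomDirectoryRule` concrete, here is a minimal standalone sketch. It uses Python's `fnmatch` in place of synapse's `glob_to_regex`, and `first_matching_action` is a hypothetical helper written for illustration, not part of this commit:

from fnmatch import fnmatchcase

def first_matching_action(rules, user_id, room_id, aliases):
    # Mirrors _RoomDirectoryRule.matches: missing fields default to "*",
    # user_id and room_id must both match, and one matching alias is enough
    # (an alias glob of "*" matches even an empty alias list).
    for rule in rules:
        if not fnmatchcase(user_id, rule.get("user_id", "*")):
            continue
        if not fnmatchcase(room_id, rule.get("room_id", "*")):
            continue
        alias_glob = rule.get("alias", "*")
        if alias_glob == "*" or any(fnmatchcase(a, alias_glob) for a in aliases):
            return rule["action"]
    # As the config comment says: if no rules match, the request is denied.
    return "deny"

rules = [
    {"user_id": "@admin:example.com", "action": "allow"},
    {"alias": "#good-*:example.com", "action": "allow"},
]

# Alias creation is checked with a single-element alias list:
assert first_matching_action(
    rules, "@admin:example.com", "!r:example.com", ["#x:example.com"],
) == "allow"
# Publishing a room with no aliases only matches rules whose alias glob is "*":
assert first_matching_action(
    rules, "@bob:example.com", "!r:example.com", [],
) == "deny"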
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 3da86d4ba6..c2541b62af 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -877,6 +877,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
     def on_edu(self, edu_type, origin, content):
         """Overrides FederationHandlerRegistry
         """
+        if edu_type == "m.presence":
+            return
+
         handler = self.edu_handlers.get(edu_type)
         if handler:
             return super(ReplicationFederationHandlerRegistry, self).on_edu(
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 6bb254f899..8b113307d2 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -112,7 +112,9 @@ class DirectoryHandler(BaseHandler):
                 403, "This user is not permitted to create this alias",
             )
 
-        if not self.config.is_alias_creation_allowed(user_id, room_alias.to_string()):
+        if not self.config.is_alias_creation_allowed(
+            user_id, room_id, room_alias.to_string(),
+        ):
             # Lets just return a generic message, as there may be all sorts of
             # reasons why we said no. TODO: Allow configurable error messages
             # per alias creation rule?
@@ -395,9 +397,9 @@ class DirectoryHandler(BaseHandler):
             room_id (str)
             visibility (str): "public" or "private"
         """
-        if not self.spam_checker.user_may_publish_room(
-            requester.user.to_string(), room_id
-        ):
+        user_id = requester.user.to_string()
+
+        if not self.spam_checker.user_may_publish_room(user_id, room_id):
             raise AuthError(
                 403,
                 "This user is not permitted to publish rooms to the room list"
@@ -415,7 +417,24 @@ class DirectoryHandler(BaseHandler):
 
         yield self.auth.check_can_change_room_list(room_id, requester.user)
 
-        yield self.store.set_room_is_public(room_id, visibility == "public")
+        making_public = visibility == "public"
+        if making_public:
+            room_aliases = yield self.store.get_aliases_for_room(room_id)
+            canonical_alias = yield self.store.get_canonical_alias_for_room(room_id)
+            if canonical_alias:
+                room_aliases.append(canonical_alias)
+
+            if not self.config.is_publishing_room_allowed(
+                user_id, room_id, room_aliases,
+            ):
+                # Lets just return a generic message, as there may be all sorts of
+                # reasons why we said no. TODO: Allow configurable error messages
+                # per alias creation rule?
+                raise SynapseError(
+                    403, "Not allowed to publish room",
+                )
+
+        yield self.store.set_room_is_public(room_id, making_public)
 
     @defer.inlineCallbacks
     def edit_published_appservice_room_list(self, appservice_id, network_id,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3981fe69ce..d14af54778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -196,7 +196,7 @@ class MessageHandler(object):
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
         # is a user in the room that the AS is "interested in"
-        if requester.app_service and user_id not in users_with_profile:
+        if False and requester.app_service and user_id not in users_with_profile:
             for uid in users_with_profile:
                 if requester.app_service.is_interested_in_user(uid):
                     break
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 13e212d669..1ec2b734e9 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,9 +44,12 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache(hs, "room_list")
-        self.remote_response_cache = ResponseCache(hs, "remote_room_list",
-                                                   timeout_ms=30 * 1000)
+        self.response_cache = ResponseCache(
+            hs, "room_list", timeout_ms=10 * 60 * 1000,
+        )
+        self.remote_response_cache = ResponseCache(
+            hs, "remote_room_list", timeout_ms=30 * 1000,
+        )
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
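
For readers unfamiliar with `ResponseCache`: the `timeout_ms` bump above means a computed room list is retained and served to identical requests for ten minutes, and concurrent callers share one in-flight computation. A toy synchronous sketch of the pattern (the real class is Deferred-based; `ToyResponseCache` is illustrative only):

import time

class ToyResponseCache:
    def __init__(self, timeout_ms):
        self.timeout_s = timeout_ms / 1000.0
        self._cache = {}  # key -> (expiry, value)

    def wrap(self, key, compute):
        now = time.monotonic()
        hit = self._cache.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]  # served from cache, no recomputation
        value = compute()
        self._cache[key] = (now + self.timeout_s, value)
        return value

cache = ToyResponseCache(timeout_ms=10 * 60 * 1000)
# Repeat room-list requests with the same (limit, since_token) key are
# answered from cache for the next ten minutes.
result = cache.wrap(("room_list", None, None), lambda: {"chunk": []})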
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 2beffdf41e..42bf5eda1e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -66,6 +66,7 @@ class RoomMemberHandler(object):
         self.event_creation_handler = hs.get_event_creation_handler()
 
         self.member_linearizer = Linearizer(name="member")
+        self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
 
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
@@ -304,18 +305,37 @@ class RoomMemberHandler(object):
     ):
         key = (room_id,)
 
-        with (yield self.member_linearizer.queue(key)):
-            result = yield self._update_membership(
-                requester,
-                target,
-                room_id,
-                action,
-                txn_id=txn_id,
-                remote_room_hosts=remote_room_hosts,
-                third_party_signed=third_party_signed,
-                ratelimit=ratelimit,
-                content=content,
-            )
+        as_id = object()
+        if requester.app_service:
+            as_id = requester.app_service.id
+
+        then = self.clock.time_msec()
+
+        with (yield self.member_limiter.queue(as_id)):
+            diff = self.clock.time_msec() - then
+
+            if diff > 80 * 1000:
+                # haproxy would have timed the request out anyway...
+                raise SynapseError(504, "took too long to process")
+
+            with (yield self.member_linearizer.queue(key)):
+                diff = self.clock.time_msec() - then
+
+                if diff > 80 * 1000:
+                    # haproxy would have timed the request out anyway...
+                    raise SynapseError(504, "took too long to process")
+
+                result = yield self._update_membership(
+                    requester,
+                    target,
+                    room_id,
+                    action,
+                    txn_id=txn_id,
+                    remote_room_hosts=remote_room_hosts,
+                    third_party_signed=third_party_signed,
+                    ratelimit=ratelimit,
+                    content=content,
+                )
 
         defer.returnValue(result)
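
The new `member_limiter` keys on the appservice ID, so a `Linearizer` with `max_count=10` caps each appservice at ten concurrent membership updates; non-appservice requests get a fresh `object()` key each time and are effectively unthrottled. A rough asyncio analogue of that keyed limiter (names are illustrative, not synapse API):

import asyncio
from collections import defaultdict

class KeyedLimiter:
    """At most max_count concurrent holders per key, like Linearizer(max_count=N)."""

    def __init__(self, max_count):
        self._sems = defaultdict(lambda: asyncio.Semaphore(max_count))

    def queue(self, key):
        return self._sems[key]

member_limiter = KeyedLimiter(max_count=10)

async def update_membership(requester_as_id, room_id):
    # Non-appservice callers would pass a unique object() here, so only
    # appservice traffic contends on the semaphore.
    as_id = requester_as_id if requester_as_id is not None else object()
    async with member_limiter.queue(as_id):
        # the per-room member_linearizer would nest here, as in the diff
        await asyncio.sleep(0)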
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bd97241ab4..24fc3850ff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -39,6 +39,7 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
 
 # Counts the number of times we returned a non-empty sync. `type` is one of
 # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -208,7 +209,9 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs, "sync")
+        self.response_cache = ResponseCache(
+            hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS,
+        )
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 283c6c1b81..0dacd9e357 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -28,7 +28,6 @@ from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
-
 class UserDirectoryHandler(object):
     """Handles querying of and keeping updated the user_directory.
 
@@ -130,7 +129,7 @@ class UserDirectoryHandler(object):
         # Support users are for diagnostics and should not appear in the user directory.
         if not is_support:
             yield self.store.update_profile_in_user_dir(
-                user_id, profile.display_name, profile.avatar_url, None
+                user_id, profile.display_name, profile.avatar_url, None,
             )
 
     @defer.inlineCallbacks
@@ -166,9 +165,8 @@ class UserDirectoryHandler(object):
             self.pos = deltas[-1]["stream_id"]
 
             # Expose current event processing position to prometheus
-            synapse.metrics.event_processing_positions.labels("user_dir").set(
-                self.pos
-            )
+            synapse.metrics.event_processing_positions.labels(
+                "user_dir").set(self.pos)
 
             yield self.store.update_user_directory_stream_pos(self.pos)
 
@@ -192,25 +190,21 @@ class UserDirectoryHandler(object):
             logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
             yield self._handle_initial_room(room_id)
             num_processed_rooms += 1
-            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
         logger.info("Processed all rooms.")
 
         if self.search_all_users:
             num_processed_users = 0
             user_ids = yield self.store.get_all_local_users()
-            logger.info(
-                "Doing initial update of user directory. %d users", len(user_ids)
-            )
+            logger.info("Doing initial update of user directory. %d users", len(user_ids))
 
             for user_id in user_ids:
                 # We add profiles for all users even if they don't match the
                 # include pattern, just in case we want to change it in future
-                logger.info(
-                    "Handling user %d/%d", num_processed_users + 1, len(user_ids)
-                )
+                logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
                 yield self._handle_local_user(user_id)
                 num_processed_users += 1
-                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
+                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
 
         logger.info("Processed all users")
 
@@ -229,24 +223,24 @@ class UserDirectoryHandler(object):
         if not is_in_room:
             return
 
-        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-            room_id
-        )
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
 
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
         user_ids = set(users_with_profile)
         unhandled_users = user_ids - self.initially_handled_users
 
         yield self.store.add_profiles_to_user_dir(
-            room_id,
-            {user_id: users_with_profile[user_id] for user_id in unhandled_users},
+            room_id, {
+                user_id: users_with_profile[user_id] for user_id in unhandled_users
+            }
         )
 
         self.initially_handled_users |= unhandled_users
 
         if is_public:
             yield self.store.add_users_to_public_room(
-                room_id, user_ids=user_ids - self.initially_handled_users_in_public
+                room_id,
+                user_ids=user_ids - self.initially_handled_users_in_public
             )
             self.initially_handled_users_in_public |= user_ids
 
@@ -258,7 +252,7 @@ class UserDirectoryHandler(object):
         count = 0
         for user_id in user_ids:
             if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
             if not self.is_mine_id(user_id):
                 count += 1
@@ -273,7 +267,7 @@ class UserDirectoryHandler(object):
                     continue
 
                 if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
                 count += 1
 
                 user_set = (user_id, other_user_id)
@@ -295,23 +289,25 @@ class UserDirectoryHandler(object):
 
                 if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
                     yield self.store.add_users_who_share_room(
-                        room_id, not is_public, to_insert
+                        room_id, not is_public, to_insert,
                     )
                     to_insert.clear()
 
                 if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
                     yield self.store.update_users_who_share_room(
-                        room_id, not is_public, to_update
+                        room_id, not is_public, to_update,
                     )
                     to_update.clear()
 
         if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
             to_insert.clear()
 
         if to_update:
             yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
+                room_id, not is_public, to_update,
             )
             to_update.clear()
 
@@ -332,42 +328,21 @@ class UserDirectoryHandler(object):
             # may have become public or not and add/remove the users in said room
             if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
                 yield self._handle_room_publicity_change(
-                    room_id, prev_event_id, event_id, typ
+                    room_id, prev_event_id, event_id, typ,
                 )
             elif typ == EventTypes.Member:
                 change = yield self._get_key_change(
-                    prev_event_id,
-                    event_id,
+                    prev_event_id, event_id,
                     key_name="membership",
                     public_value=Membership.JOIN,
                 )
 
-                if change is False:
-                    # Need to check if the server left the room entirely, if so
-                    # we might need to remove all the users in that room
-                    is_in_room = yield self.store.is_host_joined(
-                        room_id, self.server_name
-                    )
-                    if not is_in_room:
-                        logger.info("Server left room: %r", room_id)
-                        # Fetch all the users that we marked as being in user
-                        # directory due to being in the room and then check if
-                        # need to remove those users or not
-                        user_ids = yield self.store.get_users_in_dir_due_to_room(
-                            room_id
-                        )
-                        for user_id in user_ids:
-                            yield self._handle_remove_user(room_id, user_id)
-                        return
-                    else:
-                        logger.debug("Server is still in room: %r", room_id)
-
                 is_support = yield self.store.is_support_user(state_key)
                 if not is_support:
                     if change is None:
                         # Handle any profile changes
                         yield self._handle_profile_change(
-                            state_key, room_id, prev_event_id, event_id
+                            state_key, room_id, prev_event_id, event_id,
                         )
                         continue
 
@@ -399,15 +374,13 @@ class UserDirectoryHandler(object):
         if typ == EventTypes.RoomHistoryVisibility:
             change = yield self._get_key_change(
-                prev_event_id,
-                event_id,
+                prev_event_id, event_id,
                 key_name="history_visibility",
                 public_value="world_readable",
             )
         elif typ == EventTypes.JoinRules:
             change = yield self._get_key_change(
-                prev_event_id,
-                event_id,
+                prev_event_id, event_id,
                 key_name="join_rule",
                 public_value=JoinRules.PUBLIC,
             )
 
@@ -532,7 +505,7 @@ class UserDirectoryHandler(object):
             )
             if self.is_mine_id(other_user_id) and not is_appservice:
                 shared_is_private = yield self.store.get_if_users_share_a_room(
-                    other_user_id, user_id
+                    other_user_id, user_id,
                 )
                 if shared_is_private is True:
                     # We've already marked in the database they share a private room
@@ -547,11 +520,13 @@ class UserDirectoryHandler(object):
                 to_insert.add((other_user_id, user_id))
 
         if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
 
         if to_update:
             yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
+                room_id, not is_public, to_update,
             )
 
     @defer.inlineCallbacks
@@ -570,15 +545,15 @@ class UserDirectoryHandler(object):
         row = yield self.store.get_user_in_public_room(user_id)
         update_user_in_public = row and row["room_id"] == room_id
 
-        if update_user_in_public or update_user_dir:
+        if (update_user_in_public or update_user_dir):
             # XXX: Make this faster?
             rooms = yield self.store.get_rooms_for_user(user_id)
             for j_room_id in rooms:
-                if not update_user_in_public and not update_user_dir:
+                if (not update_user_in_public and not update_user_dir):
                     break
 
                 is_in_room = yield self.store.is_host_joined(
-                    j_room_id, self.server_name
+                    j_room_id, self.server_name,
                 )
 
                 if not is_in_room:
@@ -606,19 +581,19 @@ class UserDirectoryHandler(object):
         # Get a list of user tuples that were in the DB due to this room and
         # users (this includes tuples where the other user matches `user_id`)
         user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
-            user_id, room_id
+            user_id, room_id,
         )
 
         for user_id, other_user_id in user_tuples:
             # For each user tuple get a list of rooms that they still share,
             # trying to find a private room, and update the entry in the DB
-            rooms = yield self.store.get_rooms_in_common_for_users(
-                user_id, other_user_id
-            )
+            rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id)
 
             # If they dont share a room anymore, remove the mapping
             if not rooms:
-                yield self.store.remove_user_who_share_room(user_id, other_user_id)
+                yield self.store.remove_user_who_share_room(
+                    user_id, other_user_id,
+                )
                 continue
 
             found_public_share = None
@@ -632,13 +607,13 @@ class UserDirectoryHandler(object):
                 else:
                     found_public_share = None
                     yield self.store.update_users_who_share_room(
-                        room_id, not is_public, [(user_id, other_user_id)]
+                        room_id, not is_public, [(user_id, other_user_id)],
                     )
                     break
 
         if found_public_share:
             yield self.store.update_users_who_share_room(
-                room_id, not is_public, [(user_id, other_user_id)]
+                room_id, not is_public, [(user_id, other_user_id)],
             )
 
     @defer.inlineCallbacks
@@ -666,7 +641,7 @@ class UserDirectoryHandler(object):
 
         if prev_name != new_name or prev_avatar != new_avatar:
             yield self.store.update_profile_in_user_dir(
-                user_id, new_name, new_avatar, room_id
+                user_id, new_name, new_avatar, room_id,
             )
 
     @defer.inlineCallbacks
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 87fa7f006a..82ab18acae 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -332,7 +332,12 @@ class HttpPusher(object):
         if not notification_dict:
             defer.returnValue([])
         try:
-            resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
+            url = self.url.replace(
+                "https://matrix.org/_matrix/push/v1/notify",
+                "http://http-priv.matrix.org/_matrix/push/v1/notify",
+            )
+
+            resp = yield self.http_client.post_json_get_json(url, notification_dict)
         except Exception:
             logger.warn(
                 "Failed to push event %s to %s",
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 19f214281e..dec63ae68d 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from synapse.http.server import JsonResource
-from synapse.replication.http import federation, membership, send_event
+from synapse.replication.http import federation, membership, registration, send_event, device
 
 REPLICATION_PREFIX = "/_synapse/replication"
 
@@ -28,3 +28,5 @@ class ReplicationRestResource(JsonResource):
         send_event.register_servlets(hs, self)
         membership.register_servlets(hs, self)
         federation.register_servlets(hs, self)
+        registration.register_servlets(hs, self)
+        device.register_servlets(hs, self)
diff --git a/synapse/replication/http/device.py b/synapse/replication/http/device.py
new file mode 100644
index 0000000000..605de028a0
--- /dev/null
+++ b/synapse/replication/http/device.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class CheckDeviceRegisteredServlet(ReplicationEndpoint):
+    """Check a device is registered, via the master."""
+
+    NAME = "device_check_registered"
+    PATH_ARGS = ("user_id",)
+
+    def __init__(self, hs):
+        super(CheckDeviceRegisteredServlet, self).__init__(hs)
+        self.device_handler = hs.get_device_handler()
+
+    @staticmethod
+    def _serialize_payload(user_id, device_id, initial_display_name):
+        """
+        Args:
+            user_id (str): sent as a path argument, not in the body
+            device_id (str|None)
+            initial_display_name (str|None)
+        """
+        return {
+            "device_id": device_id,
+            "initial_display_name": initial_display_name,
+        }
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, user_id):
+        content = parse_json_object_from_request(request)
+
+        device_id = content["device_id"]
+        initial_display_name = content["initial_display_name"]
+
+        try:
+            device_id = yield self.device_handler.check_device_registered(
+                user_id, device_id, initial_display_name,
+            )
+        except Exception as e:
+            defer.returnValue((400, str(e)))
+
+        defer.returnValue((200, {"device_id": device_id}))
+
+
+def register_servlets(hs, http_server):
+    CheckDeviceRegisteredServlet(hs).register(http_server)
diff --git a/synapse/replication/http/registration.py b/synapse/replication/http/registration.py
new file mode 100644
index 0000000000..0f2f226ae1
--- /dev/null
+++ b/synapse/replication/http/registration.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class RegistrationUserCacheInvalidationServlet(ReplicationEndpoint):
+    """
+    Invalidate the caches that a registration usually invalidates.
+
+    Request format:
+
+        POST /_synapse/replication/reg_invalidate_user_caches/:user_id
+
+        {}
+    """
+
+    NAME = "reg_invalidate_user_caches"
+    PATH_ARGS = ("user_id",)
+
+    def __init__(self, hs):
+        super(RegistrationUserCacheInvalidationServlet, self).__init__(hs)
+        self.store = hs.get_datastore()
+
+    @staticmethod
+    def _serialize_payload(user_id, args):
+        """
+        Args:
+            user_id (str): sent as a path argument; the request body is empty
+        """
+        return {}
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, user_id):
+
+        def invalidate(txn):
+            self.store._invalidate_cache_and_stream(
+                txn, self.store.get_user_by_id, (user_id,)
+            )
+            txn.call_after(self.store.is_guest.invalidate, (user_id,))
+
+        yield self.store.runInteraction("user_invalidate_caches", invalidate)
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    RegistrationUserCacheInvalidationServlet(hs).register(http_server)
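
Both new servlets follow the `ReplicationEndpoint` pattern: the worker serialises the call, POSTs it to the master, and the master runs `_handle_request`. A toy sketch of that round trip (the URL layout and direct dispatch here are simplified assumptions, not the real synapse plumbing):

import json

class ToyEndpoint:
    NAME = "reg_invalidate_user_caches"

    @staticmethod
    def _serialize_payload(user_id):
        return {}

    def _handle_request(self, payload, user_id):
        print("master: invalidating registration caches for", user_id)
        return 200, {}

def make_client(endpoint):
    def send_request(user_id):
        path = "/_synapse/replication/%s/%s" % (endpoint.NAME, user_id)
        body = json.dumps(endpoint._serialize_payload(user_id))
        # a real client would POST `body` to `path` on the master; here we
        # invoke the handler directly just to show the data flow
        return endpoint._handle_request(json.loads(body), user_id)
    return send_request

client = make_client(ToyEndpoint())
client("@alice:example.com")  # -> (200, {})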
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 60641f1a49..5b8521c770 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore):
         if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
             return
 
+        self.client_ip_last_seen.prefill(key, now)
+
         self.hs.get_tcp_replication().send_user_ip(
             user_id, access_token, ip, user_agent, device_id, now
         )
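
The `prefill` line fixes a worker-side gap: the slave store checks `client_ip_last_seen` before forwarding, but previously never recorded the send, so every request inside the granularity window was forwarded over replication anyway. In miniature:

LAST_SEEN_GRANULARITY = 10 * 60 * 1000

cache = {}  # (user_id, access_token, ip) -> last_seen ms

def insert_client_ip(key, now, send_user_ip):
    last_seen = cache.get(key)
    if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
        return  # recently reported, skip the replication send
    cache[key] = now  # the new .prefill(key, now) call plays this role
    send_user_ip(key, now)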
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index c1e626be3f..d49973634e 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -32,7 +32,7 @@ from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
-MAX_EVENTS_BEHIND = 10000
+MAX_EVENTS_BEHIND = 500000
 
 
 EventStreamRow = namedtuple("EventStreamRow", (
@@ -265,8 +265,8 @@ class PresenceStream(Stream):
         store = hs.get_datastore()
         presence_handler = hs.get_presence_handler()
 
-        self.current_token = store.get_current_presence_token
-        self.update_function = presence_handler.get_all_presence_updates
+        self.current_token = lambda: 0
+        self.update_function = lambda _a, _b: []
 
         super(PresenceStream, self).__init__(hs)
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 82433a2aa9..2e303264f6 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -466,17 +466,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
         )
         new_room_id = info["room_id"]
 
-        yield self.event_creation_handler.create_and_send_nonmember_event(
-            room_creator_requester,
-            {
-                "type": "m.room.message",
-                "content": {"body": message, "msgtype": "m.text"},
-                "room_id": new_room_id,
-                "sender": new_room_user_id,
-            },
-            ratelimit=False,
-        )
-
         requester_user_id = requester.user.to_string()
 
         logger.info("Shutting down room %r", room_id)
@@ -514,6 +503,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
 
             kicked_users.append(user_id)
 
+        yield self.event_creation_handler.create_and_send_nonmember_event(
+            room_creator_requester,
+            {
+                "type": "m.room.message",
+                "content": {"body": message, "msgtype": "m.text"},
+                "room_id": new_room_id,
+                "sender": new_room_user_id,
+            },
+            ratelimit=False,
+        )
+
         aliases_for_room = yield self.store.get_aliases_for_room(room_id)
 
         yield self.store.update_aliases_for_room(
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 7f812b8209..91c0d5e981 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -33,6 +33,10 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
+from synapse.replication.http.device import CheckDeviceRegisteredServlet
+from synapse.replication.http.registration import (
+    RegistrationUserCacheInvalidationServlet,
+)
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.threepids import check_3pid_allowed
@@ -190,9 +194,20 @@ class RegisterRestServlet(RestServlet):
         self.registration_handler = hs.get_handlers().registration_handler
         self.identity_handler = hs.get_handlers().identity_handler
         self.room_member_handler = hs.get_room_member_handler()
-        self.device_handler = hs.get_device_handler()
         self.macaroon_gen = hs.get_macaroon_generator()
 
+        if self.hs.config.worker_app:
+            self._invalidate_caches_client = (
+                RegistrationUserCacheInvalidationServlet.make_client(hs)
+            )
+            self._device_check_registered_client = (
+                CheckDeviceRegisteredServlet.make_client(hs)
+            )
+        else:
+            self.device_handler = hs.get_device_handler()
+
     @interactive_auth_handler
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -266,6 +281,9 @@ class RegisterRestServlet(RestServlet):
 
         # == Shared Secret Registration == (e.g. create new user scripts)
         if 'mac' in body:
+            if self.hs.config.worker_app:
+                raise SynapseError(403, "Not available at this endpoint")
+
             # FIXME: Should we really be determining if this is shared secret
             # auth based purely on the 'mac' key?
             result = yield self._do_shared_secret_registration(
@@ -456,6 +474,9 @@ class RegisterRestServlet(RestServlet):
         )
         yield self.registration_handler.post_consent_actions(registered_user_id)
 
+        if self.hs.config.worker_app:
+            yield self._invalidate_caches_client(registered_user_id)
+
         defer.returnValue((200, return_dict))
 
     def on_OPTIONS(self, _):
@@ -466,6 +487,10 @@ class RegisterRestServlet(RestServlet):
         user_id = yield self.registration_handler.appservice_register(
             username, as_token
         )
+
+        if self.hs.config.worker_app:
+            yield self._invalidate_caches_client(user_id)
+
         defer.returnValue((yield self._create_registration_details(user_id, body)))
 
     @defer.inlineCallbacks
@@ -647,6 +672,20 @@ class RegisterRestServlet(RestServlet):
         })
         defer.returnValue(result)
 
+    @defer.inlineCallbacks
+    def _check_device_registered(self, user_id, device_id, initial_display_name):
+        if self.hs.config.worker_app:
+            r = yield self._device_check_registered_client(
+                user_id, device_id, initial_display_name
+            )
+            defer.returnValue(r["device_id"])
+        else:
+            r = yield self.device_handler.check_device_registered(
+                user_id, device_id, initial_display_name
+            )
+            defer.returnValue(r)
+
     def _register_device(self, user_id, params):
         """Register a device for a user.
 
@@ -663,7 +702,7 @@ class RegisterRestServlet(RestServlet):
         # register the user's device
         device_id = params.get("device_id")
         initial_display_name = params.get("initial_device_display_name")
-        return self.device_handler.check_device_registered(
+        return self._check_device_registered(
             user_id, device_id, initial_display_name
         )
 
@@ -680,7 +719,7 @@ class RegisterRestServlet(RestServlet):
         # we have nowhere to store it.
         device_id = synapse.api.auth.GUEST_DEVICE_ID
         initial_display_name = params.get("initial_device_display_name")
-        yield self.device_handler.check_device_registered(
+        yield self._check_device_registered(
             user_id, device_id, initial_display_name
         )
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 9c21362226..1adfee8c0a 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
 # times give more inserts into the database even for readonly API hits
 # 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
 
 
 class ClientIpStore(background_updates.BackgroundUpdateStore):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c9e11c3135..8b4554f6af 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -146,6 +146,7 @@ class RegistrationStore(RegistrationWorkerStore,
     def __init__(self, db_conn, hs):
         super(RegistrationStore, self).__init__(db_conn, hs)
 
+        self.hs = hs
         self.clock = hs.get_clock()
 
         self.register_background_index_update(
@@ -321,10 +322,12 @@ class RegistrationStore(RegistrationWorkerStore,
             (user_id_obj.localpart, create_profile_with_displayname)
         )
 
-        self._invalidate_cache_and_stream(
-            txn, self.get_user_by_id, (user_id,)
-        )
-        txn.call_after(self.is_guest.invalidate, (user_id,))
+        # Don't invalidate here, it will be done through replication to the worker.
+        if not self.hs.config.worker_app:
+            self._invalidate_cache_and_stream(
+                txn, self.get_user_by_id, (user_id,)
+            )
+            txn.call_after(self.is_guest.invalidate, (user_id,))
 
     def get_users_by_id_case_insensitive(self, user_id):
         """Gets users that match user_id case insensitively.
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 592c1bcd33..2874dabbd1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -72,7 +72,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
         defer.returnValue(hosts)
 
-    @cached(max_entries=100000, iterable=True)
+    @cachedInlineCallbacks(max_entries=100000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
             sql = (
@@ -86,7 +86,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             txn.execute(sql, (room_id, Membership.JOIN,))
             return [to_ascii(r[0]) for r in txn]
 
-        return self.runInteraction("get_users_in_room", f)
+        start_time = self._clock.time_msec()
+        result = yield self.runInteraction("get_users_in_room", f)
+        end_time = self._clock.time_msec()
+        logger.info(
+            "Fetched room membership for %s (%i users) in %i ms",
+            room_id, len(result), end_time - start_time,
+        )
+        defer.returnValue(result)
 
     @cached(max_entries=100000)
     def get_room_summary(self, room_id):
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index c6420b2374..ad01071318 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -730,7 +730,7 @@ def _parse_query(database_engine, search_term):
     results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
 
     if isinstance(database_engine, PostgresEngine):
-        return " & ".join(result + ":*" for result in results)
+        return " & ".join(result for result in results)
     elif isinstance(database_engine, Sqlite3Engine):
         return " & ".join(result + "*" for result in results)
    else:
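
The `:*` suffix is Postgres `tsquery` syntax for prefix matching, so this change makes full-text search require whole-lexeme matches (presumably a performance hotfix, since prefix queries can be far more expensive on large indexes). The effect on the generated query, re-implemented standalone:

import re

def to_postgres_tsquery(search_term, prefix_match):
    results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
    if prefix_match:
        return " & ".join(result + ":*" for result in results)  # old behaviour
    return " & ".join(results)                                   # new behaviour

print(to_postgres_tsquery("hello wor", prefix_match=True))   # hello:* & wor:*
print(to_postgres_tsquery("hello wor", prefix_match=False))  # hello & wor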
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index d14a7b2538..6ddc4055d2 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -548,6 +548,31 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             _get_filtered_current_state_ids_txn,
         )
 
+    @defer.inlineCallbacks
+    def get_canonical_alias_for_room(self, room_id):
+        """Get canonical alias for room, if any
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[str|None]: The canonical alias, if any
+        """
+
+        state = yield self.get_filtered_current_state_ids(room_id, StateFilter.from_types(
+            [(EventTypes.CanonicalAlias, "")]
+        ))
+
+        event_id = state.get((EventTypes.CanonicalAlias, ""))
+        if not event_id:
+            return
+
+        event = yield self.get_event(event_id, allow_none=True)
+        if not event:
+            return
+
+        defer.returnValue(event.content.get("canonical_alias"))
+
     @cached(max_entries=10000, iterable=True)
     def get_state_group_delta(self, state_group):
         """Given a state group try to return a previous group and a delta between
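
The new storage method illustrates the general "fetch one current state event" pattern: ask `get_filtered_current_state_ids` for a single `(event_type, state_key)` pair, then load just that event. A sketch of the same pattern for another state event (`get_room_name` is a hypothetical helper; it assumes the same store API used in the diff above):

from twisted.internet import defer

from synapse.api.constants import EventTypes
from synapse.storage.state import StateFilter

@defer.inlineCallbacks
def get_room_name(store, room_id):
    # Restrict the state query to the single (m.room.name, "") key instead
    # of fetching the room's full current state.
    state = yield store.get_filtered_current_state_ids(
        room_id, StateFilter.from_types([(EventTypes.Name, "")]),
    )
    event_id = state.get((EventTypes.Name, ""))
    if not event_id:
        return
    event = yield store.get_event(event_id, allow_none=True)
    if not event:
        return
    defer.returnValue(event.content.get("name"))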