diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 6ee2b76dcd..27c73f9b50 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse
from synapse import events
-from synapse.api.urls import FEDERATION_PREFIX
+from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@@ -43,6 +43,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
@@ -97,6 +98,9 @@ class FederationReaderServer(HomeServer):
),
})
+ if name in ["keys", "federation"]:
+ resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
+
root_resource = create_resource_tree(resources, NoResource())
_base.listen_tcp(
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index d5b954361d..5d450718c6 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -39,8 +39,12 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
+from synapse.rest.client.v2_alpha.register import (
+ register_servlets as register_registration_servlets,
+)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
+from synapse.storage.registration import RegistrationStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
@@ -141,6 +145,7 @@ class FrontendProxySlavedStore(
SlavedClientIpStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
+ RegistrationStore,
BaseSlavedStore,
):
pass
@@ -161,6 +166,7 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
+ register_registration_servlets(self, resource)
# If presence is disabled, use the stub servlet that does
# not allow sending presence
diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py
index 9da13ab11b..c8e0abbae7 100644
--- a/synapse/config/room_directory.py
+++ b/synapse/config/room_directory.py
@@ -20,12 +20,37 @@ from ._base import Config, ConfigError
class RoomDirectoryConfig(Config):
def read_config(self, config):
- alias_creation_rules = config["alias_creation_rules"]
+ alias_creation_rules = config.get("alias_creation_rules")
- self._alias_creation_rules = [
- _AliasRule(rule)
- for rule in alias_creation_rules
- ]
+ if alias_creation_rules is not None:
+ self._alias_creation_rules = [
+ _RoomDirectoryRule("alias_creation_rules", rule)
+ for rule in alias_creation_rules
+ ]
+ else:
+ self._alias_creation_rules = [
+ _RoomDirectoryRule(
+ "alias_creation_rules", {
+ "action": "allow",
+ }
+ )
+ ]
+
+ room_list_publication_rules = config.get("room_list_publication_rules")
+
+ if room_list_publication_rules is not None:
+ self._room_list_publication_rules = [
+ _RoomDirectoryRule("room_list_publication_rules", rule)
+ for rule in room_list_publication_rules
+ ]
+ else:
+ self._room_list_publication_rules = [
+ _RoomDirectoryRule(
+ "room_list_publication_rules", {
+ "action": "allow",
+ }
+ )
+ ]
def default_config(self, config_dir_path, server_name, **kwargs):
return """
@@ -33,60 +58,138 @@ class RoomDirectoryConfig(Config):
# on this server.
#
# The format of this option is a list of rules that contain globs that
- # match against user_id and the new alias (fully qualified with server
- # name). The action in the first rule that matches is taken, which can
- # currently either be "allow" or "deny".
+ # match against user_id, room_id and the new alias (fully qualified with
+ # server name). The action in the first rule that matches is taken,
+ # which can currently either be "allow" or "deny".
+ #
+ # Missing user_id/room_id/alias fields default to "*".
+ #
+ # If no rules match the request is denied. An empty list means no one
+ # can create aliases.
+ #
+ # Options for the rules include:
+ #
+ # user_id: Matches against the creator of the alias
+ # alias: Matches against the alias being created
+ # room_id: Matches against the room ID the alias is being pointed at
+ # action: Whether to "allow" or "deny" the request if the rule matches
+ #
+ # The default is:
+ #
+ # alias_creation_rules:
+ # - user_id: "*"
+ # alias: "*"
+ # room_id: "*"
+ # action: allow
+
+ # The `room_list_publication_rules` option controls who can publish and
+ # which rooms can be published in the public room list.
+ #
+ # The format of this option is the same as that for
+ # `alias_creation_rules`.
#
- # If no rules match the request is denied.
- alias_creation_rules:
- - user_id: "*"
- alias: "*"
- action: allow
+ # If the room has one or more aliases associated with it, only one of
+ # the aliases needs to match the alias rule. If there are no aliases
+ # then only rules with `alias: *` match.
+ #
+ # If no rules match the request is denied. An empty list means no one
+ # can publish rooms.
+ #
+ # Options for the rules include:
+ #
+ # user_id: Matches agaisnt the creator of the alias
+ # room_id: Matches against the room ID being published
+ # alias: Matches against any current local or canonical aliases
+ # associated with the room
+ # action: Whether to "allow" or "deny" the request if the rule matches
+ #
+ # The default is:
+ #
+ # room_list_publication_rules:
+ # - user_id: "*"
+ # alias: "*"
+ # room_id: "*"
+ # action: allow
"""
- def is_alias_creation_allowed(self, user_id, alias):
+ def is_alias_creation_allowed(self, user_id, room_id, alias):
"""Checks if the given user is allowed to create the given alias
Args:
user_id (str)
+ room_id (str)
alias (str)
Returns:
boolean: True if user is allowed to crate the alias
"""
for rule in self._alias_creation_rules:
- if rule.matches(user_id, alias):
+ if rule.matches(user_id, room_id, [alias]):
+ return rule.action == "allow"
+
+ return False
+
+ def is_publishing_room_allowed(self, user_id, room_id, aliases):
+ """Checks if the given user is allowed to publish the room
+
+ Args:
+ user_id (str)
+ room_id (str)
+ aliases (list[str]): any local aliases associated with the room
+
+ Returns:
+ boolean: True if user can publish room
+ """
+ for rule in self._room_list_publication_rules:
+ if rule.matches(user_id, room_id, aliases):
return rule.action == "allow"
return False
-class _AliasRule(object):
- def __init__(self, rule):
+class _RoomDirectoryRule(object):
+ """Helper class to test whether a room directory action is allowed, like
+ creating an alias or publishing a room.
+ """
+
+ def __init__(self, option_name, rule):
+ """
+ Args:
+ option_name (str): Name of the config option this rule belongs to
+ rule (dict): The rule as specified in the config
+ """
+
action = rule["action"]
- user_id = rule["user_id"]
- alias = rule["alias"]
+ user_id = rule.get("user_id", "*")
+ room_id = rule.get("room_id", "*")
+ alias = rule.get("alias", "*")
if action in ("allow", "deny"):
self.action = action
else:
raise ConfigError(
- "alias_creation_rules rules can only have action of 'allow'"
- " or 'deny'"
+ "%s rules can only have action of 'allow'"
+ " or 'deny'" % (option_name,)
)
+ self._alias_matches_all = alias == "*"
+
try:
self._user_id_regex = glob_to_regex(user_id)
self._alias_regex = glob_to_regex(alias)
+ self._room_id_regex = glob_to_regex(room_id)
except Exception as e:
raise ConfigError("Failed to parse glob into regex: %s", e)
- def matches(self, user_id, alias):
- """Tests if this rule matches the given user_id and alias.
+ def matches(self, user_id, room_id, aliases):
+ """Tests if this rule matches the given user_id, room_id and aliases.
Args:
user_id (str)
- alias (str)
+ room_id (str)
+ aliases (list[str]): The associated aliases to the room. Will be a
+ single element for testing alias creation, and can be empty for
+ testing room publishing.
Returns:
boolean
@@ -96,7 +199,22 @@ class _AliasRule(object):
if not self._user_id_regex.match(user_id):
return False
- if not self._alias_regex.match(alias):
+ if not self._room_id_regex.match(room_id):
return False
- return True
+ # We only have alias checks left, so we can short circuit if the alias
+ # rule matches everything.
+ if self._alias_matches_all:
+ return True
+
+ # If we are not given any aliases then this rule only matches if the
+ # alias glob matches all aliases, which we checked above.
+ if not aliases:
+ return False
+
+ # Otherwise, we just need one alias to match
+ for alias in aliases:
+ if self._alias_regex.match(alias):
+ return True
+
+ return False
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 3da86d4ba6..c2541b62af 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -877,6 +877,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
def on_edu(self, edu_type, origin, content):
"""Overrides FederationHandlerRegistry
"""
+ if edu_type == "m.presence":
+ return
+
handler = self.edu_handlers.get(edu_type)
if handler:
return super(ReplicationFederationHandlerRegistry, self).on_edu(
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 6bb254f899..8b113307d2 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -112,7 +112,9 @@ class DirectoryHandler(BaseHandler):
403, "This user is not permitted to create this alias",
)
- if not self.config.is_alias_creation_allowed(user_id, room_alias.to_string()):
+ if not self.config.is_alias_creation_allowed(
+ user_id, room_id, room_alias.to_string(),
+ ):
# Lets just return a generic message, as there may be all sorts of
# reasons why we said no. TODO: Allow configurable error messages
# per alias creation rule?
@@ -395,9 +397,9 @@ class DirectoryHandler(BaseHandler):
room_id (str)
visibility (str): "public" or "private"
"""
- if not self.spam_checker.user_may_publish_room(
- requester.user.to_string(), room_id
- ):
+ user_id = requester.user.to_string()
+
+ if not self.spam_checker.user_may_publish_room(user_id, room_id):
raise AuthError(
403,
"This user is not permitted to publish rooms to the room list"
@@ -415,7 +417,24 @@ class DirectoryHandler(BaseHandler):
yield self.auth.check_can_change_room_list(room_id, requester.user)
- yield self.store.set_room_is_public(room_id, visibility == "public")
+ making_public = visibility == "public"
+ if making_public:
+ room_aliases = yield self.store.get_aliases_for_room(room_id)
+ canonical_alias = yield self.store.get_canonical_alias_for_room(room_id)
+ if canonical_alias:
+ room_aliases.append(canonical_alias)
+
+ if not self.config.is_publishing_room_allowed(
+ user_id, room_id, room_aliases,
+ ):
+ # Lets just return a generic message, as there may be all sorts of
+ # reasons why we said no. TODO: Allow configurable error messages
+ # per alias creation rule?
+ raise SynapseError(
+ 403, "Not allowed to publish room",
+ )
+
+ yield self.store.set_room_is_public(room_id, making_public)
@defer.inlineCallbacks
def edit_published_appservice_room_list(self, appservice_id, network_id,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3981fe69ce..d14af54778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -196,7 +196,7 @@ class MessageHandler(object):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
+ if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 13e212d669..1ec2b734e9 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,9 +44,12 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
- self.response_cache = ResponseCache(hs, "room_list")
- self.remote_response_cache = ResponseCache(hs, "remote_room_list",
- timeout_ms=30 * 1000)
+ self.response_cache = ResponseCache(
+ hs, "room_list", timeout_ms=10 * 60 * 1000,
+ )
+ self.remote_response_cache = ResponseCache(
+ hs, "remote_room_list", timeout_ms=30 * 1000,
+ )
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 2beffdf41e..42bf5eda1e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -66,6 +66,7 @@ class RoomMemberHandler(object):
self.event_creation_handler = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
+ self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -304,18 +305,37 @@ class RoomMemberHandler(object):
):
key = (room_id,)
- with (yield self.member_linearizer.queue(key)):
- result = yield self._update_membership(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- )
+ as_id = object()
+ if requester.app_service:
+ as_id = requester.app_service.id
+
+ then = self.clock.time_msec()
+
+ with (yield self.member_limiter.queue(as_id)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ with (yield self.member_linearizer.queue(key)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ result = yield self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ )
defer.returnValue(result)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bd97241ab4..24fc3850ff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -39,6 +39,7 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -208,7 +209,9 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs, "sync")
+ self.response_cache = ResponseCache(
+ hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS,
+ )
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 283c6c1b81..0dacd9e357 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -28,7 +28,6 @@ from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
-
class UserDirectoryHandler(object):
"""Handles querying of and keeping updated the user_directory.
@@ -130,7 +129,7 @@ class UserDirectoryHandler(object):
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
- user_id, profile.display_name, profile.avatar_url, None
+ user_id, profile.display_name, profile.avatar_url, None,
)
@defer.inlineCallbacks
@@ -166,9 +165,8 @@ class UserDirectoryHandler(object):
self.pos = deltas[-1]["stream_id"]
# Expose current event processing position to prometheus
- synapse.metrics.event_processing_positions.labels("user_dir").set(
- self.pos
- )
+ synapse.metrics.event_processing_positions.labels(
+ "user_dir").set(self.pos)
yield self.store.update_user_directory_stream_pos(self.pos)
@@ -192,25 +190,21 @@ class UserDirectoryHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
logger.info("Processed all rooms.")
if self.search_all_users:
num_processed_users = 0
user_ids = yield self.store.get_all_local_users()
- logger.info(
- "Doing initial update of user directory. %d users", len(user_ids)
- )
+ logger.info("Doing initial update of user directory. %d users", len(user_ids))
for user_id in user_ids:
# We add profiles for all users even if they don't match the
# include pattern, just in case we want to change it in future
- logger.info(
- "Handling user %d/%d", num_processed_users + 1, len(user_ids)
- )
+ logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
yield self._handle_local_user(user_id)
num_processed_users += 1
- yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
+ yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
logger.info("Processed all users")
@@ -229,24 +223,24 @@ class UserDirectoryHandler(object):
if not is_in_room:
return
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- room_id
- )
+ is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
user_ids = set(users_with_profile)
unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir(
- room_id,
- {user_id: users_with_profile[user_id] for user_id in unhandled_users},
+ room_id, {
+ user_id: users_with_profile[user_id] for user_id in unhandled_users
+ }
)
self.initially_handled_users |= unhandled_users
if is_public:
yield self.store.add_users_to_public_room(
- room_id, user_ids=user_ids - self.initially_handled_users_in_public
+ room_id,
+ user_ids=user_ids - self.initially_handled_users_in_public
)
self.initially_handled_users_in_public |= user_ids
@@ -258,7 +252,7 @@ class UserDirectoryHandler(object):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
@@ -273,7 +267,7 @@ class UserDirectoryHandler(object):
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1
user_set = (user_id, other_user_id)
@@ -295,23 +289,25 @@ class UserDirectoryHandler(object):
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_who_share_room(
- room_id, not is_public, to_insert
+ room_id, not is_public, to_insert,
)
to_insert.clear()
if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
+ room_id, not is_public, to_update,
)
to_update.clear()
if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+ yield self.store.add_users_who_share_room(
+ room_id, not is_public, to_insert,
+ )
to_insert.clear()
if to_update:
yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
+ room_id, not is_public, to_update,
)
to_update.clear()
@@ -332,42 +328,21 @@ class UserDirectoryHandler(object):
# may have become public or not and add/remove the users in said room
if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
yield self._handle_room_publicity_change(
- room_id, prev_event_id, event_id, typ
+ room_id, prev_event_id, event_id, typ,
)
elif typ == EventTypes.Member:
change = yield self._get_key_change(
- prev_event_id,
- event_id,
+ prev_event_id, event_id,
key_name="membership",
public_value=Membership.JOIN,
)
- if change is False:
- # Need to check if the server left the room entirely, if so
- # we might need to remove all the users in that room
- is_in_room = yield self.store.is_host_joined(
- room_id, self.server_name
- )
- if not is_in_room:
- logger.info("Server left room: %r", room_id)
- # Fetch all the users that we marked as being in user
- # directory due to being in the room and then check if
- # need to remove those users or not
- user_ids = yield self.store.get_users_in_dir_due_to_room(
- room_id
- )
- for user_id in user_ids:
- yield self._handle_remove_user(room_id, user_id)
- return
- else:
- logger.debug("Server is still in room: %r", room_id)
-
is_support = yield self.store.is_support_user(state_key)
if not is_support:
if change is None:
# Handle any profile changes
yield self._handle_profile_change(
- state_key, room_id, prev_event_id, event_id
+ state_key, room_id, prev_event_id, event_id,
)
continue
@@ -399,15 +374,13 @@ class UserDirectoryHandler(object):
if typ == EventTypes.RoomHistoryVisibility:
change = yield self._get_key_change(
- prev_event_id,
- event_id,
+ prev_event_id, event_id,
key_name="history_visibility",
public_value="world_readable",
)
elif typ == EventTypes.JoinRules:
change = yield self._get_key_change(
- prev_event_id,
- event_id,
+ prev_event_id, event_id,
key_name="join_rule",
public_value=JoinRules.PUBLIC,
)
@@ -532,7 +505,7 @@ class UserDirectoryHandler(object):
)
if self.is_mine_id(other_user_id) and not is_appservice:
shared_is_private = yield self.store.get_if_users_share_a_room(
- other_user_id, user_id
+ other_user_id, user_id,
)
if shared_is_private is True:
# We've already marked in the database they share a private room
@@ -547,11 +520,13 @@ class UserDirectoryHandler(object):
to_insert.add((other_user_id, user_id))
if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+ yield self.store.add_users_who_share_room(
+ room_id, not is_public, to_insert,
+ )
if to_update:
yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
+ room_id, not is_public, to_update,
)
@defer.inlineCallbacks
@@ -570,15 +545,15 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_public_room(user_id)
update_user_in_public = row and row["room_id"] == room_id
- if update_user_in_public or update_user_dir:
+ if (update_user_in_public or update_user_dir):
# XXX: Make this faster?
rooms = yield self.store.get_rooms_for_user(user_id)
for j_room_id in rooms:
- if not update_user_in_public and not update_user_dir:
+ if (not update_user_in_public and not update_user_dir):
break
is_in_room = yield self.store.is_host_joined(
- j_room_id, self.server_name
+ j_room_id, self.server_name,
)
if not is_in_room:
@@ -606,19 +581,19 @@ class UserDirectoryHandler(object):
# Get a list of user tuples that were in the DB due to this room and
# users (this includes tuples where the other user matches `user_id`)
user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
- user_id, room_id
+ user_id, room_id,
)
for user_id, other_user_id in user_tuples:
# For each user tuple get a list of rooms that they still share,
# trying to find a private room, and update the entry in the DB
- rooms = yield self.store.get_rooms_in_common_for_users(
- user_id, other_user_id
- )
+ rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id)
# If they dont share a room anymore, remove the mapping
if not rooms:
- yield self.store.remove_user_who_share_room(user_id, other_user_id)
+ yield self.store.remove_user_who_share_room(
+ user_id, other_user_id,
+ )
continue
found_public_share = None
@@ -632,13 +607,13 @@ class UserDirectoryHandler(object):
else:
found_public_share = None
yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)]
+ room_id, not is_public, [(user_id, other_user_id)],
)
break
if found_public_share:
yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)]
+ room_id, not is_public, [(user_id, other_user_id)],
)
@defer.inlineCallbacks
@@ -666,7 +641,7 @@ class UserDirectoryHandler(object):
if prev_name != new_name or prev_avatar != new_avatar:
yield self.store.update_profile_in_user_dir(
- user_id, new_name, new_avatar, room_id
+ user_id, new_name, new_avatar, room_id,
)
@defer.inlineCallbacks
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 87fa7f006a..82ab18acae 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -332,7 +332,12 @@ class HttpPusher(object):
if not notification_dict:
defer.returnValue([])
try:
- resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
+ url = self.url.replace(
+ "https://matrix.org/_matrix/push/v1/notify",
+ "http://http-priv.matrix.org/_matrix/push/v1/notify",
+ )
+
+ resp = yield self.http_client.post_json_get_json(url, notification_dict)
except Exception:
logger.warn(
"Failed to push event %s to %s",
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 19f214281e..dec63ae68d 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,7 @@
# limitations under the License.
from synapse.http.server import JsonResource
-from synapse.replication.http import federation, membership, send_event
+from synapse.replication.http import federation, membership, registration, send_event, device
REPLICATION_PREFIX = "/_synapse/replication"
@@ -28,3 +28,5 @@ class ReplicationRestResource(JsonResource):
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
federation.register_servlets(hs, self)
+ registration.register_servlets(hs, self)
+ device.register_servlets(hs, self)
diff --git a/synapse/replication/http/device.py b/synapse/replication/http/device.py
new file mode 100644
index 0000000000..605de028a0
--- /dev/null
+++ b/synapse/replication/http/device.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class CheckDeviceRegisteredServlet(ReplicationEndpoint):
+ """
+ Check a device is registered.
+
+ """
+
+ NAME = "device_check_registered"
+ PATH_ARGS = ("user_id")
+
+ def __init__(self, hs):
+ super(CheckDeviceRegisteredServlet, self).__init__(hs)
+ self.device_handler = hs.get_device_handler()
+
+ @staticmethod
+ def _serialize_payload(user_id, device_id, initial_display_name):
+ """
+ """
+ return {
+ "device_id": device_id,
+ "initial_display_name": initial_display_name,
+ }
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, user_id):
+ content = parse_json_object_from_request(request)
+
+ device_id = content["device_id"]
+ initial_display_name = content["initial_display_name"]
+
+ try:
+ device_id = yield self.device_handler.check_device_registered(user_id, device_id)
+ except Exception as e:
+ defer.returnValue((400, str(e)))
+
+ defer.returnValue((200, {"device_id": device_id}))
+
+
+def register_servlets(hs, http_server):
+ CheckDeviceRegisteredServlet(hs).register(http_server)
diff --git a/synapse/replication/http/registration.py b/synapse/replication/http/registration.py
new file mode 100644
index 0000000000..0f2f226ae1
--- /dev/null
+++ b/synapse/replication/http/registration.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class RegistrationUserCacheInvalidationServlet(ReplicationEndpoint):
+ """
+ Invalidate the caches that a registration usually invalidates.
+
+ Request format:
+
+ POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
+
+ {}
+ """
+
+ NAME = "reg_invalidate_user_caches"
+ PATH_ARGS = ("user_id",)
+
+ def __init__(self, hs):
+ super(RegistrationUserCacheInvalidationServlet, self).__init__(hs)
+ self.store = hs.get_datastore()
+
+ @staticmethod
+ def _serialize_payload(user_id, args):
+ """
+ Args:
+ user_id (str)
+ """
+ return {}
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, user_id):
+
+ def invalidate(txn):
+ self.store._invalidate_cache_and_stream(
+ txn, self.store.get_user_by_id, (user_id,)
+ )
+ txn.call_after(self.store.is_guest.invalidate, (user_id,))
+
+ yield self.store.runInteraction("user_invalidate_caches", invalidate)
+ defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+ RegistrationUserCacheInvalidationServlet(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 60641f1a49..5b8521c770 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
+ self.client_ip_last_seen.prefill(key, now)
+
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index c1e626be3f..d49973634e 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -32,7 +32,7 @@ from twisted.internet import defer
logger = logging.getLogger(__name__)
-MAX_EVENTS_BEHIND = 10000
+MAX_EVENTS_BEHIND = 500000
EventStreamRow = namedtuple("EventStreamRow", (
@@ -265,8 +265,8 @@ class PresenceStream(Stream):
store = hs.get_datastore()
presence_handler = hs.get_presence_handler()
- self.current_token = store.get_current_presence_token
- self.update_function = presence_handler.get_all_presence_updates
+ self.current_token = lambda: 0
+ self.update_function = lambda _a, _b: []
super(PresenceStream, self).__init__(hs)
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 82433a2aa9..2e303264f6 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -466,17 +466,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
)
new_room_id = info["room_id"]
- yield self.event_creation_handler.create_and_send_nonmember_event(
- room_creator_requester,
- {
- "type": "m.room.message",
- "content": {"body": message, "msgtype": "m.text"},
- "room_id": new_room_id,
- "sender": new_room_user_id,
- },
- ratelimit=False,
- )
-
requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id)
@@ -514,6 +503,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
kicked_users.append(user_id)
+ yield self.event_creation_handler.create_and_send_nonmember_event(
+ room_creator_requester,
+ {
+ "type": "m.room.message",
+ "content": {"body": message, "msgtype": "m.text"},
+ "room_id": new_room_id,
+ "sender": new_room_user_id,
+ },
+ ratelimit=False,
+ )
+
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room(
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 7f812b8209..91c0d5e981 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -33,6 +33,10 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
+from synapse.replication.http.device import CheckDeviceRegisteredServlet
+from synapse.replication.http.registration import (
+ RegistrationUserCacheInvalidationServlet,
+)
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.threepids import check_3pid_allowed
@@ -190,9 +194,20 @@ class RegisterRestServlet(RestServlet):
self.registration_handler = hs.get_handlers().registration_handler
self.identity_handler = hs.get_handlers().identity_handler
self.room_member_handler = hs.get_room_member_handler()
- self.device_handler = hs.get_device_handler()
self.macaroon_gen = hs.get_macaroon_generator()
+ if self.hs.config.worker_app:
+
+ self._invalidate_caches_client = (
+ RegistrationUserCacheInvalidationServlet.make_client(hs)
+ )
+ self._device_check_registered_client = (
+ CheckDeviceRegisteredServlet.make_client(hs)
+ )
+ else:
+ self.device_handler = hs.get_device_handler()
+
+
@interactive_auth_handler
@defer.inlineCallbacks
def on_POST(self, request):
@@ -266,6 +281,9 @@ class RegisterRestServlet(RestServlet):
# == Shared Secret Registration == (e.g. create new user scripts)
if 'mac' in body:
+ if self.hs.config.worker_app:
+ raise SynapseError(403, "Not available at this endpoint")
+
# FIXME: Should we really be determining if this is shared secret
# auth based purely on the 'mac' key?
result = yield self._do_shared_secret_registration(
@@ -456,6 +474,9 @@ class RegisterRestServlet(RestServlet):
)
yield self.registration_handler.post_consent_actions(registered_user_id)
+ if self.hs.config.worker_app:
+ yield self._invalidate_caches_client(registered_user_id)
+
defer.returnValue((200, return_dict))
def on_OPTIONS(self, _):
@@ -466,6 +487,10 @@ class RegisterRestServlet(RestServlet):
user_id = yield self.registration_handler.appservice_register(
username, as_token
)
+
+ if self.hs.config.worker_app:
+ yield self._invalidate_caches_client(user_id)
+
defer.returnValue((yield self._create_registration_details(user_id, body)))
@defer.inlineCallbacks
@@ -647,6 +672,20 @@ class RegisterRestServlet(RestServlet):
})
defer.returnValue(result)
+ @defer.inlineCallbacks
+ def _check_device_registered(self, user_id, device_id, initial_display_name):
+
+ if self.hs.config.worker_app:
+ r = yield self._device_check_registered_client(
+ user_id, device_id, initial_display_name
+ )
+ defer.returnValue(r["device_id"])
+ else:
+ r = yield self.device_handler.check_device_registered(
+ user_id, device_id, initial_display_name
+ )
+ defer.returnValue(r)
+
def _register_device(self, user_id, params):
"""Register a device for a user.
@@ -663,7 +702,7 @@ class RegisterRestServlet(RestServlet):
# register the user's device
device_id = params.get("device_id")
initial_display_name = params.get("initial_device_display_name")
- return self.device_handler.check_device_registered(
+ return self._check_device_registered(
user_id, device_id, initial_display_name
)
@@ -680,7 +719,7 @@ class RegisterRestServlet(RestServlet):
# we have nowhere to store it.
device_id = synapse.api.auth.GUEST_DEVICE_ID
initial_display_name = params.get("initial_device_display_name")
- yield self.device_handler.check_device_registered(
+ yield self._check_device_registered(
user_id, device_id, initial_display_name
)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 9c21362226..1adfee8c0a 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c9e11c3135..8b4554f6af 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -146,6 +146,7 @@ class RegistrationStore(RegistrationWorkerStore,
def __init__(self, db_conn, hs):
super(RegistrationStore, self).__init__(db_conn, hs)
+ self.hs = hs
self.clock = hs.get_clock()
self.register_background_index_update(
@@ -321,10 +322,12 @@ class RegistrationStore(RegistrationWorkerStore,
(user_id_obj.localpart, create_profile_with_displayname)
)
- self._invalidate_cache_and_stream(
- txn, self.get_user_by_id, (user_id,)
- )
- txn.call_after(self.is_guest.invalidate, (user_id,))
+ # Don't invalidate here, it will be done through replication to the worker.
+ if not self.hs.config.worker_app:
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_id, (user_id,)
+ )
+ txn.call_after(self.is_guest.invalidate, (user_id,))
def get_users_by_id_case_insensitive(self, user_id):
"""Gets users that match user_id case insensitively.
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 592c1bcd33..2874dabbd1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -72,7 +72,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
defer.returnValue(hosts)
- @cached(max_entries=100000, iterable=True)
+ @cachedInlineCallbacks(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
sql = (
@@ -86,7 +86,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (room_id, Membership.JOIN,))
return [to_ascii(r[0]) for r in txn]
- return self.runInteraction("get_users_in_room", f)
+ start_time = self._clock.time_msec()
+ result = yield self.runInteraction("get_users_in_room", f)
+ end_time = self._clock.time_msec()
+ logger.info(
+ "Fetched room membership for %s (%i users) in %i ms",
+ room_id, len(result), end_time - start_time,
+ )
+ defer.returnValue(result)
@cached(max_entries=100000)
def get_room_summary(self, room_id):
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index c6420b2374..ad01071318 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -730,7 +730,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
- return " & ".join(result + ":*" for result in results)
+ return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index d14a7b2538..6ddc4055d2 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -548,6 +548,31 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
_get_filtered_current_state_ids_txn,
)
+ @defer.inlineCallbacks
+ def get_canonical_alias_for_room(self, room_id):
+ """Get canonical alias for room, if any
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[str|None]: The canonical alias, if any
+ """
+
+ state = yield self.get_filtered_current_state_ids(room_id, StateFilter.from_types(
+ [(EventTypes.CanonicalAlias, "")]
+ ))
+
+ event_id = state.get((EventTypes.CanonicalAlias, ""))
+ if not event_id:
+ return
+
+ event = yield self.get_event(event_id, allow_none=True)
+ if not event:
+ return
+
+ defer.returnValue(event.content.get("canonical_alias"))
+
@cached(max_entries=10000, iterable=True)
def get_state_group_delta(self, state_group):
"""Given a state group try to return a previous group and a delta between
|