diff --git a/synapse/__init__.py b/synapse/__init__.py
index 4d39996a2e..f5cd8271a6 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -36,7 +36,7 @@ try:
except ImportError:
pass
-__version__ = "1.15.1"
+__version__ = "1.16.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 43b87e9a70..080b9bc445 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -18,8 +18,9 @@ from distutils.util import strtobool
import pkg_resources
+from synapse.api.constants import RoomCreationPreset
from synapse.config._base import Config, ConfigError
-from synapse.types import RoomAlias
+from synapse.types import RoomAlias, UserID
from synapse.util.stringutils import random_string_with_symbols
@@ -152,7 +153,50 @@ class RegistrationConfig(Config):
for room_alias in self.auto_join_rooms:
if not RoomAlias.is_valid(room_alias):
raise ConfigError("Invalid auto_join_rooms entry %s" % (room_alias,))
+
+ # Options for creating auto-join rooms if they do not exist yet.
self.autocreate_auto_join_rooms = config.get("autocreate_auto_join_rooms", True)
+ self.autocreate_auto_join_rooms_federated = config.get(
+ "autocreate_auto_join_rooms_federated", True
+ )
+ self.autocreate_auto_join_room_preset = (
+ config.get("autocreate_auto_join_room_preset")
+ or RoomCreationPreset.PUBLIC_CHAT
+ )
+ self.auto_join_room_requires_invite = self.autocreate_auto_join_room_preset in {
+ RoomCreationPreset.PRIVATE_CHAT,
+ RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+ }
+
+ # Pull the creator/inviter from the configuration; this gets used to
+ # send invites for invite-only rooms.
+ mxid_localpart = config.get("auto_join_mxid_localpart")
+ self.auto_join_user_id = None
+ if mxid_localpart:
+ # Convert the localpart to a full mxid.
+ self.auto_join_user_id = UserID(
+ mxid_localpart, self.server_name
+ ).to_string()
+
+ if self.autocreate_auto_join_rooms:
+ # Ensure the preset is a known value.
+ if self.autocreate_auto_join_room_preset not in {
+ RoomCreationPreset.PUBLIC_CHAT,
+ RoomCreationPreset.PRIVATE_CHAT,
+ RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+ }:
+ raise ConfigError("Invalid value for autocreate_auto_join_room_preset")
+ # If the preset requires invitations to be sent, ensure there's a
+ # configured user to send them from.
+ if self.auto_join_room_requires_invite:
+ if not mxid_localpart:
+ raise ConfigError(
+ "The configuration option `auto_join_mxid_localpart` is required if "
+ "`autocreate_auto_join_room_preset` is set to private_chat or trusted_private_chat, such that "
+ "Synapse knows who to send invitations from. Please "
+ "configure `auto_join_mxid_localpart`."
+ )
+
self.auto_join_rooms_for_guests = config.get("auto_join_rooms_for_guests", True)
self.enable_set_displayname = config.get("enable_set_displayname", True)
@@ -460,7 +504,11 @@ class RegistrationConfig(Config):
#enable_3pid_changes: false
# Users who register on this homeserver will automatically be joined
- # to these rooms
+ # to these rooms.
+ #
+ # By default, any room aliases included in this list will be created
+ # as a publicly joinable room when the first user registers for the
+ # homeserver. This behaviour can be customised with the settings below.
#
#auto_join_rooms:
# - "#example:example.com"
@@ -468,10 +516,62 @@ class RegistrationConfig(Config):
# Where auto_join_rooms are specified, setting this flag ensures that the
# rooms exist by creating them when the first user on the
# homeserver registers.
+ #
+ # By default the auto-created rooms are publicly joinable from any federated
+ # server. Use the autocreate_auto_join_rooms_federated and
+ # autocreate_auto_join_room_preset settings below to customise this behaviour.
+ #
# Setting to false means that if the rooms are not manually created,
# users cannot be auto-joined since they do not exist.
#
- #autocreate_auto_join_rooms: true
+ # Defaults to true. Uncomment the following line to disable automatically
+ # creating auto-join rooms.
+ #
+ #autocreate_auto_join_rooms: false
+
+ # Whether the auto_join_rooms that are auto-created are available via
+ # federation. Only has an effect if autocreate_auto_join_rooms is true.
+ #
+ # Note that whether a room is federated cannot be modified after
+ # creation.
+ #
+ # Defaults to true: the room will be joinable from other servers.
+ # Uncomment the following to prevent users from other homeservers from
+ # joining these rooms.
+ #
+ #autocreate_auto_join_rooms_federated: false
+
+ # The room preset to use when auto-creating one of auto_join_rooms. Only has an
+ # effect if autocreate_auto_join_rooms is true.
+ #
+ # This can be one of "public_chat", "private_chat", or "trusted_private_chat".
+ # If a value of "private_chat" or "trusted_private_chat" is used then
+ # auto_join_mxid_localpart must also be configured.
+ #
+ # Defaults to "public_chat", meaning that the room is joinable by anyone, including
+ # federated servers if autocreate_auto_join_rooms_federated is true (the default).
+ # Uncomment the following to require an invitation to join these rooms.
+ #
+ #autocreate_auto_join_room_preset: private_chat
+
+ # The local part of the user id which is used to create auto_join_rooms if
+ # autocreate_auto_join_rooms is true. If this is not provided then the
+ # initial user account that registers will be used to create the rooms.
+ #
+ # The user id is also used to invite new users to any auto-join rooms which
+ # are set to invite-only.
+ #
+ # It *must* be configured if autocreate_auto_join_room_preset is set to
+ # "private_chat" or "trusted_private_chat".
+ #
+ # Note that this must be specified in order for new users to be correctly
+ # invited to any auto-join rooms which have been set to invite-only (either
+ # at the time of creation or subsequently).
+ #
+ # Note that, if the room already exists, this user must be joined and
+ # have the appropriate permissions to invite new members.
+ #
+ #auto_join_mxid_localpart: system
# When auto_join_rooms is specified, setting this flag to false prevents
# guest accounts from being automatically joined to the rooms.
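As an illustration of how the new registration options interact, here is a minimal sketch using a plain dict in place of the real config plumbing; the option values and the example.com domain are hypothetical:

    from synapse.api.constants import RoomCreationPreset
    from synapse.types import UserID

    # Hypothetical homeserver.yaml values, already parsed into a dict.
    config = {
        "autocreate_auto_join_rooms": True,
        "autocreate_auto_join_rooms_federated": False,
        "autocreate_auto_join_room_preset": "private_chat",
        "auto_join_mxid_localpart": "system",
    }

    preset = (
        config.get("autocreate_auto_join_room_preset") or RoomCreationPreset.PUBLIC_CHAT
    )
    requires_invite = preset in {
        RoomCreationPreset.PRIVATE_CHAT,
        RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
    }

    # Invites for invite-only auto-join rooms are sent from this full user ID.
    auto_join_user_id = UserID(config["auto_join_mxid_localpart"], "example.com").to_string()
    assert auto_join_user_id == "@system:example.com"
    assert requires_invite  # private_chat and trusted_private_chat both need invites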
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index afe0a8238b..e704cf2f44 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -18,7 +18,7 @@ import logging
from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
@@ -70,6 +70,10 @@ received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"]
)
+pdu_process_time = Histogram(
+ "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+)
+
class FederationServer(FederationBase):
def __init__(self, hs):
@@ -271,21 +275,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- with nested_logging_context(event_id):
- try:
- await self._handle_received_pdu(origin, pdu)
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warning("Error handling PDU %s: %s", event_id, e)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- pdu_results[event_id] = {"error": str(e)}
- logger.error(
- "Failed to handle PDU %s",
- event_id,
- exc_info=(f.type, f.value, f.getTracebackObject()),
- )
+ with pdu_process_time.time():
+ with nested_logging_context(event_id):
+ try:
+ await self._handle_received_pdu(origin, pdu)
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warning("Error handling PDU %s: %s", event_id, e)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ pdu_results[event_id] = {"error": str(e)}
+ logger.error(
+ "Failed to handle PDU %s",
+ event_id,
+ exc_info=(f.type, f.value, f.getTracebackObject()),
+ )
await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
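The new pdu_process_time metric relies on prometheus_client's Histogram.time() context manager; a minimal standalone sketch of that pattern, with an illustrative metric name and a stand-in for the real PDU handling:

    from prometheus_client import Histogram

    example_pdu_process_time = Histogram(
        "example_pdu_process_time", "Time taken to process an event",
    )

    def process_event(event_id: str) -> None:
        # Stand-in for the real PDU handling.
        pass

    def handle_pdu(event_id: str) -> None:
        # The elapsed wall-clock time of the block is observed when it exits,
        # including when it exits via an exception.
        with example_pdu_process_time.time():
            process_event(event_id)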
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 5b8faea4e7..23fb515683 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -201,7 +201,15 @@ class FederationSender(object):
logger.debug("Sending %s to %r", event, destinations)
- self._send_pdu(event, destinations)
+ if destinations:
+ self._send_pdu(event, destinations)
+
+ now = self.clock.time_msec()
+ ts = await self.store.get_received_ts(event.event_id)
+
+ synapse.metrics.event_processing_lag_by_event.labels(
+ "federation_sender"
+ ).observe(now - ts)
async def handle_room_events(events: Iterable[EventBase]) -> None:
with Measure(self.clock, "handle_room_events"):
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ac1b64caff..f7d9fd621e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -114,6 +114,12 @@ class ApplicationServicesHandler(object):
for service in services:
self.scheduler.submit_event_for_as(service, event)
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(event.event_id)
+ synapse.metrics.event_processing_lag_by_event.labels(
+ "appservice_sender"
+ ).observe(now - ts)
+
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index f2f16b1e43..79a2df6201 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,8 +17,6 @@ import logging
import string
from typing import Iterable, List, Optional
-from twisted.internet import defer
-
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
from synapse.api.errors import (
AuthError,
@@ -55,8 +53,7 @@ class DirectoryHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
- @defer.inlineCallbacks
- def _create_association(
+ async def _create_association(
self,
room_alias: RoomAlias,
room_id: str,
@@ -76,13 +73,13 @@ class DirectoryHandler(BaseHandler):
# TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association.
if not servers:
- users = yield self.state.get_current_users_in_room(room_id)
+ users = await self.state.get_current_users_in_room(room_id)
servers = {get_domain_from_id(u) for u in users}
if not servers:
raise SynapseError(400, "Failed to get server list")
- yield self.store.create_room_alias_association(
+ await self.store.create_room_alias_association(
room_alias, room_id, servers, creator=creator
)
@@ -93,7 +90,7 @@ class DirectoryHandler(BaseHandler):
room_id: str,
servers: Optional[List[str]] = None,
check_membership: bool = True,
- ):
+ ) -> None:
"""Attempt to create a new alias
Args:
@@ -103,9 +100,6 @@ class DirectoryHandler(BaseHandler):
servers: Iterable of servers that others servers should try and join via
check_membership: Whether to check if the user is in the room
before the alias can be set (if the server's config requires it).
-
- Returns:
- Deferred
"""
user_id = requester.user.to_string()
@@ -148,7 +142,7 @@ class DirectoryHandler(BaseHandler):
# per alias creation rule?
raise SynapseError(403, "Not allowed to create alias")
- can_create = await self.can_modify_alias(room_alias, user_id=user_id)
+ can_create = self.can_modify_alias(room_alias, user_id=user_id)
if not can_create:
raise AuthError(
400,
@@ -158,7 +152,9 @@ class DirectoryHandler(BaseHandler):
await self._create_association(room_alias, room_id, servers, creator=user_id)
- async def delete_association(self, requester: Requester, room_alias: RoomAlias):
+ async def delete_association(
+ self, requester: Requester, room_alias: RoomAlias
+ ) -> str:
"""Remove an alias from the directory
(this is only meant for human users; AS users should call
@@ -169,7 +165,7 @@ class DirectoryHandler(BaseHandler):
room_alias
Returns:
- Deferred[unicode]: room id that the alias used to point to
+ room id that the alias used to point to
Raises:
NotFoundError: if the alias doesn't exist
@@ -191,7 +187,7 @@ class DirectoryHandler(BaseHandler):
if not can_delete:
raise AuthError(403, "You don't have permission to delete the alias.")
- can_delete = await self.can_modify_alias(room_alias, user_id=user_id)
+ can_delete = self.can_modify_alias(room_alias, user_id=user_id)
if not can_delete:
raise SynapseError(
400,
@@ -208,8 +204,7 @@ class DirectoryHandler(BaseHandler):
return room_id
- @defer.inlineCallbacks
- def delete_appservice_association(
+ async def delete_appservice_association(
self, service: ApplicationService, room_alias: RoomAlias
):
if not service.is_interested_in_alias(room_alias.to_string()):
@@ -218,29 +213,27 @@ class DirectoryHandler(BaseHandler):
"This application service has not reserved this kind of alias",
errcode=Codes.EXCLUSIVE,
)
- yield self._delete_association(room_alias)
+ await self._delete_association(room_alias)
- @defer.inlineCallbacks
- def _delete_association(self, room_alias: RoomAlias):
+ async def _delete_association(self, room_alias: RoomAlias):
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
- room_id = yield self.store.delete_room_alias(room_alias)
+ room_id = await self.store.delete_room_alias(room_alias)
return room_id
- @defer.inlineCallbacks
- def get_association(self, room_alias: RoomAlias):
+ async def get_association(self, room_alias: RoomAlias):
room_id = None
if self.hs.is_mine(room_alias):
- result = yield self.get_association_from_room_alias(room_alias)
+ result = await self.get_association_from_room_alias(room_alias)
if result:
room_id = result.room_id
servers = result.servers
else:
try:
- result = yield self.federation.make_query(
+ result = await self.federation.make_query(
destination=room_alias.domain,
query_type="directory",
args={"room_alias": room_alias.to_string()},
@@ -265,7 +258,7 @@ class DirectoryHandler(BaseHandler):
Codes.NOT_FOUND,
)
- users = yield self.state.get_current_users_in_room(room_id)
+ users = await self.state.get_current_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users}
servers = set(extra_servers) | set(servers)
@@ -277,13 +270,12 @@ class DirectoryHandler(BaseHandler):
return {"room_id": room_id, "servers": servers}
- @defer.inlineCallbacks
- def on_directory_query(self, args):
+ async def on_directory_query(self, args):
room_alias = RoomAlias.from_string(args["room_alias"])
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room Alias is not hosted on this homeserver")
- result = yield self.get_association_from_room_alias(room_alias)
+ result = await self.get_association_from_room_alias(room_alias)
if result is not None:
return {"room_id": result.room_id, "servers": result.servers}
@@ -344,16 +336,15 @@ class DirectoryHandler(BaseHandler):
ratelimit=False,
)
- @defer.inlineCallbacks
- def get_association_from_room_alias(self, room_alias: RoomAlias):
- result = yield self.store.get_association_from_room_alias(room_alias)
+ async def get_association_from_room_alias(self, room_alias: RoomAlias):
+ result = await self.store.get_association_from_room_alias(room_alias)
if not result:
# Query AS to see if it exists
as_handler = self.appservice_handler
- result = yield as_handler.query_room_alias_exists(room_alias)
+ result = await as_handler.query_room_alias_exists(room_alias)
return result
- def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None):
+ def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None) -> bool:
# Any application service "interested" in an alias they are regexing on
# can modify the alias.
# Users can only modify the alias if ALL the interested services have
@@ -366,12 +357,12 @@ class DirectoryHandler(BaseHandler):
for service in interested_services:
if user_id == service.sender:
# this user IS the app service so they can do whatever they like
- return defer.succeed(True)
+ return True
elif service.is_exclusive_alias(alias.to_string()):
# another service has an exclusive lock on this alias.
- return defer.succeed(False)
+ return False
# either no interested services, or no service with an exclusive lock
- return defer.succeed(True)
+ return True
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
"""Determine whether a user can delete an alias.
@@ -459,8 +450,7 @@ class DirectoryHandler(BaseHandler):
await self.store.set_room_is_public(room_id, making_public)
- @defer.inlineCallbacks
- def edit_published_appservice_room_list(
+ async def edit_published_appservice_room_list(
self, appservice_id: str, network_id: str, room_id: str, visibility: str
):
"""Add or remove a room from the appservice/network specific public
@@ -475,7 +465,7 @@ class DirectoryHandler(BaseHandler):
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
- yield self.store.set_room_is_public_appservice(
+ await self.store.set_room_is_public_appservice(
room_id, appservice_id, network_id, visibility == "public"
)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index bca612fbde..efb5188a4b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -381,6 +381,7 @@ class FederationHandler(BaseHandler):
room_version = await self.store.get_room_version_id(room_id)
state_map = await resolve_events_with_store(
+ self.clock,
room_id,
room_version,
state_maps,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 200127d291..665ad19b5d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -879,7 +879,9 @@ class EventCreationHandler(object):
"""
room_alias = RoomAlias.from_string(room_alias_str)
try:
- mapping = yield directory_handler.get_association(room_alias)
+ mapping = yield defer.ensureDeferred(
+ directory_handler.get_association(room_alias)
+ )
except SynapseError as e:
# Turn M_NOT_FOUND errors into M_BAD_ALIAS errors.
if e.errcode == Codes.NOT_FOUND:
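Since get_association is now a coroutine, the remaining @defer.inlineCallbacks caller above bridges to it with defer.ensureDeferred before yielding; a minimal sketch of that pattern with illustrative names and return values:

    from twisted.internet import defer

    async def get_association(room_alias: str) -> dict:
        # Stand-in for the real async handler method.
        return {"room_id": "!abc:example.com", "servers": ["example.com"]}

    @defer.inlineCallbacks
    def legacy_caller(room_alias: str):
        # inlineCallbacks expects Deferreds to be yielded, so convert the
        # coroutine into a Deferred first.
        mapping = yield defer.ensureDeferred(get_association(room_alias))
        return mapping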
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 99c1a78fd0..f223630d43 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -17,7 +17,7 @@
import logging
from synapse import types
-from synapse.api.constants import MAX_USERID_LENGTH, LoginType
+from synapse.api.constants import MAX_USERID_LENGTH, EventTypes, JoinRules, LoginType
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict
@@ -26,7 +26,8 @@ from synapse.replication.http.register import (
ReplicationPostRegisterActionsServlet,
ReplicationRegisterServlet,
)
-from synapse.types import RoomAlias, RoomID, UserID, create_requester
+from synapse.storage.state import StateFilter
+from synapse.types import RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from ._base import BaseHandler
@@ -302,51 +303,157 @@ class RegistrationHandler(BaseHandler):
return user_id
- async def _auto_join_rooms(self, user_id):
- """Automatically joins users to auto join rooms - creating the room in the first place
- if the user is the first to be created.
+ async def _create_and_join_rooms(self, user_id: str):
+ """
+ Create the auto-join rooms and join or invite the user to them.
+
+ This should only be called when the first "real" user registers.
Args:
- user_id(str): The user to join
+ user_id: The user to join
"""
- # auto-join the user to any rooms we're supposed to dump them into
- fake_requester = create_requester(user_id)
+ # Getting the handlers during init gives a dependency loop.
+ room_creation_handler = self.hs.get_room_creation_handler()
+ room_member_handler = self.hs.get_room_member_handler()
- # try to create the room if we're the first real user on the server. Note
- # that an auto-generated support or bot user is not a real user and will never be
- # the user to create the room
- should_auto_create_rooms = False
- is_real_user = await self.store.is_real_user(user_id)
- if self.hs.config.autocreate_auto_join_rooms and is_real_user:
- count = await self.store.count_real_users()
- should_auto_create_rooms = count == 1
- for r in self.hs.config.auto_join_rooms:
+ # Generate a stub for how the rooms will be configured.
+ stub_config = {
+ "preset": self.hs.config.registration.autocreate_auto_join_room_preset,
+ }
+
+ # If the configuration provides a user ID to create rooms with, use
+ # that instead of the first user registered.
+ requires_join = False
+ if self.hs.config.registration.auto_join_user_id:
+ fake_requester = create_requester(
+ self.hs.config.registration.auto_join_user_id
+ )
+
+ # If the room requires an invite, add the user to the list of invites.
+ if self.hs.config.registration.auto_join_room_requires_invite:
+ stub_config["invite"] = [user_id]
+
+ # If the room is being created by a different user, the first user
+ # registered needs to join it. Note that in the case of an invitation
+ # being necessary this will occur after the invite was sent.
+ requires_join = True
+ else:
+ fake_requester = create_requester(user_id)
+
+ # Choose whether to federate the new room.
+ if not self.hs.config.registration.autocreate_auto_join_rooms_federated:
+ stub_config["creation_content"] = {"m.federate": False}
+
+ for r in self.hs.config.registration.auto_join_rooms:
logger.info("Auto-joining %s to %s", user_id, r)
+
try:
- if should_auto_create_rooms:
- room_alias = RoomAlias.from_string(r)
- if self.hs.hostname != room_alias.domain:
- logger.warning(
- "Cannot create room alias %s, "
- "it does not match server domain",
- r,
- )
- else:
- # create room expects the localpart of the room alias
- room_alias_localpart = room_alias.localpart
-
- # getting the RoomCreationHandler during init gives a dependency
- # loop
- await self.hs.get_room_creation_handler().create_room(
- fake_requester,
- config={
- "preset": "public_chat",
- "room_alias_name": room_alias_localpart,
- },
+ room_alias = RoomAlias.from_string(r)
+
+ if self.hs.hostname != room_alias.domain:
+ logger.warning(
+ "Cannot create room alias %s, "
+ "it does not match server domain",
+ r,
+ )
+ else:
+ # A shallow copy is OK here since the only key that is
+ # modified is room_alias_name.
+ config = stub_config.copy()
+ # create room expects the localpart of the room alias
+ config["room_alias_name"] = room_alias.localpart
+
+ info, _ = await room_creation_handler.create_room(
+ fake_requester, config=config, ratelimit=False,
+ )
+
+ # If the room does not require an invite, but another user
+ # created it, then ensure the first user joins it.
+ if requires_join:
+ await room_member_handler.update_membership(
+ requester=create_requester(user_id),
+ target=UserID.from_string(user_id),
+ room_id=info["room_id"],
+ # Since it was just created, there are no remote hosts.
+ remote_room_hosts=[],
+ action="join",
ratelimit=False,
)
+
+ except ConsentNotGivenError as e:
+ # Technically not necessary to pull out this error though
+ # moving away from bare excepts is a good thing to do.
+ logger.error("Failed to join new user to %r: %r", r, e)
+ except Exception as e:
+ logger.error("Failed to join new user to %r: %r", r, e)
+
+ async def _join_rooms(self, user_id: str):
+ """
+ Join or invite the user to the auto-join rooms.
+
+ Args:
+ user_id: The user to join
+ """
+ room_member_handler = self.hs.get_room_member_handler()
+
+ for r in self.hs.config.registration.auto_join_rooms:
+ logger.info("Auto-joining %s to %s", user_id, r)
+
+ try:
+ room_alias = RoomAlias.from_string(r)
+
+ if RoomAlias.is_valid(r):
+ (
+ room_id,
+ remote_room_hosts,
+ ) = await room_member_handler.lookup_room_alias(room_alias)
+ room_id = room_id.to_string()
else:
- await self._join_user_to_room(fake_requester, r)
+ raise SynapseError(
+ 400, "%s was not legal room ID or room alias" % (r,)
+ )
+
+ # Calculate whether the room requires an invite or can be
+ # joined directly. Note that unless a join rule of public exists,
+ # it is treated as requiring an invite.
+ requires_invite = True
+
+ state = await self.store.get_filtered_current_state_ids(
+ room_id, StateFilter.from_types([(EventTypes.JoinRules, "")])
+ )
+
+ event_id = state.get((EventTypes.JoinRules, ""))
+ if event_id:
+ join_rules_event = await self.store.get_event(
+ event_id, allow_none=True
+ )
+ if join_rules_event:
+ join_rule = join_rules_event.content.get("join_rule", None)
+ requires_invite = join_rule and join_rule != JoinRules.PUBLIC
+
+ # Send the invite, if necessary.
+ if requires_invite:
+ await room_member_handler.update_membership(
+ requester=create_requester(
+ self.hs.config.registration.auto_join_user_id
+ ),
+ target=UserID.from_string(user_id),
+ room_id=room_id,
+ remote_room_hosts=remote_room_hosts,
+ action="invite",
+ ratelimit=False,
+ )
+
+ # Send the join.
+ await room_member_handler.update_membership(
+ requester=create_requester(user_id),
+ target=UserID.from_string(user_id),
+ room_id=room_id,
+ remote_room_hosts=remote_room_hosts,
+ action="join",
+ ratelimit=False,
+ )
+
except ConsentNotGivenError as e:
# Technically not necessary to pull out this error though
# moving away from bare excepts is a good thing to do.
@@ -354,6 +461,29 @@ class RegistrationHandler(BaseHandler):
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
+ async def _auto_join_rooms(self, user_id: str):
+ """Automatically joins users to auto join rooms - creating the room in the first place
+ if the user is the first to be created.
+
+ Args:
+ user_id: The user to join
+ """
+ # auto-join the user to any rooms we're supposed to dump them into
+
+ # try to create the room if we're the first real user on the server. Note
+ # that an auto-generated support or bot user is not a real user and will never be
+ # the user to create the room
+ should_auto_create_rooms = False
+ is_real_user = await self.store.is_real_user(user_id)
+ if self.hs.config.registration.autocreate_auto_join_rooms and is_real_user:
+ count = await self.store.count_real_users()
+ should_auto_create_rooms = count == 1
+
+ if should_auto_create_rooms:
+ await self._create_and_join_rooms(user_id)
+ else:
+ await self._join_rooms(user_id)
+
async def post_consent_actions(self, user_id):
"""A series of registration actions that can only be carried out once consent
has been granted
@@ -471,30 +601,6 @@ class RegistrationHandler(BaseHandler):
self._next_generated_user_id += 1
return str(id)
- async def _join_user_to_room(self, requester, room_identifier):
- room_member_handler = self.hs.get_room_member_handler()
- if RoomID.is_valid(room_identifier):
- room_id = room_identifier
- elif RoomAlias.is_valid(room_identifier):
- room_alias = RoomAlias.from_string(room_identifier)
- room_id, remote_room_hosts = await room_member_handler.lookup_room_alias(
- room_alias
- )
- room_id = room_id.to_string()
- else:
- raise SynapseError(
- 400, "%s was not legal room ID or room alias" % (room_identifier,)
- )
-
- await room_member_handler.update_membership(
- requester=requester,
- target=requester.user,
- room_id=room_id,
- remote_room_hosts=remote_room_hosts,
- action="join",
- ratelimit=False,
- )
-
def check_registration_ratelimit(self, address):
"""A simple helper method to check whether the registration rate limit has been hit
for a given IP address
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0b82aa72a6..4c7524493e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1893,9 +1893,6 @@ class SyncHandler(object):
if notifs is not None:
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]
- unread_notifications["org.matrix.msc2625.unread_count"] = notifs[
- "unread_count"
- ]
sync_result_builder.joined.append(room_sync)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 087a49d65d..6035672698 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -463,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
+event_processing_lag_by_event = Histogram(
+ "synapse_event_processing_lag_by_event",
+ "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
+ ["name"],
+)
+
# Build info of the running server.
build_info = Gauge(
"synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
index ab7f948ed4..4304c60d56 100644
--- a/synapse/metrics/_exposition.py
+++ b/synapse/metrics/_exposition.py
@@ -208,6 +208,7 @@ class MetricsHandler(BaseHTTPRequestHandler):
raise
self.send_response(200)
self.send_header("Content-Type", CONTENT_TYPE_LATEST)
+ self.send_header("Content-Length", str(len(output)))
self.end_headers()
self.wfile.write(output)
@@ -261,4 +262,6 @@ class MetricsResource(Resource):
def render_GET(self, request):
request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
- return generate_latest(self.registry)
+ response = generate_latest(self.registry)
+ request.setHeader(b"Content-Length", str(len(response)))
+ return response
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 5b00602a56..43ffe6faf0 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -189,11 +189,8 @@ class BulkPushRuleEvaluator(object):
)
if matches:
actions = [x for x in rule["actions"] if x != "dont_notify"]
- if (
- "notify" in actions
- or "org.matrix.msc2625.mark_unread" in actions
- ):
- # Push rules say we should act on this event.
+ if actions and "notify" in actions:
+ # Push rules say we should notify the user of this event
actions_by_user[uid] = actions
break
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 4ea683fee0..5dae4648c0 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -39,10 +39,7 @@ def get_badge_count(store, user_id):
)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
- # We're populating this badge using the unread_count (instead of the
- # notify_count) as this badge is the number of missed messages, not the
- # number of missed notifications.
- badge += 1 if notifs.get("unread_count") else 0
+ badge += 1 if notifs["notify_count"] else 0
return badge
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 92d3709ae3..b1cac901eb 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -93,6 +93,10 @@ CONDITIONAL_REQUIREMENTS = {
"oidc": ["authlib>=0.14.0"],
"systemd": ["systemd-python>=231"],
"url_preview": ["lxml>=3.5.0"],
+ # Dependencies which are exclusively required by unit test code. This is
+ # NOT a list of all modules that are necessary to run the unit tests.
+ # Tests assume that all optional dependencies are installed.
+ #
# parameterized_class decorator was introduced in parameterized 0.7.0
"test": ["mock>=2.0", "parameterized>=0.7.0"],
"sentry": ["sentry-sdk>=0.7.2"],
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 8173baef8f..e07c32118d 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -15,7 +15,7 @@
import logging
from typing import List, Optional
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import EventTypes, JoinRules, Membership, RoomCreationPreset
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
RestServlet,
@@ -77,7 +77,7 @@ class ShutdownRoomRestServlet(RestServlet):
info, stream_id = await self._room_creation_handler.create_room(
room_creator_requester,
config={
- "preset": "public_chat",
+ "preset": RoomCreationPreset.PUBLIC_CHAT,
"name": room_name,
"power_level_content_override": {"users_default": -10},
},
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index c2c9a9c3aa..bf0f9bd077 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -81,7 +81,8 @@ class LoginRestServlet(RestServlet):
CAS_TYPE = "m.login.cas"
SSO_TYPE = "m.login.sso"
TOKEN_TYPE = "m.login.token"
- JWT_TYPE = "m.login.jwt"
+ JWT_TYPE = "org.matrix.login.jwt"
+ JWT_TYPE_DEPRECATED = "m.login.jwt"
def __init__(self, hs):
super(LoginRestServlet, self).__init__()
@@ -116,6 +117,7 @@ class LoginRestServlet(RestServlet):
flows = []
if self.jwt_enabled:
flows.append({"type": LoginRestServlet.JWT_TYPE})
+ flows.append({"type": LoginRestServlet.JWT_TYPE_DEPRECATED})
if self.cas_enabled:
# we advertise CAS for backwards compat, though MSC1721 renamed it
@@ -149,6 +151,7 @@ class LoginRestServlet(RestServlet):
try:
if self.jwt_enabled and (
login_submission["type"] == LoginRestServlet.JWT_TYPE
+ or login_submission["type"] == LoginRestServlet.JWT_TYPE_DEPRECATED
):
result = await self.do_jwt_login(login_submission)
elif login_submission["type"] == LoginRestServlet.TOKEN_TYPE:
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index f563b3dc35..9fd4908136 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014-2020 The Matrix.org Foundation C.I.C.
+# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -267,7 +267,7 @@ def _check_actions(actions):
raise InvalidRuleException("No actions found")
for a in actions:
- if a in ["notify", "dont_notify", "coalesce", "org.matrix.msc2625.mark_unread"]:
+ if a in ["notify", "dont_notify", "coalesce"]:
pass
elif isinstance(a, dict) and "set_tweak" in a:
pass
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 50fd843f66..495d9f04c8 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -32,6 +32,7 @@ from synapse.logging.utils import log_function
from synapse.state import v1, v2
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import StateMap
+from synapse.util import Clock
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func
@@ -414,6 +415,7 @@ class StateHandler(object):
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
+ self.clock,
event.room_id,
room_version,
state_set_ids,
@@ -516,6 +518,7 @@ class StateResolutionHandler(object):
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
+ self.clock,
room_id,
room_version,
list(state_groups_ids.values()),
@@ -589,6 +592,7 @@ def _make_state_cache_entry(new_state, state_groups_ids):
def resolve_events_with_store(
+ clock: Clock,
room_id: str,
room_version: str,
state_sets: List[StateMap[str]],
@@ -625,7 +629,7 @@ def resolve_events_with_store(
)
else:
return v2.resolve_events_with_store(
- room_id, room_version, state_sets, event_map, state_res_store
+ clock, room_id, room_version, state_sets, event_map, state_res_store
)
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index e25bc5d264..bf6caa0946 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -27,12 +27,20 @@ from synapse.api.errors import AuthError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.types import StateMap
+from synapse.util import Clock
logger = logging.getLogger(__name__)
+# We want to yield to the reactor occasionally during state res when dealing
+# with large data sets, so that we don't exhaust the reactor. This is done by
+ # yielding to the reactor during loops every N iterations.
+_YIELD_AFTER_ITERATIONS = 100
+
+
@defer.inlineCallbacks
def resolve_events_with_store(
+ clock: Clock,
room_id: str,
room_version: str,
state_sets: List[StateMap[str]],
@@ -42,13 +50,11 @@ def resolve_events_with_store(
"""Resolves the state using the v2 state resolution algorithm
Args:
+ clock
room_id: the room we are working in
-
room_version: The room version
-
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
-
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
@@ -113,13 +119,14 @@ def resolve_events_with_store(
)
sorted_power_events = yield _reverse_topological_power_sort(
- room_id, power_events, event_map, state_res_store, full_conflicted_set
+ clock, room_id, power_events, event_map, state_res_store, full_conflicted_set
)
logger.debug("sorted %d power events", len(sorted_power_events))
# Now sequentially auth each one
resolved_state = yield _iterative_auth_checks(
+ clock,
room_id,
room_version,
sorted_power_events,
@@ -133,20 +140,22 @@ def resolve_events_with_store(
# OK, so we've now resolved the power events. Now sort the remaining
# events using the mainline of the resolved power level.
+ set_power_events = set(sorted_power_events)
leftover_events = [
- ev_id for ev_id in full_conflicted_set if ev_id not in sorted_power_events
+ ev_id for ev_id in full_conflicted_set if ev_id not in set_power_events
]
logger.debug("sorting %d remaining events", len(leftover_events))
pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
leftover_events = yield _mainline_sort(
- room_id, leftover_events, pl, event_map, state_res_store
+ clock, room_id, leftover_events, pl, event_map, state_res_store
)
logger.debug("resolving remaining events")
resolved_state = yield _iterative_auth_checks(
+ clock,
room_id,
room_version,
leftover_events,
@@ -316,12 +325,13 @@ def _add_event_and_auth_chain_to_graph(
@defer.inlineCallbacks
def _reverse_topological_power_sort(
- room_id, event_ids, event_map, state_res_store, auth_diff
+ clock, room_id, event_ids, event_map, state_res_store, auth_diff
):
"""Returns a list of the event_ids sorted by reverse topological ordering,
and then by power level and origin_server_ts
Args:
+ clock (Clock)
room_id (str): the room we are working in
event_ids (list[str]): The events to sort
event_map (dict[str,FrozenEvent])
@@ -333,18 +343,28 @@ def _reverse_topological_power_sort(
"""
graph = {}
- for event_id in event_ids:
+ for idx, event_id in enumerate(event_ids, start=1):
yield _add_event_and_auth_chain_to_graph(
graph, room_id, event_id, event_map, state_res_store, auth_diff
)
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
event_to_pl = {}
- for event_id in graph:
+ for idx, event_id in enumerate(graph, start=1):
pl = yield _get_power_level_for_sender(
room_id, event_id, event_map, state_res_store
)
event_to_pl[event_id] = pl
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
def _get_power_order(event_id):
ev = event_map[event_id]
pl = event_to_pl[event_id]
@@ -360,12 +380,13 @@ def _reverse_topological_power_sort(
@defer.inlineCallbacks
def _iterative_auth_checks(
- room_id, room_version, event_ids, base_state, event_map, state_res_store
+ clock, room_id, room_version, event_ids, base_state, event_map, state_res_store
):
"""Sequentially apply auth checks to each event in given list, updating the
state as it goes along.
Args:
+ clock (Clock)
room_id (str)
room_version (str)
event_ids (list[str]): Ordered list of events to apply auth checks to
@@ -379,7 +400,7 @@ def _iterative_auth_checks(
resolved_state = base_state.copy()
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
- for event_id in event_ids:
+ for idx, event_id in enumerate(event_ids, start=1):
event = event_map[event_id]
auth_events = {}
@@ -417,17 +438,23 @@ def _iterative_auth_checks(
except AuthError:
pass
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
return resolved_state
@defer.inlineCallbacks
def _mainline_sort(
- room_id, event_ids, resolved_power_event_id, event_map, state_res_store
+ clock, room_id, event_ids, resolved_power_event_id, event_map, state_res_store
):
"""Returns a sorted list of event_ids sorted by mainline ordering based on
the given event resolved_power_event_id
Args:
+ clock (Clock)
room_id (str): room we're working in
event_ids (list[str]): Events to sort
resolved_power_event_id (str): The final resolved power level event ID
@@ -437,8 +464,14 @@ def _mainline_sort(
Returns:
Deferred[list[str]]: The sorted list
"""
+ if not event_ids:
+ # It's possible for there to be no event IDs here to sort, so we can
+ # skip calculating the mainline in that case.
+ return []
+
mainline = []
pl = resolved_power_event_id
+ idx = 0
while pl:
mainline.append(pl)
pl_ev = yield _get_event(room_id, pl, event_map, state_res_store)
@@ -452,17 +485,29 @@ def _mainline_sort(
pl = aid
break
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
+ idx += 1
+
mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))}
event_ids = list(event_ids)
order_map = {}
- for ev_id in event_ids:
+ for idx, ev_id in enumerate(event_ids, start=1):
depth = yield _get_mainline_depth_for_event(
event_map[ev_id], mainline_map, event_map, state_res_store
)
order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
event_ids.sort(key=lambda ev_id: order_map[ev_id])
return event_ids
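The recurring pattern in this file is to hand control back to the reactor every _YIELD_AFTER_ITERATIONS loop iterations via clock.sleep(0); a minimal standalone sketch, assuming a Synapse Clock (whose sleep() returns a Deferred) and a stand-in work function:

    from twisted.internet import defer

    _YIELD_AFTER_ITERATIONS = 100

    def _do_work(item):
        # Stand-in for the real per-item processing.
        return item

    @defer.inlineCallbacks
    def process_many(clock, items):
        results = []
        for idx, item in enumerate(items, start=1):
            results.append(_do_work(item))

            # Yield to the reactor occasionally so that large data sets do not
            # block the event loop for too long.
            if idx % _YIELD_AFTER_ITERATIONS == 0:
                yield clock.sleep(0)

        return results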
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 815d52ab4c..bc9f4f08ea 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2015-2020 The Matrix.org Foundation C.I.C.
+# Copyright 2015 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,9 +15,7 @@
# limitations under the License.
import logging
-from typing import Dict, Tuple
-import attr
from canonicaljson import json
from twisted.internet import defer
@@ -37,16 +36,6 @@ DEFAULT_HIGHLIGHT_ACTION = [
]
-@attr.s
-class EventPushSummary:
- """Summary of pending event push actions for a given user in a given room."""
-
- unread_count = attr.ib(type=int)
- stream_ordering = attr.ib(type=int)
- old_user_id = attr.ib(type=str)
- notif_count = attr.ib(type=int)
-
-
def _serialize_action(actions, is_highlight):
"""Custom serializer for actions. This allows us to "compress" common actions.
@@ -123,7 +112,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
txn.execute(sql, (room_id, last_read_event_id))
results = txn.fetchall()
if len(results) == 0:
- return {"notify_count": 0, "highlight_count": 0, "unread_count": 0}
+ return {"notify_count": 0, "highlight_count": 0}
stream_ordering = results[0][0]
@@ -133,42 +122,25 @@ class EventPushActionsWorkerStore(SQLBaseStore):
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
- # First get number of actions, grouped on whether the action notifies.
+ # First get number of notifications.
+ # We don't need to put a notif=1 clause as all rows always have
+ # notif=1
sql = (
- "SELECT count(*), notif"
+ "SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND stream_ordering > ?"
- " GROUP BY notif"
)
- txn.execute(sql, (user_id, room_id, stream_ordering))
- rows = txn.fetchall()
- # We should get a maximum number of two rows: one for notif = 0, which is the
- # number of actions that contribute to the unread_count but not to the
- # notify_count, and one for notif = 1, which is the number of actions that
- # contribute to both counters. If one or both rows don't appear, then the
- # value for the matching counter should be 0.
- unread_count = 0
- notify_count = 0
- for row in rows:
- # We always increment unread_count because actions that notify also
- # contribute to it.
- unread_count += row[0]
- if row[1] == 1:
- notify_count = row[0]
- elif row[1] != 0:
- logger.warning(
- "Unexpected value %d for column 'notif' in table"
- " 'event_push_actions'",
- row[1],
- )
+ txn.execute(sql, (user_id, room_id, stream_ordering))
+ row = txn.fetchone()
+ notify_count = row[0] if row else 0
txn.execute(
"""
- SELECT notif_count, unread_count FROM event_push_summary
+ SELECT notif_count FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
""",
(room_id, user_id, stream_ordering),
@@ -176,7 +148,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
rows = txn.fetchall()
if rows:
notify_count += rows[0][0]
- unread_count += rows[0][1]
# Now get the number of highlights
sql = (
@@ -193,11 +164,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
row = txn.fetchone()
highlight_count = row[0] if row else 0
- return {
- "unread_count": unread_count,
- "notify_count": notify_count,
- "highlight_count": highlight_count,
- }
+ return {"notify_count": notify_count, "highlight_count": highlight_count}
@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
@@ -255,7 +222,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
- " AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
@@ -284,7 +250,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
- " AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
@@ -357,7 +322,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
- " AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
@@ -386,7 +350,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
- " AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
@@ -436,7 +399,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
def _get_if_maybe_push_in_range_for_user_txn(txn):
sql = """
SELECT 1 FROM event_push_actions
- WHERE user_id = ? AND stream_ordering > ? AND notif = 1
+ WHERE user_id = ? AND stream_ordering > ?
LIMIT 1
"""
@@ -465,15 +428,14 @@ class EventPushActionsWorkerStore(SQLBaseStore):
return
# This is a helper function for generating the necessary tuple that
- # can be used to insert into the `event_push_actions_staging` table.
+ # can be used to insert into the `event_push_actions_staging` table.
def _gen_entry(user_id, actions):
is_highlight = 1 if _action_has_highlight(actions) else 0
- notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1
return (
event_id, # event_id column
user_id, # user_id column
_serialize_action(actions, is_highlight), # actions column
- notif, # notif column
+ 1, # notif column
is_highlight, # highlight column
)
@@ -855,51 +817,24 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
- coalesce(old.%s, 0) + upd.cnt,
+ coalesce(old.notif_count, 0) + upd.notif_count,
upd.stream_ordering,
old.user_id
FROM (
- SELECT user_id, room_id, count(*) as cnt,
+ SELECT user_id, room_id, count(*) as notif_count,
max(stream_ordering) as stream_ordering
FROM event_push_actions
WHERE ? <= stream_ordering AND stream_ordering < ?
AND highlight = 0
- %s
GROUP BY user_id, room_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""
- # First get the count of unread messages.
- txn.execute(
- sql % ("unread_count", ""),
- (old_rotate_stream_ordering, rotate_to_stream_ordering),
- )
-
- # We need to merge both lists into a single object because we might not have the
- # same amount of rows in each of them. In this case we use a dict indexed on the
- # user ID and room ID to make it easier to populate.
- summaries = {} # type: Dict[Tuple[str, str], EventPushSummary]
- for row in txn:
- summaries[(row[0], row[1])] = EventPushSummary(
- unread_count=row[2],
- stream_ordering=row[3],
- old_user_id=row[4],
- notif_count=0,
- )
-
- # Then get the count of notifications.
- txn.execute(
- sql % ("notif_count", "AND notif = 1"),
- (old_rotate_stream_ordering, rotate_to_stream_ordering),
- )
-
- # notif_rows is populated based on a subset of the query used to populate
- # unread_rows, so we can be sure that there will be no KeyError here.
- for row in txn:
- summaries[(row[0], row[1])].notif_count = row[2]
+ txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering))
+ rows = txn.fetchall()
- logger.info("Rotating notifications, handling %d rows", len(summaries))
+ logger.info("Rotating notifications, handling %d rows", len(rows))
# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
@@ -909,34 +844,22 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
table="event_push_summary",
values=[
{
- "user_id": user_id,
- "room_id": room_id,
- "notif_count": summary.notif_count,
- "unread_count": summary.unread_count,
- "stream_ordering": summary.stream_ordering,
+ "user_id": row[0],
+ "room_id": row[1],
+ "notif_count": row[2],
+ "stream_ordering": row[3],
}
- for ((user_id, room_id), summary) in summaries.items()
- if summary.old_user_id is None
+ for row in rows
+ if row[4] is None
],
)
txn.executemany(
"""
- UPDATE event_push_summary
- SET notif_count = ?, unread_count = ?, stream_ordering = ?
+ UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
- (
- (
- summary.notif_count,
- summary.unread_count,
- summary.stream_ordering,
- user_id,
- room_id,
- )
- for ((user_id, room_id), summary) in summaries.items()
- if summary.old_user_id is not None
- ),
+ ((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None),
)
txn.execute(
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 8aecd414c2..15bc13cbd0 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -81,6 +81,15 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="store_local_media",
)
+ def mark_local_media_as_safe(self, media_id: str):
+ """Mark a local media as safe from quarantining."""
+ return self.db.simple_update_one(
+ table="local_media_repository",
+ keyvalues={"media_id": media_id},
+ updatevalues={"safe_from_quarantine": True},
+ desc="mark_local_media_as_safe",
+ )
+
def get_url_cache(self, url, ts):
"""Get the media_id and ts for a cached URL as of the given timestamp
Returns:
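A hypothetical caller of the new store method (the surrounding handler code here is illustrative, not part of this patch):

    async def protect_media(self, media_id: str) -> None:
        # Flag a local media item, e.g. one belonging to a server-provided
        # sticker pack, so that room-wide quarantining skips it.
        await self.store.mark_local_media_as_safe(media_id)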
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 55c1defc73..d72d6affb8 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -649,36 +649,10 @@ class RoomWorkerStore(SQLBaseStore):
def _quarantine_media_in_room_txn(txn):
local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
- total_media_quarantined = 0
-
- # Now update all the tables to set the quarantined_by flag
-
- txn.executemany(
- """
- UPDATE local_media_repository
- SET quarantined_by = ?
- WHERE media_id = ?
- """,
- ((quarantined_by, media_id) for media_id in local_mxcs),
- )
-
- txn.executemany(
- """
- UPDATE remote_media_cache
- SET quarantined_by = ?
- WHERE media_origin = ? AND media_id = ?
- """,
- (
- (quarantined_by, origin, media_id)
- for origin, media_id in remote_mxcs
- ),
+ return self._quarantine_media_txn(
+ txn, local_mxcs, remote_mxcs, quarantined_by
)
- total_media_quarantined += len(local_mxcs)
- total_media_quarantined += len(remote_mxcs)
-
- return total_media_quarantined
-
return self.db.runInteraction(
"quarantine_media_in_room", _quarantine_media_in_room_txn
)
@@ -828,17 +802,17 @@ class RoomWorkerStore(SQLBaseStore):
Returns:
The total number of media items quarantined
"""
- total_media_quarantined = 0
-
# Update all the tables to set the quarantined_by flag
txn.executemany(
"""
UPDATE local_media_repository
SET quarantined_by = ?
- WHERE media_id = ?
+ WHERE media_id = ? AND safe_from_quarantine = ?
""",
- ((quarantined_by, media_id) for media_id in local_mxcs),
+ ((quarantined_by, media_id, False) for media_id in local_mxcs),
)
+ # Note that a rowcount of -1 can be used to indicate no rows were affected.
+ total_media_quarantined = txn.rowcount if txn.rowcount > 0 else 0
txn.executemany(
"""
@@ -848,9 +822,7 @@ class RoomWorkerStore(SQLBaseStore):
""",
((quarantined_by, origin, media_id) for origin, media_id in remote_mxcs),
)
-
- total_media_quarantined += len(local_mxcs)
- total_media_quarantined += len(remote_mxcs)
+ total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined
diff --git a/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres
index f1459ef7f0..597f2ffd3d 100644
--- a/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql
+++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres
@@ -13,11 +13,6 @@
* limitations under the License.
*/
--- Store the number of unread messages, i.e. messages that triggered either a notify
--- action or a mark_unread one.
-ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL DEFAULT 0;
-
--- Pre-populate the new column with the count of pending notifications.
--- We expect event_push_summary to be relatively small, so we can do this update
--- synchronously without impacting Synapse's startup time too much.
-UPDATE event_push_summary SET unread_count = notif_count;
\ No newline at end of file
+-- The local_media_repository should have files which do not get quarantined,
+-- e.g. files from sticker packs.
+ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT FALSE;
diff --git a/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite
new file mode 100644
index 0000000000..69db89ac0e
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The local_media_repository should have files which do not get quarantined,
+-- e.g. files from sticker packs.
+ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT 0;
|